From 46b04dd09a09a2249ef35c4cf7ab0235386a9dd7 Mon Sep 17 00:00:00 2001 From: aheadley Date: Fri, 16 Mar 2012 22:26:16 -0400 Subject: [PATCH] implement observer model --- overviewer.py | 29 +---- overviewer_core/dispatcher.py | 233 ++++++++++++++++++++++++---------- overviewer_core/tileset.py | 63 ++++----- 3 files changed, 205 insertions(+), 120 deletions(-) diff --git a/overviewer.py b/overviewer.py index 0df2e0c..798e597 100755 --- a/overviewer.py +++ b/overviewer.py @@ -393,32 +393,11 @@ dir but you forgot to put quotes around the directory, since it contains spaces. # multiprocessing dispatcher if config['processes'] == 1: - dispatch = dispatcher.Dispatcher() + dispatch = dispatcher.Dispatcher(observer=dispatcher.ProgressBarObserver) else: - dispatch = dispatcher.MultiprocessingDispatcher(local_procs=config['processes']) - last_status_print = time.time() - def print_status(phase, completed, total): - # phase is ignored. it's always zero? - if (total == 0): - percent = 100 - logging.info("Rendered %d of %d tiles. %d%% complete", completed, total, percent) - elif total == None: - logging.info("Rendered %d tiles.", completed) - else: - percent = int(100* completed/total) - logging.info("Rendered %d of %d. %d%% complete", completed, total, percent) - - def update_pbar(phase, completed, total): - if total is None or total == 0: - print_status(phase, completed, total) - else: - pbar = progressbar.ProgressBar( - widgets=['Rendering: ', progressbar.FractionWidget(), ' (', - progressbar.Percentage(), ') ', - progressbar.Bar(left='[', right=']'), ' ', progressbar.ETA()], - maxval=total).start().update(completed) - - dispatch.render_all(tilesets, update_pbar) + dispatch = dispatcher.MultiprocessingDispatcher(local_procs=config['processes'], + observer=dispatcher.ProgressBarObserver) + dispatch.render_all(tilesets) dispatch.close() assetMrg.finalize(tilesets) diff --git a/overviewer_core/dispatcher.py b/overviewer_core/dispatcher.py index 9823eef..5d35835 100644 --- a/overviewer_core/dispatcher.py +++ b/overviewer_core/dispatcher.py @@ -20,6 +20,8 @@ import cPickle as pickle import Queue import time import logging +import progressbar +import sys from signals import Signal @@ -30,17 +32,19 @@ class Dispatcher(object): possible to create a Dispatcher that distributes this work to many worker processes. """ - def __init__(self): + def __init__(self, observer=None): super(Dispatcher, self).__init__() - + # list of (tileset, workitem) tuples # keeps track of dispatched but unfinished jobs self._running_jobs = [] # list of (tileset, workitem, dependencies) tuples # keeps track of jobs waiting to run after dependencies finish self._pending_jobs = [] - - def render_all(self, tilesetlist, status_callback): + + self.observer_type = observer or LoggingObserver + + def render_all(self, tilesetlist): """Render all of the tilesets in the given tilesetlist. status_callback is called periodically to update status. The callback should take the following arguments: @@ -48,10 +52,10 @@ class Dispatcher(object): be none if there is no useful estimate. """ # TODO use status callback - + # setup tilesetlist self.setup_tilesets(tilesetlist) - + # iterate through all possible phases num_phases = [tileset.get_num_phases() for tileset in tilesetlist] for phase in xrange(max(num_phases)): @@ -62,7 +66,7 @@ class Dispatcher(object): def make_work_iterator(tset, p): return ((tset, workitem) for workitem in tset.iterate_work_items(p)) work_iterators.append(make_work_iterator(tileset, phase)) - + # keep track of total jobs, and how many jobs are done total_jobs = 0 for tileset, phases in zip(tilesetlist, num_phases): @@ -75,21 +79,33 @@ class Dispatcher(object): else: total_jobs += jobs_for_tileset finished_jobs = 0 - + + self.observer = self.observer_type(total_jobs) + # do the first status update - self._status_update(status_callback, phase, finished_jobs, total_jobs, force=True) - + #self._status_update(status_callback, phase, finished_jobs, total_jobs, force=True) + self.observer.start() + self._status_update(phase, finished_jobs, True) + # go through these iterators round-robin style for tileset, (workitem, deps) in util.roundrobin(work_iterators): self._pending_jobs.append((tileset, workitem, deps)) finished_jobs += self._dispatch_jobs() - self._status_update(status_callback, phase, finished_jobs, total_jobs) - + self._status_update(phase, finished_jobs) + # after each phase, wait for the work to finish while len(self._pending_jobs) > 0 or len(self._running_jobs) > 0: finished_jobs += self._dispatch_jobs() - self._status_update(status_callback, phase, finished_jobs, total_jobs) - + self._status_update(phase, finished_jobs) + + self.observer.finish() + + def _status_update(self, phase, completed, force=False): + if force or completed - self.observer.get_current_value() > \ + self.observer.MIN_UPDATE_INTERVAL: + self.observer.update(completed) + + """ def _status_update(self, callback, phase, completed, total, force=False): # always called with force=True at the beginning, so that can # be used to set up state. After that, it is called after @@ -101,26 +117,27 @@ class Dispatcher(object): if callback: callback(phase, completed, total) return - + if callback is None: return - + update_interval = 100 # XXX arbitrary if self._last_status_update < 0 or completed >= self._last_status_update + update_interval or completed < self._last_status_update: self._last_status_update = completed callback(phase, completed, total) - + """ + def _dispatch_jobs(self): # helper function to dispatch pending jobs when their # dependencies are met, and to manage self._running_jobs dispatched_jobs = [] finished_jobs = [] - + pending_jobs_nodeps = [(j[0], j[1]) for j in self._pending_jobs] - + for pending_job in self._pending_jobs: tileset, workitem, deps = pending_job - + # see if any of the deps are in _running_jobs or _pending_jobs for dep in deps: if (tileset, dep) in self._running_jobs or (tileset, dep) in pending_jobs_nodeps: @@ -131,33 +148,33 @@ class Dispatcher(object): finished_jobs += self.dispatch(tileset, workitem) self._running_jobs.append((tileset, workitem)) dispatched_jobs.append(pending_job) - + # make sure to at least get finished jobs, even if we don't # submit any new ones... if len(dispatched_jobs) == 0: finished_jobs += self.dispatch(None, None) - + # clean out the appropriate lists for job in finished_jobs: self._running_jobs.remove(job) for job in dispatched_jobs: self._pending_jobs.remove(job) - + return len(finished_jobs) - + def close(self): """Close the Dispatcher. This should be called when you are done with the dispatcher, to ensure that it cleans up any processes or connections it may still have around. """ pass - + def setup_tilesets(self, tilesetlist): """Called whenever a new list of tilesets are being used. This lets subclasses distribute the whole list at once, instead of for each work item.""" pass - + def dispatch(self, tileset, workitem): """Dispatch the given work item. The end result of this call should be running tileset.do_work(workitem) somewhere. This @@ -176,31 +193,31 @@ class MultiprocessingDispatcherManager(multiprocessing.managers.BaseManager): workers access to the current tileset list. """ def _get_job_queue(self): - return self.job_queue + return self.job_queue def _get_results_queue(self): - return self.result_queue + return self.result_queue def _get_signal_queue(self): - return self.signal_queue + return self.signal_queue def _get_tileset_data(self): - return self.tileset_data + return self.tileset_data def __init__(self, address=None, authkey=None): self.job_queue = multiprocessing.Queue() self.result_queue = multiprocessing.Queue() self.signal_queue = multiprocessing.Queue() - + self.tilesets = [] self.tileset_version = 0 self.tileset_data = [[], 0] - + self.register("get_job_queue", callable=self._get_job_queue) self.register("get_result_queue", callable=self._get_results_queue) self.register("get_signal_queue", callable=self._get_signal_queue) self.register("get_tileset_data", callable=self._get_tileset_data, proxytype=multiprocessing.managers.ListProxy) - + super(MultiprocessingDispatcherManager, self).__init__(address=address, authkey=authkey) - - @classmethod + + @classmethod def from_address(cls, address, authkey, serializer): "Required to be implemented to make multiprocessing happy" c = cls(address=address, authkey=authkey) @@ -218,7 +235,7 @@ class MultiprocessingDispatcherManager(multiprocessing.managers.BaseManager): data = self.get_tileset_data() data[0] = self.tilesets data[1] = self.tileset_version - + class MultiprocessingDispatcherProcess(multiprocessing.Process): """This class represents a single worker process. It is created @@ -236,13 +253,13 @@ class MultiprocessingDispatcherProcess(multiprocessing.Process): self.result_queue = manager.get_result_queue() self.signal_queue = manager.get_signal_queue() self.tileset_proxy = manager.get_tileset_data() - + def update_tilesets(self): """A convenience function to update our local tilesets to the current version in use by the MultiprocessingDispatcher. """ self.tilesets, self.tileset_version = self.tileset_proxy._getvalue() - + def run(self): """The main work loop. Jobs are pulled from the job queue and executed, then the result is pushed onto the result @@ -252,10 +269,10 @@ class MultiprocessingDispatcherProcess(multiprocessing.Process): """ # per-process job get() timeout timeout = 1.0 - + # update our tilesets self.update_tilesets() - + # register for all available signals def register_signal(name, sig): def handler(*args, **kwargs): @@ -263,7 +280,7 @@ class MultiprocessingDispatcherProcess(multiprocessing.Process): sig.set_interceptor(handler) for name, sig in Signal.signals.iteritems(): register_signal(name, sig) - + # notify that we're starting up self.result_queue.put(None, False) while True: @@ -272,15 +289,15 @@ class MultiprocessingDispatcherProcess(multiprocessing.Process): if job == None: # this is a end-of-jobs sentinel return - + # unpack job tv, ti, workitem = job - + if tv != self.tileset_version: # our tilesets changed! self.update_tilesets() assert tv == self.tileset_version - + # do job ret = self.tilesets[ti].do_work(workitem) result = (ti, workitem, ret,) @@ -292,18 +309,18 @@ class MultiprocessingDispatcher(Dispatcher): """A subclass of Dispatcher that spawns worker processes and distributes jobs to them to speed up processing. """ - def __init__(self, local_procs=-1, address=None, authkey=None): + def __init__(self, local_procs=-1, address=None, authkey=None, observer=None): """Creates the dispatcher. local_procs should be the number of worker processes to spawn. If it's omitted (or negative) the number of available CPUs is used instead. """ - super(MultiprocessingDispatcher, self).__init__() - + super(MultiprocessingDispatcher, self).__init__(observer=observer) + # automatic local_procs handling if local_procs < 0: local_procs = multiprocessing.cpu_count() self.local_procs = local_procs - + self.outstanding_jobs = 0 self.num_workers = 0 self.manager = MultiprocessingDispatcherManager(address=address, authkey=authkey) @@ -311,63 +328,63 @@ class MultiprocessingDispatcher(Dispatcher): self.job_queue = self.manager.get_job_queue() self.result_queue = self.manager.get_result_queue() self.signal_queue = self.manager.get_signal_queue() - + # create and fill the pool self.pool = [] for i in xrange(self.local_procs): proc = MultiprocessingDispatcherProcess(self.manager) proc.start() self.pool.append(proc) - + def close(self): # empty the queue self._handle_messages(timeout=0.0) while self.outstanding_jobs > 0: self._handle_messages() - + # send of the end-of-jobs sentinel for p in xrange(self.num_workers): self.job_queue.put(None, False) - + # TODO better way to be sure worker processes get the message time.sleep(1) - + # and close the manager self.manager.shutdown() self.manager = None self.pool = None - + def setup_tilesets(self, tilesets): self.manager.set_tilesets(tilesets) - + def dispatch(self, tileset, workitem): # handle the no-new-work case if tileset is None: return self._handle_messages() - + # create and submit the job tileset_index = self.manager.tilesets.index(tileset) self.job_queue.put((self.manager.tileset_version, tileset_index, workitem), False) self.outstanding_jobs += 1 - + # make sure the queue doesn't fill up too much finished_jobs = self._handle_messages(timeout=0.0) while self.outstanding_jobs > self.num_workers * 10: finished_jobs += self._handle_messages() return finished_jobs - + def _handle_messages(self, timeout=0.01): # work function: takes results out of the result queue and # keeps track of how many outstanding jobs remain finished_jobs = [] - + result_empty = False signal_empty = False while not (result_empty and signal_empty): - if not result_empty: + if not result_empty: try: result = self.result_queue.get(False) - + if result != None: # completed job ti, workitem, ret = result @@ -386,14 +403,14 @@ class MultiprocessingDispatcher(Dispatcher): name, args, kwargs = self.signal_queue.get(False) # timeout should only apply once timeout = 0.0 - + sig = Signal.signals[name] sig.emit_intercepted(*args, **kwargs) except Queue.Empty: signal_empty = True - + return finished_jobs - + @classmethod def start_manual_process(cls, address, authkey): """A convenience method to start up a manual process, possibly @@ -405,3 +422,91 @@ class MultiprocessingDispatcher(Dispatcher): m.connect() p = MultiprocessingDispatcherProcess(m) p.run() + +class Observer(object): + """ + """ + + MIN_UPDATE_INTERVAL = 100 + + def __init__(self, max_value): + self._current_value = None + self._max_value = max_value + self.start_time = None + self.end_time = None + + def start(self): + self.start_time = time.time() + self.update(0) + return self + + def is_started(self): + return self.start_time is not None + + def finish(self): + self.end_time = time.time() + + def is_finished(self): + return self.end_time is not None + + def is_running(self): + return self.is_started() and not self.is_finished() + + def add(self, amount): + self.update(self.get_current_value() + amount) + + def update(self, current_value): + """ + """ + self._current_value = current_value + + def get_percentage(self): + if self.get_max_value() is 0: + return 100.0 + else: + return self.get_current_value() * 100.0 / self.get_max_value() + + def get_current_value(self): + return self._current_value + + def get_max_value(self): + return self._max_value + +class LoggingObserver(Observer): + """ + """ + def update(self, current_value): + super(LoggingObserver, self).update(current_value) + if self.get_max_value() is None: + logging.info("Rendered %d tiles.", self.get_current_value()) + else: + logging.info("Rendered %d of %d. %d%% complete", + self.get_current_value(), self.get_max_value(), + self.get_percentage()) + +default_widgets = ['Rendering: ', progressbar.FractionWidget(), ' (', + progressbar.Percentage(), ') ', progressbar.Bar(left='[', right=']'), + ' ', progressbar.ETA()] +class ProgressBarObserver(progressbar.ProgressBar): + """ + """ + + MIN_UPDATE_INTERVAL = 100 + + def __init__(self, max_value, widgets=default_widgets, term_width=None, + fd=sys.stderr): + super(ProgressBarObserver, self).__init__(maxval=max_value, + widgets=widgets, term_width=term_width, fd=fd) + + def is_started(self): + return self.start_time is not None + + def finish(self): + self._end_time = time.time() + super(ProgressBarObserver, self).finish() + + def get_current_value(self): + return self.currval + + def get_max_value(self): + return self.maxval diff --git a/overviewer_core/tileset.py b/overviewer_core/tileset.py index 8f78c97..a22358e 100644 --- a/overviewer_core/tileset.py +++ b/overviewer_core/tileset.py @@ -99,10 +99,10 @@ Bounds = namedtuple("Bounds", ("mincol", "maxcol", "minrow", "maxrow")) # 0 # Only render tiles that have chunks with a greater mtime than the last # render timestamp, and their ancestors. -# +# # In other words, only renders parts of the map that have changed since # last render, nothing more, nothing less. -# +# # This is the fastest option, but will not detect tiles that have e.g. # been deleted from the directory tree, or pick up where a partial # interrupted render left off. @@ -110,10 +110,10 @@ Bounds = namedtuple("Bounds", ("mincol", "maxcol", "minrow", "maxrow")) # 1 # For render-tiles, render all whose chunks have an mtime greater than # the mtime of the tile on disk, and their composite-tile ancestors. -# +# # Also check all other composite-tiles and render any that have children # with more rencent mtimes than itself. -# +# # This is slower due to stat calls to determine tile mtimes, but safe if # the last render was interrupted. @@ -179,7 +179,7 @@ class TileSet(object): outputdir is the absolute path to the tile output directory where the tiles are saved. It is created if it doesn't exist - + Current valid options for the options dictionary are shown below. All the options must be specified unless they are not relevant. If the given options do not conform to the specifications, behavior is @@ -199,10 +199,10 @@ class TileSet(object): 0 Only render tiles that have chunks with a greater mtime than the last render timestamp, and their ancestors. - + In other words, only renders parts of the map that have changed since last render, nothing more, nothing less. - + This is the fastest option, but will not detect tiles that have e.g. been deleted from the directory tree, or pick up where a partial interrupted render left off. @@ -211,13 +211,13 @@ class TileSet(object): "check-tiles" mode. For render-tiles, render all whose chunks have an mtime greater than the mtime of the tile on disk, and their upper-tile ancestors. - + Also check all other upper-tiles and render any that have children with more rencent mtimes than itself. Also remove tiles and directory trees that do exist but shouldn't. - + This is slower due to stat calls to determine tile mtimes, but safe if the last render was interrupted. @@ -246,7 +246,7 @@ class TileSet(object): rendermode Perhaps the most important/relevant option: a string indicating the render mode to render. This rendermode must have already been - registered with the C extension module. + registered with the C extension module. rerenderprob A floating point number between 0 and 1 indicating the probability @@ -388,7 +388,7 @@ class TileSet(object): """ return 1 - + def get_phase_length(self, phase): """Returns the number of work items in a given phase, or None if there is no good estimate. @@ -396,7 +396,8 @@ class TileSet(object): # Yeah functional programming! return { 0: lambda: self.dirtytree.count_all(), - 1: lambda: None, + #there is no good way to guess this so just give total count + 1: lambda: (4**(self.treedepth+1)-1)/3, 2: lambda: self.dirtytree.count_all(), }[self.options['renderchecks']]() @@ -513,7 +514,7 @@ class TileSet(object): path = self.options.get('name'), base = '', bgcolor = bgcolorformat(self.options.get('bgcolor')), - world = self.options.get('worldname_orig') + + world = self.options.get('worldname_orig') + (" - " + self.options.get('dimension') if self.options.get('dimension') != 'default' else ''), last_rendertime = self.max_chunk_mtime, imgextension = self.imgextension, @@ -585,7 +586,7 @@ class TileSet(object): curdepth = self.config['zoomLevels'] except KeyError: return - + if curdepth == 1: # Skip a depth 1 tree. A depth 1 tree pretty much can't happen, so # when we detect this it usually means the tree is actually empty @@ -725,7 +726,7 @@ class TileSet(object): if chunkmtime > max_chunk_mtime: max_chunk_mtime = chunkmtime - + # Convert to diagonal coordinates chunkcol, chunkrow = convert_coords(chunkx, chunkz) @@ -773,7 +774,7 @@ class TileSet(object): dirty.add(tile.path) t = int(time.time()-stime) - logging.debug("Finished chunk scan for %s. %s chunks scanned in %s second%s", + logging.debug("Finished chunk scan for %s. %s chunks scanned in %s second%s", self.options['name'], chunkcount, t, "s" if t != 1 else "") @@ -843,10 +844,10 @@ class TileSet(object): # Create the actual image now img = Image.new("RGBA", (384, 384), self.options['bgcolor']) - + # we'll use paste (NOT alpha_over) for quadtree generation because # this is just straight image stitching, not alpha blending - + for path in quadPath_filtered: try: quad = Image.open(path[1]).resize((192,192), Image.ANTIALIAS) @@ -865,7 +866,7 @@ class TileSet(object): img.save(tmppath, "jpeg", quality=self.options['imgquality'], subsampling=0) else: # png img.save(tmppath, "png") - + if self.options['optimizeimg']: optimize_image(tmppath, imgformat, self.options['optimizeimg']) @@ -953,7 +954,7 @@ class TileSet(object): ## Which chunk this is: #draw.text((96,48), "C: %s,%s" % (chunkx, chunkz), fill='red') #draw.text((96,96), "c,r: %s,%s" % (col, row), fill='red') - + # Save them with FileReplacer(imgpath) as tmppath: if self.imgextension == 'jpg': @@ -1006,7 +1007,7 @@ class TileSet(object): if e.errno != errno.ENOENT: raise tile_mtime = 0 - + max_chunk_mtime = max(c[5] for c in get_chunks_by_tile(tileobj, self.regionset)) if tile_mtime > 120 + max_chunk_mtime: @@ -1032,7 +1033,7 @@ class TileSet(object): # This doesn't need rendering. Return mtime to parent in case # its mtime is less, indicating the parent DOES need a render yield path, max_chunk_mtime, False - + else: # A composite-tile. render_me = False @@ -1125,7 +1126,7 @@ def convert_coords(chunkx, chunkz): """Takes a coordinate (chunkx, chunkz) where chunkx and chunkz are in the chunk coordinate system, and figures out the row and column in the image each one should be. Returns (col, row).""" - + # columns are determined by the sum of the chunk coords, rows are the # difference # change this function, and you MUST change unconvert_coords @@ -1133,7 +1134,7 @@ def convert_coords(chunkx, chunkz): def unconvert_coords(col, row): """Undoes what convert_coords does. Returns (chunkx, chunkz).""" - + # col + row = chunkz + chunkz => (col + row)/2 = chunkz # col - row = chunkx + chunkx => (col - row)/2 = chunkx return ((col - row) / 2, (col + row) / 2) @@ -1147,7 +1148,7 @@ def unconvert_coords(col, row): def get_tiles_by_chunk(chunkcol, chunkrow): """For the given chunk, returns an iterator over Render Tiles that this chunk touches. Iterates over (tilecol, tilerow) - + """ # find tile coordinates. Remember tiles are identified by the # address of the chunk in their upper left corner. @@ -1178,7 +1179,7 @@ def get_chunks_by_tile(tile, regionset): This function is expected to return the chunk sections in the correct order for rendering, i.e. back to front. - + Returns an iterator over chunks tuples where each item is (col, row, chunkx, chunky, chunkz, mtime) """ @@ -1229,7 +1230,7 @@ class RendertileSet(object): """This object holds a set of render-tiles using a quadtree data structure. It is typically used to hold tiles that need rendering. This implementation collapses subtrees that are completely in or out of the set to save memory. - + Each instance of this class is a node in the tree, and therefore each instance is the root of a subtree. @@ -1242,7 +1243,7 @@ class RendertileSet(object): level; level 1 nodes keep track of leaf image state. Level 2 nodes keep track of level 1 state, and so fourth. - + """ __slots__ = ("depth", "children") def __init__(self, depth): @@ -1314,10 +1315,10 @@ class RendertileSet(object): def add(self, path): """Marks the requested leaf node as in this set - + Path is an iterable of integers representing the path to the leaf node that is to be added to the set - + """ path = list(path) assert len(path) == self.depth @@ -1582,7 +1583,7 @@ class RenderTile(object): @classmethod def compute_path(cls, col, row, depth): - """Constructor that takes a col,row of a tile and computes the path. + """Constructor that takes a col,row of a tile and computes the path. """ assert col % 2 == 0