diff --git a/overviewer_core/dispatcher.py b/overviewer_core/dispatcher.py index 44680bb..3fd66e8 100644 --- a/overviewer_core/dispatcher.py +++ b/overviewer_core/dispatcher.py @@ -25,9 +25,9 @@ from signals import Signal class Dispatcher(object): """This class coordinates the work of all the TileSet objects among one worker process. By subclassing this class and - implementing setup_tilesets(), dispatch(), finish_work() and - close(), it is possible to create a Dispatcher that distributes - this work to many worker processes. + implementing setup_tilesets(), dispatch(), and close(), it is + possible to create a Dispatcher that distributes this work to many + worker processes. """ def __init__(self): super(Dispatcher, self).__init__() @@ -71,7 +71,6 @@ class Dispatcher(object): # after each phase, wait for the work to finish while len(self._pending_jobs) > 0 or len(self._running_jobs) > 0: self._dispatch_jobs() - self.finish_work() def _dispatch_jobs(self): # helper function to dispatch pending jobs when their @@ -130,13 +129,6 @@ class Dispatcher(object): tileset.do_work(workitem) return [(tileset, workitem),] return [] - - def finish_work(self): - """This call should block until all dispatched jobs have - completed. It's used at the end of each phase to ensure that - phases are always run in serial. - """ - pass class MultiprocessingDispatcherManager(multiprocessing.managers.BaseManager): """This multiprocessing manager is responsible for giving worker @@ -279,7 +271,10 @@ class MultiprocessingDispatcher(Dispatcher): self.pool.append(proc) def close(self): - self.finish_work() + # empty the queue + self._handle_messages(timeout=0.0) + while self.outstanding_jobs > 0: + self._handle_messages() # send of the end-of-jobs sentinel for p in xrange(self.num_workers): @@ -312,12 +307,6 @@ class MultiprocessingDispatcher(Dispatcher): finished_jobs += self._handle_messages() return finished_jobs - def finish_work(self): - # empty the queue - self._handle_messages(timeout=0.0) - while self.outstanding_jobs > 0: - self._handle_messages() - def _handle_messages(self, timeout=1.0): # work function: takes results out of the result queue and # keeps track of how many outstanding jobs remain