0

dispatcher.finish_jobs() has been removed

the other dispatcher functions can provide the same functionality, so
removing finish_jobs() only makes the dispatcher interface slimmer.
This commit is contained in:
Aaron Griffith
2011-12-26 10:05:57 -05:00
parent c7ee75d68d
commit 740f5ee628

View File

@@ -25,9 +25,9 @@ from signals import Signal
class Dispatcher(object): class Dispatcher(object):
"""This class coordinates the work of all the TileSet objects """This class coordinates the work of all the TileSet objects
among one worker process. By subclassing this class and among one worker process. By subclassing this class and
implementing setup_tilesets(), dispatch(), finish_work() and implementing setup_tilesets(), dispatch(), and close(), it is
close(), it is possible to create a Dispatcher that distributes possible to create a Dispatcher that distributes this work to many
this work to many worker processes. worker processes.
""" """
def __init__(self): def __init__(self):
super(Dispatcher, self).__init__() super(Dispatcher, self).__init__()
@@ -71,7 +71,6 @@ class Dispatcher(object):
# after each phase, wait for the work to finish # after each phase, wait for the work to finish
while len(self._pending_jobs) > 0 or len(self._running_jobs) > 0: while len(self._pending_jobs) > 0 or len(self._running_jobs) > 0:
self._dispatch_jobs() self._dispatch_jobs()
self.finish_work()
def _dispatch_jobs(self): def _dispatch_jobs(self):
# helper function to dispatch pending jobs when their # helper function to dispatch pending jobs when their
@@ -131,13 +130,6 @@ class Dispatcher(object):
return [(tileset, workitem),] return [(tileset, workitem),]
return [] return []
def finish_work(self):
"""This call should block until all dispatched jobs have
completed. It's used at the end of each phase to ensure that
phases are always run in serial.
"""
pass
class MultiprocessingDispatcherManager(multiprocessing.managers.BaseManager): class MultiprocessingDispatcherManager(multiprocessing.managers.BaseManager):
"""This multiprocessing manager is responsible for giving worker """This multiprocessing manager is responsible for giving worker
processes access to the communication Queues, and also gives processes access to the communication Queues, and also gives
@@ -279,7 +271,10 @@ class MultiprocessingDispatcher(Dispatcher):
self.pool.append(proc) self.pool.append(proc)
def close(self): def close(self):
self.finish_work() # empty the queue
self._handle_messages(timeout=0.0)
while self.outstanding_jobs > 0:
self._handle_messages()
# send of the end-of-jobs sentinel # send of the end-of-jobs sentinel
for p in xrange(self.num_workers): for p in xrange(self.num_workers):
@@ -312,12 +307,6 @@ class MultiprocessingDispatcher(Dispatcher):
finished_jobs += self._handle_messages() finished_jobs += self._handle_messages()
return finished_jobs return finished_jobs
def finish_work(self):
# empty the queue
self._handle_messages(timeout=0.0)
while self.outstanding_jobs > 0:
self._handle_messages()
def _handle_messages(self, timeout=1.0): def _handle_messages(self, timeout=1.0):
# work function: takes results out of the result queue and # work function: takes results out of the result queue and
# keeps track of how many outstanding jobs remain # keeps track of how many outstanding jobs remain