0

dispatcher class now supports work-item dependencies

Thanks to ion in #overviewer for a particularly slick way to implement
this without using too much memory.

tracker: Issue #564
This commit is contained in:
Aaron Griffith
2011-12-21 04:43:17 -05:00
parent 1933df32d6
commit 2717485031

View File

@@ -26,6 +26,16 @@ class Dispatcher(object):
close(), it is possible to create a Dispatcher that distributes
this work to many worker processes.
"""
def __init__(self):
super(Dispatcher, self).__init__()
# list of (tileset, workitem) tuples
# keeps track of dispatched but unfinished jobs
self._running_jobs = []
# list of (tileset, workitem, dependencies) tuples
# keeps track of jobs waiting to run after dependencies finish
self._pending_jobs = []
def render_all(self, tilesetlist, status_callback):
"""Render all of the tilesets in the given
tilesetlist. status_callback is called periodically to update
@@ -51,12 +61,48 @@ class Dispatcher(object):
work_iterators.append(make_work_iterator(tileset, phase))
# go through these iterators round-robin style
for tileset, workitem in util.roundrobin(work_iterators):
self.dispatch(tileset, workitem)
for tileset, (workitem, deps) in util.roundrobin(work_iterators):
self._pending_jobs.append((tileset, workitem, deps))
self._dispatch_jobs()
# after each phase, wait for the work to finish
while len(self._pending_jobs) > 0 or len(self._running_jobs) > 0:
self._dispatch_jobs()
self.finish_work()
def _dispatch_jobs(self):
# helper function to dispatch pending jobs when their
# dependencies are met, and to manage self._running_jobs
dispatched_jobs = []
finished_jobs = []
pending_jobs_nodeps = map(lambda j: (j[0], j[1]), self._pending_jobs)
for pending_job in self._pending_jobs:
tileset, workitem, deps = pending_job
# see if any of the deps are in _running_jobs or _pending_jobs
for dep in deps:
if (tileset, dep) in self._running_jobs or (tileset, dep) in pending_jobs_nodeps:
# it is! don't dispatch this item yet
break
else:
# it isn't! all dependencies are finished
finished_jobs += self.dispatch(tileset, workitem)
self._running_jobs.append((tileset, workitem))
dispatched_jobs.append(pending_job)
# make sure to at least get finished jobs, even if we don't
# submit any new ones...
if len(dispatched_jobs) == 0:
finished_jobs += self.dispatch(None, None)
# clean out the appropriate lists
for job in finished_jobs:
self._running_jobs.remove(job)
for job in dispatched_jobs:
self._pending_jobs.remove(job)
def close(self):
"""Close the Dispatcher. This should be called when you are
done with the dispatcher, to ensure that it cleans up any
@@ -72,9 +118,15 @@ class Dispatcher(object):
def dispatch(self, tileset, workitem):
    """Dispatch the given work item. The end result of this call
    should be running tileset.do_work(workitem) somewhere. This
    function should return a list of (tileset, workitem) tuples
    that have completed since the last call. If tileset is None,
    then returning completed jobs is all this function should do.
    """
    # base implementation is synchronous: the work is done (and
    # therefore finished) by the time we return
    if tileset is not None:
        tileset.do_work(workitem)
        return [(tileset, workitem)]
    return []
def finish_work(self):
"""This call should block until all dispatched jobs have
@@ -172,7 +224,7 @@ class MultiprocessingDispatcherProcess(multiprocessing.Process):
# do job
ret = self.tilesets[ti].do_work(workitem)
result = (ret,)
result = (ti, workitem, ret,)
self.result_queue.put(result, False)
except Queue.Empty:
pass
@@ -186,6 +238,7 @@ class MultiprocessingDispatcher(Dispatcher):
worker processes to spawn. If it's omitted (or negative)
the number of available CPUs is used instead.
"""
super(MultiprocessingDispatcher, self).__init__()
# automatic local_procs handling
if local_procs < 0:
@@ -223,14 +276,20 @@ class MultiprocessingDispatcher(Dispatcher):
self.manager.set_tilesets(tilesets)
def dispatch(self, tileset, workitem):
    """Submit workitem for tileset to the worker job queue.

    Returns a list of (tileset, workitem) tuples that have finished
    since the last call. If tileset is None, no new work is submitted
    and only finished jobs are collected.
    """
    # handle the no-new-work case
    if tileset is None:
        return self._handle_messages()

    # create and submit the job; workers look the tileset up by index
    tileset_index = self.manager.tilesets.index(tileset)
    self.job_queue.put((self.manager.tileset_version, tileset_index, workitem), False)
    self.outstanding_jobs += 1

    # make sure the queue doesn't fill up too much: keep draining
    # results while too many jobs are outstanding.  Every call's
    # result must be accumulated — discarding any _handle_messages()
    # return value would lose completion notifications and leave
    # stale entries in the caller's _running_jobs list.
    finished_jobs = self._handle_messages()
    while self.outstanding_jobs > self.num_workers * 10:
        finished_jobs += self._handle_messages()
    return finished_jobs
def finish_work(self):
# empty the queue
@@ -241,6 +300,7 @@ class MultiprocessingDispatcher(Dispatcher):
def _handle_messages(self):
# work function: takes results out of the result queue and
# keeps track of how many outstanding jobs remain
finished_jobs = []
timeout = 1.0
try:
while True: # exits in except block
@@ -250,6 +310,8 @@ class MultiprocessingDispatcher(Dispatcher):
if result != None:
# completed job
ti, workitem, ret = result
finished_jobs.append((self.manager.tilesets[ti], workitem))
self.outstanding_jobs -= 1
else:
# new worker
@@ -257,6 +319,8 @@ class MultiprocessingDispatcher(Dispatcher):
except Queue.Empty:
pass
return finished_jobs
@classmethod
def start_manual_process(cls, address, authkey):
"""A convenience method to start up a manual process, possibly