diff --git a/overviewer_core/dispatcher.py b/overviewer_core/dispatcher.py
index 8228e31..14e43e1 100644
--- a/overviewer_core/dispatcher.py
+++ b/overviewer_core/dispatcher.py
@@ -26,6 +26,16 @@ class Dispatcher(object):
     close(), it is possible to create a Dispatcher that distributes
     this work to many worker processes.
     """
+    def __init__(self):
+        super(Dispatcher, self).__init__()
+
+        # list of (tileset, workitem) tuples
+        # keeps track of dispatched but unfinished jobs
+        self._running_jobs = []
+        # list of (tileset, workitem, dependencies) tuples
+        # keeps track of jobs waiting to run after dependencies finish
+        self._pending_jobs = []
+
     def render_all(self, tilesetlist, status_callback):
         """Render all of the tilesets in the given tilesetlist.
         status_callback is called periodically to update
@@ -51,12 +61,48 @@ class Dispatcher(object):
                     work_iterators.append(make_work_iterator(tileset, phase))
 
             # go through these iterators round-robin style
-            for tileset, workitem in util.roundrobin(work_iterators):
-                self.dispatch(tileset, workitem)
+            for tileset, (workitem, deps) in util.roundrobin(work_iterators):
+                self._pending_jobs.append((tileset, workitem, deps))
+                self._dispatch_jobs()
 
             # after each phase, wait for the work to finish
+            while len(self._pending_jobs) > 0 or len(self._running_jobs) > 0:
+                self._dispatch_jobs()
             self.finish_work()
 
+    def _dispatch_jobs(self):
+        # helper function to dispatch pending jobs when their
+        # dependencies are met, and to manage self._running_jobs
+        dispatched_jobs = []
+        finished_jobs = []
+
+        pending_jobs_nodeps = map(lambda j: (j[0], j[1]), self._pending_jobs)
+
+        for pending_job in self._pending_jobs:
+            tileset, workitem, deps = pending_job
+
+            # see if any of the deps are in _running_jobs or _pending_jobs
+            for dep in deps:
+                if (tileset, dep) in self._running_jobs or (tileset, dep) in pending_jobs_nodeps:
+                    # it is! don't dispatch this item yet
+                    break
+            else:
+                # it isn't! all dependencies are finished
+                finished_jobs += self.dispatch(tileset, workitem)
+                self._running_jobs.append((tileset, workitem))
+                dispatched_jobs.append(pending_job)
+
+        # make sure to at least get finished jobs, even if we don't
+        # submit any new ones...
+        if len(dispatched_jobs) == 0:
+            finished_jobs += self.dispatch(None, None)
+
+        # clean out the appropriate lists
+        for job in finished_jobs:
+            self._running_jobs.remove(job)
+        for job in dispatched_jobs:
+            self._pending_jobs.remove(job)
+
     def close(self):
         """Close the Dispatcher. This should be called when you are
         done with the dispatcher, to ensure that it cleans up any
@@ -72,9 +118,15 @@
 
     def dispatch(self, tileset, workitem):
         """Dispatch the given work item. The end result of this call
-        should be running tileset.do_work(workitem) somewhere.
+        should be running tileset.do_work(workitem) somewhere. This
+        function should return a list of (tileset, workitem) tuples
+        that have completed since the last call. If tileset is None,
+        then returning completed jobs is all this function should do.
         """
-        tileset.do_work(workitem)
+        if not tileset is None:
+            tileset.do_work(workitem)
+            return [(tileset, workitem),]
+        return []
 
     def finish_work(self):
         """This call should block until all dispatched jobs have
@@ -172,7 +224,7 @@
 
                 # do job
                 ret = self.tilesets[ti].do_work(workitem)
-                result = (ret,)
+                result = (ti, workitem, ret,)
                 self.result_queue.put(result, False)
             except Queue.Empty:
                 pass
@@ -186,6 +238,7 @@
         worker processes to spawn. If it's omitted (or negative)
         the number of available CPUs is used instead.
         """
+        super(MultiprocessingDispatcher, self).__init__()
 
         # automatic local_procs handling
         if local_procs < 0:
@@ -223,14 +276,20 @@
         self.manager.set_tilesets(tilesets)
 
     def dispatch(self, tileset, workitem):
+        # handle the no-new-work case
+        if tileset is None:
+            return self._handle_messages()
+
         # create and submit the job
         tileset_index = self.manager.tilesets.index(tileset)
         self.job_queue.put((self.manager.tileset_version, tileset_index, workitem), False)
         self.outstanding_jobs += 1
 
         # make sure the queue doesn't fill up too much
+        finished_jobs = self._handle_messages()
         while self.outstanding_jobs > self.num_workers * 10:
-            self._handle_messages()
+            finished_jobs += self._handle_messages()
+        return finished_jobs
 
     def finish_work(self):
         # empty the queue
@@ -241,6 +300,7 @@
     def _handle_messages(self):
         # work function: takes results out of the result queue and
         # keeps track of how many outstanding jobs remain
+        finished_jobs = []
        timeout = 1.0
         try:
             while True: # exits in except block
@@ -250,12 +310,16 @@
 
                 if result != None:
                     # completed job
+                    ti, workitem, ret = result
+                    finished_jobs.append((self.manager.tilesets[ti], workitem))
                     self.outstanding_jobs -= 1
                 else:
                     # new worker
                     self.num_workers += 1
         except Queue.Empty:
             pass
+
+        return finished_jobs
 
     @classmethod
     def start_manual_process(cls, address, authkey):