diff --git a/overviewer_core/dispatcher.py b/overviewer_core/dispatcher.py index 500f0dc..921b519 100644 --- a/overviewer_core/dispatcher.py +++ b/overviewer_core/dispatcher.py @@ -84,6 +84,10 @@ class Dispatcher(object): pass class MultiprocessingDispatcherManager(multiprocessing.managers.SyncManager): + """This multiprocessing manager is responsible for giving worker + processes access to the communication Queues, and also gives + workers access to the current tileset list. + """ def __init__(self): self.job_queue = multiprocessing.Queue() self.result_queue = multiprocessing.Queue() @@ -91,6 +95,7 @@ class MultiprocessingDispatcherManager(multiprocessing.managers.SyncManager): self.register("get_job_queue", callable=lambda: self.job_queue) self.register("get_result_queue", callable=lambda: self.result_queue) + # SyncManager must be initialized to create the list below super(MultiprocessingDispatcherManager, self).__init__() self.start() @@ -99,25 +104,52 @@ class MultiprocessingDispatcherManager(multiprocessing.managers.SyncManager): self.tileset_data = self.list([[], 0]) def set_tilesets(self, tilesets): + """This is used in MultiprocessingDispatcher.setup_tilesets to + update the tilesets each worker has access to. It also + increments a `tileset_version` which is an easy way for + workers to see if their tileset list is out-of-date without + pickling and copying over the entire list. + """ self.tilesets = tilesets self.tileset_version += 1 self.tileset_data[0] = self.tilesets self.tileset_data[1] = self.tileset_version def get_tilesets(self): + """This returns a [tileset list, tileset_version] pair (a + two-element list) when called from a worker process. + """ return self.tileset_data._getvalue() class MultiprocessingDispatcherProcess(multiprocessing.Process): + """This class represents a single worker process. It is created + automatically by MultiprocessingDispatcher, but it can even be + used manually to spawn processes on different machines on the same + network. 
+ """ def __init__(self, manager): + """Creates the process object. manager should be an instance + of MultiprocessingDispatcherManager connected to the one + created in MultiprocessingDispatcher. + """ super(MultiprocessingDispatcherProcess, self).__init__() self.manager = manager self.job_queue = manager.get_job_queue() self.result_queue = manager.get_result_queue() def update_tilesets(self): + """A convenience function to update our local tilesets to the + current version in use by the MultiprocessingDispatcher. + """ self.tilesets, self.tileset_version = self.manager.get_tilesets() def run(self): + """The main work loop. Jobs are pulled from the job queue and + executed, then the result is pushed onto the result + queue. Updates to the tileset list are recognized and handled + automatically. This is the method that actually runs in the + new worker process. + """ timeout = 1.0 self.update_tilesets() while True: @@ -142,7 +174,16 @@ class MultiprocessingDispatcherProcess(multiprocessing.Process): pass class MultiprocessingDispatcher(Dispatcher): + """A subclass of Dispatcher that spawns worker processes and + distributes jobs to them to speed up processing. + """ def __init__(self, local_procs=0): + """Creates the dispatcher. local_procs should be the number of + worker processes to spawn. If it's omitted (or non-positive), + the number of available CPUs is used instead. 
+ """ + + # automatic local_procs handling if local_procs <= 0: local_procs = multiprocessing.cpu_count() self.local_procs = local_procs @@ -152,6 +193,7 @@ class MultiprocessingDispatcher(Dispatcher): self.job_queue = self.manager.job_queue self.result_queue = self.manager.result_queue + # create and fill the pool self.pool = [] for i in xrange(self.local_procs): proc = MultiprocessingDispatcherProcess(self.manager) @@ -172,17 +214,23 @@ class MultiprocessingDispatcher(Dispatcher): self.manager.set_tilesets(tilesets) def dispatch(self, tileset, workitem): + # create and submit the job tileset_index = self.manager.tilesets.index(tileset) self.job_queue.put((self.manager.tileset_version, tileset_index, workitem)) self.outstanding_jobs += 1 + + # make sure the queue doesn't fill up too much while self.outstanding_jobs > self.local_procs * 10: self._handle_messages() def finish_work(self): + # empty the queue while self.outstanding_jobs > 0: self._handle_messages() def _handle_messages(self): + # work function: takes results out of the result queue and + # keeps track of how many outstanding jobs remain timeout = 1.0 try: while True: # exits in except block