diff --git a/overviewer_core/dispatcher.py b/overviewer_core/dispatcher.py index 921b519..8228e31 100644 --- a/overviewer_core/dispatcher.py +++ b/overviewer_core/dispatcher.py @@ -83,25 +83,24 @@ class Dispatcher(object): """ pass -class MultiprocessingDispatcherManager(multiprocessing.managers.SyncManager): +class MultiprocessingDispatcherManager(multiprocessing.managers.BaseManager): """This multiprocessing manager is responsible for giving worker processes access to the communication Queues, and also gives workers access to the current tileset list. """ - def __init__(self): + def __init__(self, address=None, authkey=None): self.job_queue = multiprocessing.Queue() self.result_queue = multiprocessing.Queue() - self.register("get_job_queue", callable=lambda: self.job_queue) - self.register("get_result_queue", callable=lambda: self.result_queue) - - # SyncManager must be initialized to create the list below - super(MultiprocessingDispatcherManager, self).__init__() - self.start() - self.tilesets = [] self.tileset_version = 0 - self.tileset_data = self.list([[], 0]) + self.tileset_data = [[], 0] + + self.register("get_job_queue", callable=lambda: self.job_queue) + self.register("get_result_queue", callable=lambda: self.result_queue) + self.register("get_tileset_data", callable=lambda: self.tileset_data, proxytype=multiprocessing.managers.ListProxy) + + super(MultiprocessingDispatcherManager, self).__init__(address=address, authkey=authkey) def set_tilesets(self, tilesets): """This is used in MultiprocessingDispatcher.setup_tilesets to @@ -112,14 +111,15 @@ class MultiprocessingDispatcherManager(multiprocessing.managers.SyncManager): """ self.tilesets = tilesets self.tileset_version += 1 - self.tileset_data[0] = self.tilesets - self.tileset_data[1] = self.tileset_version + data = self.get_tileset_data() + data[0] = self.tilesets + data[1] = self.tileset_version def get_tilesets(self): """This returns a (tilesetlist, tileset_version) tuple when called from a worker process. """ - return self.tileset_data._getvalue() + return self.get_tileset_data()._getvalue() class MultiprocessingDispatcherProcess(multiprocessing.Process): """This class represents a single worker process. It is created @@ -152,6 +152,9 @@ class MultiprocessingDispatcherProcess(multiprocessing.Process): """ timeout = 1.0 self.update_tilesets() + + # signal that we're starting up + self.result_queue.put(None, False) while True: try: job = self.job_queue.get(True, timeout) @@ -168,7 +171,8 @@ class MultiprocessingDispatcherProcess(multiprocessing.Process): assert tv == self.tileset_version # do job - result = self.tilesets[ti].do_work(workitem) + ret = self.tilesets[ti].do_work(workitem) + result = (ret,) self.result_queue.put(result, False) except Queue.Empty: pass @@ -177,22 +181,25 @@ class MultiprocessingDispatcher(Dispatcher): """A subclass of Dispatcher that spawns worker processes and distributes jobs to them to speed up processing. """ - def __init__(self, local_procs=0): + def __init__(self, local_procs=-1, address=None, authkey=None): """Creates the dispatcher. local_procs should be the number of - worker processes to spawn. If it's omitted (or non-positive) + worker processes to spawn. If it's omitted (or negative) the number of available CPUs is used instead. """ # automatic local_procs handling - if local_procs <= 0: + if local_procs < 0: local_procs = multiprocessing.cpu_count() self.local_procs = local_procs - self.outstanding_jobs = 0 - self.manager = MultiprocessingDispatcherManager() + self.outstanding_jobs = 0 + self.num_workers = 0 + self.manager = MultiprocessingDispatcherManager(address=address, authkey=authkey) self.job_queue = self.manager.job_queue self.result_queue = self.manager.result_queue + self.manager.start() + # create and fill the pool self.pool = [] for i in xrange(self.local_procs): @@ -201,9 +208,11 @@ class MultiprocessingDispatcher(Dispatcher): self.pool.append(proc) def close(self): + self.finish_work() + # send of the end-of-jobs sentinel - for p in self.pool: - self.job_queue.put(None) + for p in xrange(self.num_workers): + self.job_queue.put(None, False) # and close the manager self.manager.shutdown() @@ -216,15 +225,16 @@ class MultiprocessingDispatcher(Dispatcher): def dispatch(self, tileset, workitem): # create and submit the job tileset_index = self.manager.tilesets.index(tileset) - self.job_queue.put((self.manager.tileset_version, tileset_index, workitem)) + self.job_queue.put((self.manager.tileset_version, tileset_index, workitem), False) self.outstanding_jobs += 1 # make sure the queue doesn't fill up too much - while self.outstanding_jobs > self.local_procs * 10: + while self.outstanding_jobs > self.num_workers * 10: self._handle_messages() def finish_work(self): # empty the queue + self._handle_messages() while self.outstanding_jobs > 0: self._handle_messages() @@ -238,6 +248,23 @@ class MultiprocessingDispatcher(Dispatcher): # timeout should only apply once timeout = 0.0 - self.outstanding_jobs -= 1 + if result != None: + # completed job + self.outstanding_jobs -= 1 + else: + # new worker + self.num_workers += 1 except Queue.Empty: pass + + @classmethod + def start_manual_process(cls, address, authkey): + """A convenience method to start up a manual process, possibly + on another machine. Address is a (hostname, port) tuple, and + authkey must be the same as that provided to the + MultiprocessingDispatcher constructor. + """ + m = MultiprocessingDispatcherManager(address=address, authkey=authkey) + m.connect() + p = MultiprocessingDispatcherProcess(m) + p.run()