From 2621997eccc35c0790775a6d12432cfacaedc141 Mon Sep 17 00:00:00 2001 From: Andrew Chin Date: Sat, 18 Feb 2012 01:19:45 -0500 Subject: [PATCH] Made multiprocessing work on windows \o/ --- overviewer_core/dispatcher.py | 43 +++++++++++++++++++++-------------- overviewer_core/world.py | 2 +- 2 files changed, 27 insertions(+), 18 deletions(-) diff --git a/overviewer_core/dispatcher.py b/overviewer_core/dispatcher.py index 655fd04..2e8e353 100644 --- a/overviewer_core/dispatcher.py +++ b/overviewer_core/dispatcher.py @@ -121,7 +121,7 @@ class Dispatcher(object): dispatched_jobs = [] finished_jobs = [] - pending_jobs_nodeps = map(lambda j: (j[0], j[1]), self._pending_jobs) + pending_jobs_nodeps = [(j[0], j[1]) for j in self._pending_jobs] for pending_job in self._pending_jobs: tileset, workitem, deps = pending_job @@ -180,6 +180,15 @@ class MultiprocessingDispatcherManager(multiprocessing.managers.BaseManager): processes access to the communication Queues, and also gives workers access to the current tileset list. """ + def _get_job_queue(self): + return self.job_queue + def _get_results_queue(self): + return self.result_queue + def _get_signal_queue(self): + return self.signal_queue + def _get_tileset_data(self): + return self.tileset_data + def __init__(self, address=None, authkey=None): self.job_queue = multiprocessing.Queue() self.result_queue = multiprocessing.Queue() @@ -189,13 +198,19 @@ class MultiprocessingDispatcherManager(multiprocessing.managers.BaseManager): self.tileset_version = 0 self.tileset_data = [[], 0] - self.register("get_job_queue", callable=lambda: self.job_queue) - self.register("get_result_queue", callable=lambda: self.result_queue) - self.register("get_signal_queue", callable=lambda: self.signal_queue) - self.register("get_tileset_data", callable=lambda: self.tileset_data, proxytype=multiprocessing.managers.ListProxy) + self.register("get_job_queue", callable=self._get_job_queue) + self.register("get_result_queue", callable=self._get_results_queue) + self.register("get_signal_queue", callable=self._get_signal_queue) + self.register("get_tileset_data", callable=self._get_tileset_data, proxytype=multiprocessing.managers.ListProxy) super(MultiprocessingDispatcherManager, self).__init__(address=address, authkey=authkey) - + + @classmethod + def from_address(cls, address, authkey, serializer): + "Required to be implemented to make multiprocessing happy" + c = cls(address=address, authkey=authkey) + return c + def set_tilesets(self, tilesets): """This is used in MultiprocessingDispatcher.setup_tilesets to update the tilesets each worker has access to. It also @@ -209,11 +224,6 @@ class MultiprocessingDispatcherManager(multiprocessing.managers.BaseManager): data[0] = self.tilesets data[1] = self.tileset_version - def get_tilesets(self): - """This returns a (tilesetlist, tileset_version) tuple when - called from a worker process. - """ - return self.get_tileset_data()._getvalue() class MultiprocessingDispatcherProcess(multiprocessing.Process): """This class represents a single worker process. It is created @@ -227,16 +237,16 @@ class MultiprocessingDispatcherProcess(multiprocessing.Process): created in MultiprocessingDispatcher. """ super(MultiprocessingDispatcherProcess, self).__init__() - self.manager = manager self.job_queue = manager.get_job_queue() self.result_queue = manager.get_result_queue() self.signal_queue = manager.get_signal_queue() + self.tileset_proxy = manager.get_tileset_data() def update_tilesets(self): """A convenience function to update our local tilesets to the current version in use by the MultiprocessingDispatcher. """ - self.tilesets, self.tileset_version = self.manager.get_tilesets() + self.tilesets, self.tileset_version = self.tileset_proxy._getvalue() def run(self): """The main work loop. Jobs are pulled from the job queue and @@ -302,11 +312,10 @@ class MultiprocessingDispatcher(Dispatcher): self.outstanding_jobs = 0 self.num_workers = 0 self.manager = MultiprocessingDispatcherManager(address=address, authkey=authkey) - self.job_queue = self.manager.job_queue - self.result_queue = self.manager.result_queue - self.signal_queue = self.manager.signal_queue - self.manager.start() + self.job_queue = self.manager.get_job_queue() + self.result_queue = self.manager.get_result_queue() + self.signal_queue = self.manager.get_signal_queue() # create and fill the pool self.pool = [] diff --git a/overviewer_core/world.py b/overviewer_core/world.py index 07c7b90..5e6e0c5 100644 --- a/overviewer_core/world.py +++ b/overviewer_core/world.py @@ -112,7 +112,7 @@ class World(object): # files. for root, dirs, files in os.walk(self.worlddir): # any .mcr files in this directory? - mcrs = filter(lambda x: x.endswith(".mcr"), files) + mcrs = [x for x in files if x.endswith(".mcr")] if mcrs: # construct a regionset object for this rset = RegionSet(root)