0

Made multiprocessing work on windows \o/

This commit is contained in:
Andrew Chin
2012-02-18 01:19:45 -05:00
parent ff6425d358
commit 2621997ecc
2 changed files with 27 additions and 18 deletions

View File

@@ -121,7 +121,7 @@ class Dispatcher(object):
dispatched_jobs = [] dispatched_jobs = []
finished_jobs = [] finished_jobs = []
pending_jobs_nodeps = map(lambda j: (j[0], j[1]), self._pending_jobs) pending_jobs_nodeps = [(j[0], j[1]) for j in self._pending_jobs]
for pending_job in self._pending_jobs: for pending_job in self._pending_jobs:
tileset, workitem, deps = pending_job tileset, workitem, deps = pending_job
@@ -180,6 +180,15 @@ class MultiprocessingDispatcherManager(multiprocessing.managers.BaseManager):
processes access to the communication Queues, and also gives processes access to the communication Queues, and also gives
workers access to the current tileset list. workers access to the current tileset list.
""" """
def _get_job_queue(self):
return self.job_queue
def _get_results_queue(self):
return self.result_queue
def _get_signal_queue(self):
return self.signal_queue
def _get_tileset_data(self):
return self.tileset_data
def __init__(self, address=None, authkey=None): def __init__(self, address=None, authkey=None):
self.job_queue = multiprocessing.Queue() self.job_queue = multiprocessing.Queue()
self.result_queue = multiprocessing.Queue() self.result_queue = multiprocessing.Queue()
@@ -189,13 +198,19 @@ class MultiprocessingDispatcherManager(multiprocessing.managers.BaseManager):
self.tileset_version = 0 self.tileset_version = 0
self.tileset_data = [[], 0] self.tileset_data = [[], 0]
self.register("get_job_queue", callable=lambda: self.job_queue) self.register("get_job_queue", callable=self._get_job_queue)
self.register("get_result_queue", callable=lambda: self.result_queue) self.register("get_result_queue", callable=self._get_results_queue)
self.register("get_signal_queue", callable=lambda: self.signal_queue) self.register("get_signal_queue", callable=self._get_signal_queue)
self.register("get_tileset_data", callable=lambda: self.tileset_data, proxytype=multiprocessing.managers.ListProxy) self.register("get_tileset_data", callable=self._get_tileset_data, proxytype=multiprocessing.managers.ListProxy)
super(MultiprocessingDispatcherManager, self).__init__(address=address, authkey=authkey) super(MultiprocessingDispatcherManager, self).__init__(address=address, authkey=authkey)
@classmethod
def from_address(cls, address, authkey, serializer):
"Required to be implemented to make multiprocessing happy"
c = cls(address=address, authkey=authkey)
return c
def set_tilesets(self, tilesets): def set_tilesets(self, tilesets):
"""This is used in MultiprocessingDispatcher.setup_tilesets to """This is used in MultiprocessingDispatcher.setup_tilesets to
update the tilesets each worker has access to. It also update the tilesets each worker has access to. It also
@@ -209,11 +224,6 @@ class MultiprocessingDispatcherManager(multiprocessing.managers.BaseManager):
data[0] = self.tilesets data[0] = self.tilesets
data[1] = self.tileset_version data[1] = self.tileset_version
def get_tilesets(self):
"""This returns a (tilesetlist, tileset_version) tuple when
called from a worker process.
"""
return self.get_tileset_data()._getvalue()
class MultiprocessingDispatcherProcess(multiprocessing.Process): class MultiprocessingDispatcherProcess(multiprocessing.Process):
"""This class represents a single worker process. It is created """This class represents a single worker process. It is created
@@ -227,16 +237,16 @@ class MultiprocessingDispatcherProcess(multiprocessing.Process):
created in MultiprocessingDispatcher. created in MultiprocessingDispatcher.
""" """
super(MultiprocessingDispatcherProcess, self).__init__() super(MultiprocessingDispatcherProcess, self).__init__()
self.manager = manager
self.job_queue = manager.get_job_queue() self.job_queue = manager.get_job_queue()
self.result_queue = manager.get_result_queue() self.result_queue = manager.get_result_queue()
self.signal_queue = manager.get_signal_queue() self.signal_queue = manager.get_signal_queue()
self.tileset_proxy = manager.get_tileset_data()
def update_tilesets(self): def update_tilesets(self):
"""A convenience function to update our local tilesets to the """A convenience function to update our local tilesets to the
current version in use by the MultiprocessingDispatcher. current version in use by the MultiprocessingDispatcher.
""" """
self.tilesets, self.tileset_version = self.manager.get_tilesets() self.tilesets, self.tileset_version = self.tileset_proxy._getvalue()
def run(self): def run(self):
"""The main work loop. Jobs are pulled from the job queue and """The main work loop. Jobs are pulled from the job queue and
@@ -302,11 +312,10 @@ class MultiprocessingDispatcher(Dispatcher):
self.outstanding_jobs = 0 self.outstanding_jobs = 0
self.num_workers = 0 self.num_workers = 0
self.manager = MultiprocessingDispatcherManager(address=address, authkey=authkey) self.manager = MultiprocessingDispatcherManager(address=address, authkey=authkey)
self.job_queue = self.manager.job_queue
self.result_queue = self.manager.result_queue
self.signal_queue = self.manager.signal_queue
self.manager.start() self.manager.start()
self.job_queue = self.manager.get_job_queue()
self.result_queue = self.manager.get_result_queue()
self.signal_queue = self.manager.get_signal_queue()
# create and fill the pool # create and fill the pool
self.pool = [] self.pool = []

View File

@@ -112,7 +112,7 @@ class World(object):
# files. # files.
for root, dirs, files in os.walk(self.worlddir): for root, dirs, files in os.walk(self.worlddir):
# any .mcr files in this directory? # any .mcr files in this directory?
mcrs = filter(lambda x: x.endswith(".mcr"), files) mcrs = [x for x in files if x.endswith(".mcr")]
if mcrs: if mcrs:
# construct a regionset object for this # construct a regionset object for this
rset = RegionSet(root) rset = RegionSet(root)