0

some minor changes to allow multinode (!!!) dispatching

tracker: #564
This commit is contained in:
Aaron Griffith
2011-12-20 06:38:37 -05:00
parent 192591c2b0
commit 77ae3e2cb6

View File

@@ -83,25 +83,24 @@ class Dispatcher(object):
""" """
pass pass
class MultiprocessingDispatcherManager(multiprocessing.managers.SyncManager): class MultiprocessingDispatcherManager(multiprocessing.managers.BaseManager):
"""This multiprocessing manager is responsible for giving worker """This multiprocessing manager is responsible for giving worker
processes access to the communication Queues, and also gives processes access to the communication Queues, and also gives
workers access to the current tileset list. workers access to the current tileset list.
""" """
def __init__(self): def __init__(self, address=None, authkey=None):
self.job_queue = multiprocessing.Queue() self.job_queue = multiprocessing.Queue()
self.result_queue = multiprocessing.Queue() self.result_queue = multiprocessing.Queue()
self.register("get_job_queue", callable=lambda: self.job_queue)
self.register("get_result_queue", callable=lambda: self.result_queue)
# SyncManager must be initialized to create the list below
super(MultiprocessingDispatcherManager, self).__init__()
self.start()
self.tilesets = [] self.tilesets = []
self.tileset_version = 0 self.tileset_version = 0
self.tileset_data = self.list([[], 0]) self.tileset_data = [[], 0]
self.register("get_job_queue", callable=lambda: self.job_queue)
self.register("get_result_queue", callable=lambda: self.result_queue)
self.register("get_tileset_data", callable=lambda: self.tileset_data, proxytype=multiprocessing.managers.ListProxy)
super(MultiprocessingDispatcherManager, self).__init__(address=address, authkey=authkey)
def set_tilesets(self, tilesets): def set_tilesets(self, tilesets):
"""This is used in MultiprocessingDispatcher.setup_tilesets to """This is used in MultiprocessingDispatcher.setup_tilesets to
@@ -112,14 +111,15 @@ class MultiprocessingDispatcherManager(multiprocessing.managers.SyncManager):
""" """
self.tilesets = tilesets self.tilesets = tilesets
self.tileset_version += 1 self.tileset_version += 1
self.tileset_data[0] = self.tilesets data = self.get_tileset_data()
self.tileset_data[1] = self.tileset_version data[0] = self.tilesets
data[1] = self.tileset_version
def get_tilesets(self): def get_tilesets(self):
"""This returns a (tilesetlist, tileset_version) tuple when """This returns a (tilesetlist, tileset_version) tuple when
called from a worker process. called from a worker process.
""" """
return self.tileset_data._getvalue() return self.get_tileset_data()._getvalue()
class MultiprocessingDispatcherProcess(multiprocessing.Process): class MultiprocessingDispatcherProcess(multiprocessing.Process):
"""This class represents a single worker process. It is created """This class represents a single worker process. It is created
@@ -152,6 +152,9 @@ class MultiprocessingDispatcherProcess(multiprocessing.Process):
""" """
timeout = 1.0 timeout = 1.0
self.update_tilesets() self.update_tilesets()
# signal that we're starting up
self.result_queue.put(None, False)
while True: while True:
try: try:
job = self.job_queue.get(True, timeout) job = self.job_queue.get(True, timeout)
@@ -168,7 +171,8 @@ class MultiprocessingDispatcherProcess(multiprocessing.Process):
assert tv == self.tileset_version assert tv == self.tileset_version
# do job # do job
result = self.tilesets[ti].do_work(workitem) ret = self.tilesets[ti].do_work(workitem)
result = (ret,)
self.result_queue.put(result, False) self.result_queue.put(result, False)
except Queue.Empty: except Queue.Empty:
pass pass
@@ -177,22 +181,25 @@ class MultiprocessingDispatcher(Dispatcher):
"""A subclass of Dispatcher that spawns worker processes and """A subclass of Dispatcher that spawns worker processes and
distributes jobs to them to speed up processing. distributes jobs to them to speed up processing.
""" """
def __init__(self, local_procs=0): def __init__(self, local_procs=-1, address=None, authkey=None):
"""Creates the dispatcher. local_procs should be the number of """Creates the dispatcher. local_procs should be the number of
worker processes to spawn. If it's omitted (or non-positive) worker processes to spawn. If it's omitted (or negative)
the number of available CPUs is used instead. the number of available CPUs is used instead.
""" """
# automatic local_procs handling # automatic local_procs handling
if local_procs <= 0: if local_procs < 0:
local_procs = multiprocessing.cpu_count() local_procs = multiprocessing.cpu_count()
self.local_procs = local_procs self.local_procs = local_procs
self.outstanding_jobs = 0 self.outstanding_jobs = 0
self.manager = MultiprocessingDispatcherManager() self.num_workers = 0
self.manager = MultiprocessingDispatcherManager(address=address, authkey=authkey)
self.job_queue = self.manager.job_queue self.job_queue = self.manager.job_queue
self.result_queue = self.manager.result_queue self.result_queue = self.manager.result_queue
self.manager.start()
# create and fill the pool # create and fill the pool
self.pool = [] self.pool = []
for i in xrange(self.local_procs): for i in xrange(self.local_procs):
@@ -201,9 +208,11 @@ class MultiprocessingDispatcher(Dispatcher):
self.pool.append(proc) self.pool.append(proc)
def close(self): def close(self):
self.finish_work()
# send of the end-of-jobs sentinel # send of the end-of-jobs sentinel
for p in self.pool: for p in xrange(self.num_workers):
self.job_queue.put(None) self.job_queue.put(None, False)
# and close the manager # and close the manager
self.manager.shutdown() self.manager.shutdown()
@@ -216,15 +225,16 @@ class MultiprocessingDispatcher(Dispatcher):
def dispatch(self, tileset, workitem): def dispatch(self, tileset, workitem):
# create and submit the job # create and submit the job
tileset_index = self.manager.tilesets.index(tileset) tileset_index = self.manager.tilesets.index(tileset)
self.job_queue.put((self.manager.tileset_version, tileset_index, workitem)) self.job_queue.put((self.manager.tileset_version, tileset_index, workitem), False)
self.outstanding_jobs += 1 self.outstanding_jobs += 1
# make sure the queue doesn't fill up too much # make sure the queue doesn't fill up too much
while self.outstanding_jobs > self.local_procs * 10: while self.outstanding_jobs > self.num_workers * 10:
self._handle_messages() self._handle_messages()
def finish_work(self): def finish_work(self):
# empty the queue # empty the queue
self._handle_messages()
while self.outstanding_jobs > 0: while self.outstanding_jobs > 0:
self._handle_messages() self._handle_messages()
@@ -238,6 +248,23 @@ class MultiprocessingDispatcher(Dispatcher):
# timeout should only apply once # timeout should only apply once
timeout = 0.0 timeout = 0.0
if result != None:
# completed job
self.outstanding_jobs -= 1 self.outstanding_jobs -= 1
else:
# new worker
self.num_workers += 1
except Queue.Empty: except Queue.Empty:
pass pass
@classmethod
def start_manual_process(cls, address, authkey):
"""A convenience method to start up a manual process, possibly
on another machine. Address is a (hostname, port) tuple, and
authkey must be the same as that provided to the
MultiprocessingDispatcher constructor.
"""
m = MultiprocessingDispatcherManager(address=address, authkey=authkey)
m.connect()
p = MultiprocessingDispatcherProcess(m)
p.run()