0

comments and docstrings for the multiprocessing dispatcher

This commit is contained in:
Aaron Griffith
2011-12-20 05:39:41 -05:00
parent 8db7d37164
commit 192591c2b0

View File

@@ -84,6 +84,10 @@ class Dispatcher(object):
pass
class MultiprocessingDispatcherManager(multiprocessing.managers.SyncManager):
"""This multiprocessing manager is responsible for giving worker
processes access to the communication Queues, and also gives
workers access to the current tileset list.
"""
def __init__(self):
self.job_queue = multiprocessing.Queue()
self.result_queue = multiprocessing.Queue()
@@ -91,6 +95,7 @@ class MultiprocessingDispatcherManager(multiprocessing.managers.SyncManager):
self.register("get_job_queue", callable=lambda: self.job_queue)
self.register("get_result_queue", callable=lambda: self.result_queue)
# SyncManager must be initialized to create the list below
super(MultiprocessingDispatcherManager, self).__init__()
self.start()
@@ -99,25 +104,52 @@ class MultiprocessingDispatcherManager(multiprocessing.managers.SyncManager):
self.tileset_data = self.list([[], 0])
def set_tilesets(self, tilesets):
"""This is used in MultiprocessingDispatcher.setup_tilesets to
update the tilesets each worker has access to. It also
increments a `tileset_version` which is an easy way for
workers to see if their tileset list is out-of-date without
pickling and copying over the entire list.
"""
self.tilesets = tilesets
self.tileset_version += 1
self.tileset_data[0] = self.tilesets
self.tileset_data[1] = self.tileset_version
def get_tilesets(self):
"""This returns a (tilesetlist, tileset_version) tuple when
called from a worker process.
"""
return self.tileset_data._getvalue()
class MultiprocessingDispatcherProcess(multiprocessing.Process):
"""This class represents a single worker process. It is created
automatically by MultiprocessingDispatcher, but it can even be
used manually to spawn processes on different machines on the same
network.
"""
def __init__(self, manager):
"""Creates the process object. manager should be an instance
of MultiprocessingDispatcherManager connected to the one
created in MultiprocessingDispatcher.
"""
super(MultiprocessingDispatcherProcess, self).__init__()
self.manager = manager
self.job_queue = manager.get_job_queue()
self.result_queue = manager.get_result_queue()
def update_tilesets(self):
"""A convenience function to update our local tilesets to the
current version in use by the MultiprocessingDispatcher.
"""
self.tilesets, self.tileset_version = self.manager.get_tilesets()
def run(self):
"""The main work loop. Jobs are pulled from the job queue and
executed, then the result is pushed onto the result
queue. Updates to the tilesetlist are recognized and handled
automatically. This is the method that actually runs in the
new worker process.
"""
timeout = 1.0
self.update_tilesets()
while True:
@@ -142,7 +174,16 @@ class MultiprocessingDispatcherProcess(multiprocessing.Process):
pass
class MultiprocessingDispatcher(Dispatcher):
"""A subclass of Dispatcher that spawns worker processes and
distributes jobs to them to speed up processing.
"""
def __init__(self, local_procs=0):
"""Creates the dispatcher. local_procs should be the number of
worker processes to spawn. If it's omitted (or non-positive)
the number of available CPUs is used instead.
"""
# automatic local_procs handling
if local_procs <= 0:
local_procs = multiprocessing.cpu_count()
self.local_procs = local_procs
@@ -152,6 +193,7 @@ class MultiprocessingDispatcher(Dispatcher):
self.job_queue = self.manager.job_queue
self.result_queue = self.manager.result_queue
# create and fill the pool
self.pool = []
for i in xrange(self.local_procs):
proc = MultiprocessingDispatcherProcess(self.manager)
@@ -172,17 +214,23 @@ class MultiprocessingDispatcher(Dispatcher):
self.manager.set_tilesets(tilesets)
def dispatch(self, tileset, workitem):
# create and submit the job
tileset_index = self.manager.tilesets.index(tileset)
self.job_queue.put((self.manager.tileset_version, tileset_index, workitem))
self.outstanding_jobs += 1
# make sure the queue doesn't fill up too much
while self.outstanding_jobs > self.local_procs * 10:
self._handle_messages()
def finish_work(self):
# empty the queue
while self.outstanding_jobs > 0:
self._handle_messages()
def _handle_messages(self):
# work function: takes results out of the result queue and
# keeps track of how many outstanding jobs remain
timeout = 1.0
try:
while True: # exits in except block