0

moved in signals from the gist

gist: https://gist.github.com/1479733
tracker: #564
This commit is contained in:
Aaron Griffith
2011-12-21 06:10:34 -05:00
parent 2717485031
commit 2863876589
2 changed files with 141 additions and 17 deletions

View File

@@ -18,6 +18,9 @@ import multiprocessing
import multiprocessing.managers
import cPickle as pickle
import Queue
import time
from signals import Signal
class Dispatcher(object):
"""This class coordinates the work of all the TileSet objects
@@ -143,6 +146,7 @@ class MultiprocessingDispatcherManager(multiprocessing.managers.BaseManager):
def __init__(self, address=None, authkey=None):
self.job_queue = multiprocessing.Queue()
self.result_queue = multiprocessing.Queue()
self.signal_queue = multiprocessing.Queue()
self.tilesets = []
self.tileset_version = 0
@@ -150,6 +154,7 @@ class MultiprocessingDispatcherManager(multiprocessing.managers.BaseManager):
self.register("get_job_queue", callable=lambda: self.job_queue)
self.register("get_result_queue", callable=lambda: self.result_queue)
self.register("get_signal_queue", callable=lambda: self.signal_queue)
self.register("get_tileset_data", callable=lambda: self.tileset_data, proxytype=multiprocessing.managers.ListProxy)
super(MultiprocessingDispatcherManager, self).__init__(address=address, authkey=authkey)
@@ -188,6 +193,7 @@ class MultiprocessingDispatcherProcess(multiprocessing.Process):
self.manager = manager
self.job_queue = manager.get_job_queue()
self.result_queue = manager.get_result_queue()
self.signal_queue = manager.get_signal_queue()
def update_tilesets(self):
"""A convenience function to update our local tilesets to the
@@ -202,10 +208,21 @@ class MultiprocessingDispatcherProcess(multiprocessing.Process):
automatically. This is the method that actually runs in the
new worker process.
"""
# per-process job get() timeout
timeout = 1.0
# update our tilesets
self.update_tilesets()
# signal that we're starting up
# register for all available signals
def register_signal(name, sig):
def handler(*args, **kwargs):
self.signal_queue.put((name, args, kwargs), False)
sig.set_interceptor(handler)
for name, sig in Signal.signals.iteritems():
register_signal(name, sig)
# notify that we're starting up
self.result_queue.put(None, False)
while True:
try:
@@ -250,6 +267,7 @@ class MultiprocessingDispatcher(Dispatcher):
self.manager = MultiprocessingDispatcherManager(address=address, authkey=authkey)
self.job_queue = self.manager.job_queue
self.result_queue = self.manager.result_queue
self.signal_queue = self.manager.signal_queue
self.manager.start()
@@ -267,6 +285,9 @@ class MultiprocessingDispatcher(Dispatcher):
for p in xrange(self.num_workers):
self.job_queue.put(None, False)
# TODO better way to be sure worker processes get the message
time.sleep(1)
# and close the manager
self.manager.shutdown()
self.manager = None
@@ -302,22 +323,34 @@ class MultiprocessingDispatcher(Dispatcher):
# keeps track of how many outstanding jobs remain
finished_jobs = []
timeout = 1.0
try:
while True: # exits in except block
result = self.result_queue.get(True, timeout)
# timeout should only apply once
timeout = 0.0
if result != None:
# completed job
ti, workitem, ret = result
finished_jobs.append((self.manager.tilesets[ti], workitem))
self.outstanding_jobs -= 1
else:
# new worker
self.num_workers += 1
except Queue.Empty:
pass
result_empty = False
signal_empty = False
while not (result_empty and signal_empty):
if not result_empty:
try:
result = self.result_queue.get(False)
if result != None:
# completed job
ti, workitem, ret = result
finished_jobs.append((self.manager.tilesets[ti], workitem))
self.outstanding_jobs -= 1
else:
# new worker
self.num_workers += 1
except Queue.Empty:
result_empty = True
if not signal_empty:
try:
name, args, kwargs = self.signal_queue.get(True, timeout)
# timeout should only apply once
timeout = 0.0
sig = Signal.signals[name]
sig.emit_intercepted(*args, **kwargs)
except Queue.Empty:
signal_empty = True
return finished_jobs