diff --git a/overviewer_core/dispatcher.py b/overviewer_core/dispatcher.py index 14e43e1..f037131 100644 --- a/overviewer_core/dispatcher.py +++ b/overviewer_core/dispatcher.py @@ -18,6 +18,9 @@ import multiprocessing import multiprocessing.managers import cPickle as pickle import Queue +import time + +from signals import Signal class Dispatcher(object): """This class coordinates the work of all the TileSet objects @@ -143,6 +146,7 @@ class MultiprocessingDispatcherManager(multiprocessing.managers.BaseManager): def __init__(self, address=None, authkey=None): self.job_queue = multiprocessing.Queue() self.result_queue = multiprocessing.Queue() + self.signal_queue = multiprocessing.Queue() self.tilesets = [] self.tileset_version = 0 @@ -150,6 +154,7 @@ class MultiprocessingDispatcherManager(multiprocessing.managers.BaseManager): self.register("get_job_queue", callable=lambda: self.job_queue) self.register("get_result_queue", callable=lambda: self.result_queue) + self.register("get_signal_queue", callable=lambda: self.signal_queue) self.register("get_tileset_data", callable=lambda: self.tileset_data, proxytype=multiprocessing.managers.ListProxy) super(MultiprocessingDispatcherManager, self).__init__(address=address, authkey=authkey) @@ -188,6 +193,7 @@ class MultiprocessingDispatcherProcess(multiprocessing.Process): self.manager = manager self.job_queue = manager.get_job_queue() self.result_queue = manager.get_result_queue() + self.signal_queue = manager.get_signal_queue() def update_tilesets(self): """A convenience function to update our local tilesets to the @@ -202,10 +208,21 @@ class MultiprocessingDispatcherProcess(multiprocessing.Process): automatically. This is the method that actually runs in the new worker process. """ + # per-process job get() timeout timeout = 1.0 + + # update our tilesets self.update_tilesets() - # signal that we're starting up + # register for all available signals + def register_signal(name, sig): + def handler(*args, **kwargs): + self.signal_queue.put((name, args, kwargs), False) + sig.set_interceptor(handler) + for name, sig in Signal.signals.iteritems(): + register_signal(name, sig) + + # notify that we're starting up self.result_queue.put(None, False) while True: try: @@ -250,6 +267,7 @@ class MultiprocessingDispatcher(Dispatcher): self.manager = MultiprocessingDispatcherManager(address=address, authkey=authkey) self.job_queue = self.manager.job_queue self.result_queue = self.manager.result_queue + self.signal_queue = self.manager.signal_queue self.manager.start() @@ -267,6 +285,9 @@ class MultiprocessingDispatcher(Dispatcher): for p in xrange(self.num_workers): self.job_queue.put(None, False) + # TODO better way to be sure worker processes get the message + time.sleep(1) + # and close the manager self.manager.shutdown() self.manager = None @@ -302,22 +323,34 @@ class MultiprocessingDispatcher(Dispatcher): # keeps track of how many outstanding jobs remain finished_jobs = [] timeout = 1.0 - try: - while True: # exits in except block - result = self.result_queue.get(True, timeout) - # timeout should only apply once - timeout = 0.0 - - if result != None: - # completed job - ti, workitem, ret = result - finished_jobs.append((self.manager.tilesets[ti], workitem)) - self.outstanding_jobs -= 1 - else: - # new worker - self.num_workers += 1 - except Queue.Empty: - pass + + result_empty = False + signal_empty = False + while not (result_empty and signal_empty): + if not result_empty: + try: + result = self.result_queue.get(False) + + if result != None: + # completed job + ti, workitem, ret = result + finished_jobs.append((self.manager.tilesets[ti], workitem)) + self.outstanding_jobs -= 1 + else: + # new worker + self.num_workers += 1 + except Queue.Empty: + result_empty = True + if not signal_empty: + try: + name, args, kwargs = self.signal_queue.get(True, timeout) + # timeout should only apply once + timeout = 0.0 + + sig = Signal.signals[name] + sig.emit_intercepted(*args, **kwargs) + except Queue.Empty: + signal_empty = True return finished_jobs diff --git a/overviewer_core/signals.py b/overviewer_core/signals.py new file mode 100755 index 0000000..3e8dc5e --- /dev/null +++ b/overviewer_core/signals.py @@ -0,0 +1,91 @@ +# This file is part of the Minecraft Overviewer. +# +# Minecraft Overviewer is free software: you can redistribute it and/or +# modify it under the terms of the GNU General Public License as published +# by the Free Software Foundation, either version 3 of the License, or (at +# your option) any later version. +# +# Minecraft Overviewer is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General +# Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with the Overviewer. If not, see . + +""" +This module provides a way to create named "signals" that, when +emitted, call a set of registered functions. These signals also have +the ability to be intercepted, which lets Dispatchers re-route signals +back to the main process. +""" + +class Signal(object): + """A mechanism for registering functions to be called whenever + some specified event happens. This object is designed to work with + Dispatcher so that functions can register to always run in the + main Python instance.""" + + # a global list of registered signals, indexed by name + # this is used by JobManagers to register and relay signals + signals = {} + + def __init__(self, namespace, name): + """Creates a signal. Namespace and name should be the name of + the class this signal is for, and the name of the signal. They + are used to create a globally-unique name.""" + + self.namespace = namespace + self.name = name + self.fullname = namespace + '.' + name + self.interceptor = None + self.local_functions = [] + self.functions = [] + + # register this signal + self.signals[self.fullname] = self + + def register(self, func): + """Register a function to be called when this signal is + emitted. Functions registered in this way will always run in + the main Python instance.""" + self.functions.append(func) + return func + + def register_local(self, func): + """Register a function to be called when this signal is + emitted. Functions registered in this way will always run in + the Python instance in which they were emitted.""" + self.local_functions.append(func) + return func + + def set_interceptor(self, func): + """Sets an interceptor function. This function is called + instead of all the non-locally registered functions if it is + present, and should be used by JobManagers to intercept signal + emissions.""" + self.interceptor = func + + def emit(self, *args, **kwargs): + """Emits the signal with the given arguments. For convenience, + you can also call the signal object directly. + """ + for func in self.local_functions: + func(*args, **kwargs) + if self.interceptor: + self.interceptor(*args, **kwargs) + return + for func in self.functions: + func(*args, **kwargs) + + def emit_intercepted(self, *args, **kwargs): + """Re-emits an intercepted signal, and finishes the work that + would have been done during the original emission. This should + be used by Dispatchers to re-emit signals intercepted in + worker Python instances.""" + for func in self.functions: + func(*args, **kwargs) + + # convenience + def __call__(self, *args, **kwargs): + self.emit(*args, **kwargs)