
implement observer model

aheadley
2012-03-16 22:26:16 -04:00
parent b1dba9518c
commit 46b04dd09a
3 changed files with 205 additions and 120 deletions


@@ -20,6 +20,8 @@ import cPickle as pickle
import Queue
import time
import logging
+import progressbar
+import sys
from signals import Signal
@@ -30,17 +32,19 @@ class Dispatcher(object):
possible to create a Dispatcher that distributes this work to many
worker processes.
"""
-def __init__(self):
+def __init__(self, observer=None):
super(Dispatcher, self).__init__()
# list of (tileset, workitem) tuples
# keeps track of dispatched but unfinished jobs
self._running_jobs = []
# list of (tileset, workitem, dependencies) tuples
# keeps track of jobs waiting to run after dependencies finish
self._pending_jobs = []
-def render_all(self, tilesetlist, status_callback):
+self.observer_type = observer or LoggingObserver
+def render_all(self, tilesetlist):
"""Render all of the tilesets in the given
tilesetlist. status_callback is called periodically to update
status. The callback should take the following arguments:
@@ -48,10 +52,10 @@ class Dispatcher(object):
be none if there is no useful estimate.
"""
# TODO report per-phase status through the observer
# setup tilesetlist
self.setup_tilesets(tilesetlist)
# iterate through all possible phases
num_phases = [tileset.get_num_phases() for tileset in tilesetlist]
for phase in xrange(max(num_phases)):
@@ -62,7 +66,7 @@ class Dispatcher(object):
def make_work_iterator(tset, p):
return ((tset, workitem) for workitem in tset.iterate_work_items(p))
work_iterators.append(make_work_iterator(tileset, phase))
# keep track of total jobs, and how many jobs are done
total_jobs = 0
for tileset, phases in zip(tilesetlist, num_phases):
@@ -75,21 +79,33 @@ class Dispatcher(object):
else:
total_jobs += jobs_for_tileset
finished_jobs = 0
+self.observer = self.observer_type(total_jobs)
# do the first status update
-self._status_update(status_callback, phase, finished_jobs, total_jobs, force=True)
+#self._status_update(status_callback, phase, finished_jobs, total_jobs, force=True)
+self.observer.start()
+self._status_update(phase, finished_jobs, True)
# go through these iterators round-robin style
for tileset, (workitem, deps) in util.roundrobin(work_iterators):
self._pending_jobs.append((tileset, workitem, deps))
finished_jobs += self._dispatch_jobs()
-self._status_update(status_callback, phase, finished_jobs, total_jobs)
+self._status_update(phase, finished_jobs)
# after each phase, wait for the work to finish
while len(self._pending_jobs) > 0 or len(self._running_jobs) > 0:
finished_jobs += self._dispatch_jobs()
-self._status_update(status_callback, phase, finished_jobs, total_jobs)
+self._status_update(phase, finished_jobs)
+self.observer.finish()
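
util.roundrobin is called above but lives outside this diff. A minimal sketch of a compatible helper, adapted from the classic itertools round-robin recipe (taking a list argument to match the call site above; not the project's actual implementation), might be:

from itertools import cycle, islice

def roundrobin(iterables):
    # Yield one item from each iterable in turn until all are
    # exhausted, e.g. roundrobin(['ABC', 'D', 'EF']) -> A D E B F C
    pending = len(iterables)
    get_next = cycle(iter(it).next for it in iterables)
    while pending:
        try:
            for get in get_next:
                yield get()
        except StopIteration:
            pending -= 1
            get_next = cycle(islice(get_next, pending))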
+def _status_update(self, phase, completed, force=False):
+if force or completed - self.observer.get_current_value() > \
+self.observer.MIN_UPDATE_INTERVAL:
+self.observer.update(completed)
+"""
def _status_update(self, callback, phase, completed, total, force=False):
# always called with force=True at the beginning, so that can
# be used to set up state. After that, it is called after
@@ -101,26 +117,27 @@ class Dispatcher(object):
if callback:
callback(phase, completed, total)
return
if callback is None:
return
update_interval = 100 # XXX arbitrary
if self._last_status_update < 0 or completed >= self._last_status_update + update_interval or completed < self._last_status_update:
self._last_status_update = completed
callback(phase, completed, total)
"""
def _dispatch_jobs(self):
# helper function to dispatch pending jobs when their
# dependencies are met, and to manage self._running_jobs
dispatched_jobs = []
finished_jobs = []
pending_jobs_nodeps = [(j[0], j[1]) for j in self._pending_jobs]
for pending_job in self._pending_jobs:
tileset, workitem, deps = pending_job
# see if any of the deps are in _running_jobs or _pending_jobs
for dep in deps:
if (tileset, dep) in self._running_jobs or (tileset, dep) in pending_jobs_nodeps:
@@ -131,33 +148,33 @@ class Dispatcher(object):
finished_jobs += self.dispatch(tileset, workitem)
self._running_jobs.append((tileset, workitem))
dispatched_jobs.append(pending_job)
# make sure to at least get finished jobs, even if we don't
# submit any new ones...
if len(dispatched_jobs) == 0:
finished_jobs += self.dispatch(None, None)
# clean out the appropriate lists
for job in finished_jobs:
self._running_jobs.remove(job)
for job in dispatched_jobs:
self._pending_jobs.remove(job)
return len(finished_jobs)
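
The dependency gate inside _dispatch_jobs above, isolated into a standalone sketch (names are illustrative and jobs are assumed hashable; the commit itself uses plain list membership tests): a pending job may be dispatched only when none of its dependencies is still pending or running.

def ready_jobs(pending_jobs, running_jobs):
    # pending_jobs: list of (job, deps) tuples
    # running_jobs: iterable of jobs already dispatched
    # returns the jobs that are safe to dispatch right now
    still_waiting = set(job for job, deps in pending_jobs)
    running = set(running_jobs)
    ready = []
    for job, deps in pending_jobs:
        if not any(dep in running or dep in still_waiting
                   for dep in deps):
            ready.append(job)
    return ready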
def close(self):
"""Close the Dispatcher. This should be called when you are
done with the dispatcher, to ensure that it cleans up any
processes or connections it may still have around.
"""
pass
def setup_tilesets(self, tilesetlist):
"""Called whenever a new list of tilesets are being used. This
lets subclasses distribute the whole list at once, instead of
for each work item."""
pass
def dispatch(self, tileset, workitem):
"""Dispatch the given work item. The end result of this call
should be running tileset.do_work(workitem) somewhere. This
@@ -176,31 +193,31 @@ class MultiprocessingDispatcherManager(multiprocessing.managers.BaseManager):
workers access to the current tileset list.
"""
def _get_job_queue(self):
return self.job_queue
def _get_results_queue(self):
return self.result_queue
def _get_signal_queue(self):
return self.signal_queue
def _get_tileset_data(self):
return self.tileset_data
def __init__(self, address=None, authkey=None):
self.job_queue = multiprocessing.Queue()
self.result_queue = multiprocessing.Queue()
self.signal_queue = multiprocessing.Queue()
self.tilesets = []
self.tileset_version = 0
self.tileset_data = [[], 0]
self.register("get_job_queue", callable=self._get_job_queue)
self.register("get_result_queue", callable=self._get_results_queue)
self.register("get_signal_queue", callable=self._get_signal_queue)
self.register("get_tileset_data", callable=self._get_tileset_data, proxytype=multiprocessing.managers.ListProxy)
super(MultiprocessingDispatcherManager, self).__init__(address=address, authkey=authkey)
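
The registrations above follow the standard multiprocessing.managers.BaseManager pattern: a callable registered under a type id becomes a proxy factory for any process that connects. A standalone sketch of that pattern (the queue name, port, and authkey here are made up for illustration):

import multiprocessing
import multiprocessing.managers

class QueueManager(multiprocessing.managers.BaseManager):
    pass

job_queue = multiprocessing.Queue()
# expose the queue to any process that connects to this manager
QueueManager.register('get_job_queue', callable=lambda: job_queue)

manager = QueueManager(address=('', 50000), authkey='secret')
manager.start()                    # serve from a background process
proxy = manager.get_job_queue()    # a proxy for the shared queue
proxy.put('work item')             # routed through the manager
manager.shutdown()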
@classmethod
def from_address(cls, address, authkey, serializer):
"Required to be implemented to make multiprocessing happy"
c = cls(address=address, authkey=authkey)
@@ -218,7 +235,7 @@ class MultiprocessingDispatcherManager(multiprocessing.managers.BaseManager):
data = self.get_tileset_data()
data[0] = self.tilesets
data[1] = self.tileset_version
class MultiprocessingDispatcherProcess(multiprocessing.Process):
"""This class represents a single worker process. It is created
@@ -236,13 +253,13 @@ class MultiprocessingDispatcherProcess(multiprocessing.Process):
self.result_queue = manager.get_result_queue()
self.signal_queue = manager.get_signal_queue()
self.tileset_proxy = manager.get_tileset_data()
def update_tilesets(self):
"""A convenience function to update our local tilesets to the
current version in use by the MultiprocessingDispatcher.
"""
self.tilesets, self.tileset_version = self.tileset_proxy._getvalue()
def run(self):
"""The main work loop. Jobs are pulled from the job queue and
executed, then the result is pushed onto the result
@@ -252,10 +269,10 @@ class MultiprocessingDispatcherProcess(multiprocessing.Process):
"""
# per-process job get() timeout
timeout = 1.0
# update our tilesets
self.update_tilesets()
# register for all available signals
def register_signal(name, sig):
def handler(*args, **kwargs):
@@ -263,7 +280,7 @@ class MultiprocessingDispatcherProcess(multiprocessing.Process):
sig.set_interceptor(handler)
for name, sig in Signal.signals.iteritems():
register_signal(name, sig)
# notify that we're starting up
self.result_queue.put(None, False)
while True:
@@ -272,15 +289,15 @@ class MultiprocessingDispatcherProcess(multiprocessing.Process):
if job is None:
# this is an end-of-jobs sentinel
return
# unpack job
tv, ti, workitem = job
if tv != self.tileset_version:
# our tilesets changed!
self.update_tilesets()
assert tv == self.tileset_version
# do job
ret = self.tilesets[ti].do_work(workitem)
result = (ti, workitem, ret,)
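
The loop above reduces to a common worker pattern: poll the job queue with a short timeout, treat None as the shutdown sentinel, and push results back. A self-contained sketch of that core (do_work is a stand-in for tileset.do_work, not part of this commit):

import Queue

def do_work(job):
    return job  # stand-in for tileset.do_work(workitem)

def worker_loop(job_queue, result_queue, timeout=1.0):
    while True:
        try:
            job = job_queue.get(True, timeout)
        except Queue.Empty:
            continue              # nothing queued yet; poll again
        if job is None:
            return                # end-of-jobs sentinel, shut down
        result_queue.put(do_work(job), False)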
@@ -292,18 +309,18 @@ class MultiprocessingDispatcher(Dispatcher):
"""A subclass of Dispatcher that spawns worker processes and
distributes jobs to them to speed up processing.
"""
-def __init__(self, local_procs=-1, address=None, authkey=None):
+def __init__(self, local_procs=-1, address=None, authkey=None, observer=None):
"""Creates the dispatcher. local_procs should be the number of
worker processes to spawn. If it's omitted (or negative)
the number of available CPUs is used instead.
"""
-super(MultiprocessingDispatcher, self).__init__()
+super(MultiprocessingDispatcher, self).__init__(observer=observer)
# automatic local_procs handling
if local_procs < 0:
local_procs = multiprocessing.cpu_count()
self.local_procs = local_procs
self.outstanding_jobs = 0
self.num_workers = 0
self.manager = MultiprocessingDispatcherManager(address=address, authkey=authkey)
@@ -311,63 +328,63 @@ class MultiprocessingDispatcher(Dispatcher):
self.job_queue = self.manager.get_job_queue()
self.result_queue = self.manager.get_result_queue()
self.signal_queue = self.manager.get_signal_queue()
# create and fill the pool
self.pool = []
for i in xrange(self.local_procs):
proc = MultiprocessingDispatcherProcess(self.manager)
proc.start()
self.pool.append(proc)
def close(self):
# empty the queue
self._handle_messages(timeout=0.0)
while self.outstanding_jobs > 0:
self._handle_messages()
# send off the end-of-jobs sentinels, one per worker
for p in xrange(self.num_workers):
self.job_queue.put(None, False)
# TODO better way to be sure worker processes get the message
time.sleep(1)
# and close the manager
self.manager.shutdown()
self.manager = None
self.pool = None
def setup_tilesets(self, tilesets):
self.manager.set_tilesets(tilesets)
def dispatch(self, tileset, workitem):
# handle the no-new-work case
if tileset is None:
return self._handle_messages()
# create and submit the job
tileset_index = self.manager.tilesets.index(tileset)
self.job_queue.put((self.manager.tileset_version, tileset_index, workitem), False)
self.outstanding_jobs += 1
# make sure the queue doesn't fill up too much
finished_jobs = self._handle_messages(timeout=0.0)
while self.outstanding_jobs > self.num_workers * 10:
finished_jobs += self._handle_messages()
return finished_jobs
def _handle_messages(self, timeout=0.01):
# work function: takes results out of the result queue and
# keeps track of how many outstanding jobs remain
finished_jobs = []
result_empty = False
signal_empty = False
while not (result_empty and signal_empty):
if not result_empty:
if not result_empty:
try:
result = self.result_queue.get(False)
if result is not None:
# completed job
ti, workitem, ret = result
@@ -386,14 +403,14 @@ class MultiprocessingDispatcher(Dispatcher):
name, args, kwargs = self.signal_queue.get(False)
# timeout should only apply once
timeout = 0.0
sig = Signal.signals[name]
sig.emit_intercepted(*args, **kwargs)
except Queue.Empty:
signal_empty = True
return finished_jobs
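
The cap in dispatch() above (outstanding_jobs > self.num_workers * 10) is a simple backpressure scheme: each submission drains results until the backlog is bounded again, so the job queue cannot grow without limit. Restated as a standalone sketch against a dispatcher-like object (illustrative only, not part of this commit):

def submit_with_backpressure(dispatcher, job, limit):
    # enqueue one job, then drain results until the number of
    # jobs in flight drops back under the limit
    dispatcher.job_queue.put(job, False)
    dispatcher.outstanding_jobs += 1
    finished = dispatcher._handle_messages(timeout=0.0)
    while dispatcher.outstanding_jobs > limit:
        finished += dispatcher._handle_messages()
    return finished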
@classmethod
def start_manual_process(cls, address, authkey):
"""A convenience method to start up a manual process, possibly
@@ -405,3 +422,91 @@ class MultiprocessingDispatcher(Dispatcher):
m.connect()
p = MultiprocessingDispatcherProcess(m)
p.run()
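
A hypothetical caller-side use of start_manual_process, attaching an extra worker from another machine (the host name, port, authkey, and import path below are all assumptions, not from this commit):

from overviewer_core.dispatcher import MultiprocessingDispatcher

MultiprocessingDispatcher.start_manual_process(
    ('render-master', 50000),   # address the dispatcher listens on
    'sharedsecret')             # must match the dispatcher's authkey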
class Observer(object):
"""
"""
# minimum number of completed jobs between automatic updates
MIN_UPDATE_INTERVAL = 100
def __init__(self, max_value):
self._current_value = None
self._max_value = max_value
self.start_time = None
self.end_time = None
def start(self):
self.start_time = time.time()
self.update(0)
return self
def is_started(self):
return self.start_time is not None
def finish(self):
self.end_time = time.time()
def is_finished(self):
return self.end_time is not None
def is_running(self):
return self.is_started() and not self.is_finished()
def add(self, amount):
self.update(self.get_current_value() + amount)
def update(self, current_value):
"""
"""
self._current_value = current_value
def get_percentage(self):
if self.get_max_value() == 0:
return 100.0
else:
return self.get_current_value() * 100.0 / self.get_max_value()
def get_current_value(self):
return self._current_value
def get_max_value(self):
return self._max_value
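
This base class is what makes progress reporting pluggable: any class implementing start/update/finish can be passed as the observer argument to a Dispatcher. As an illustration, a hypothetical subclass (not part of this commit) that appends one JSON record per update to a file:

import json
import time

class JSONObserver(Observer):
    """Writes one JSON line per update to a progress file."""
    def __init__(self, max_value, path='progress.log'):
        super(JSONObserver, self).__init__(max_value)
        self._path = path
    def update(self, current_value):
        super(JSONObserver, self).update(current_value)
        with open(self._path, 'a') as logfile:
            json.dump({'time': time.time(),
                       'complete': self.get_current_value(),
                       'total': self.get_max_value()}, logfile)
            logfile.write('\n')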
class LoggingObserver(Observer):
"""
"""
def update(self, current_value):
super(LoggingObserver, self).update(current_value)
if self.get_max_value() is None:
logging.info("Rendered %d tiles.", self.get_current_value())
else:
logging.info("Rendered %d of %d. %d%% complete",
self.get_current_value(), self.get_max_value(),
self.get_percentage())
default_widgets = ['Rendering: ', progressbar.FractionWidget(), ' (',
progressbar.Percentage(), ') ', progressbar.Bar(left='[', right=']'),
' ', progressbar.ETA()]
class ProgressBarObserver(progressbar.ProgressBar):
"""
"""
MIN_UPDATE_INTERVAL = 100
def __init__(self, max_value, widgets=default_widgets, term_width=None,
fd=sys.stderr):
super(ProgressBarObserver, self).__init__(maxval=max_value,
widgets=widgets, term_width=term_width, fd=fd)
def is_started(self):
return self.start_time is not None
def finish(self):
self._end_time = time.time()
super(ProgressBarObserver, self).finish()
def get_current_value(self):
return self.currval
def get_max_value(self):
return self.maxval
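
Putting the pieces together, a sketch of caller-side wiring after this commit (tileset construction is elided; note that a class, not an instance, is passed, since the Dispatcher instantiates observer_type itself once the job total is known):

dispatcher = MultiprocessingDispatcher(local_procs=4,
                                       observer=ProgressBarObserver)
try:
    dispatcher.render_all(tilesets)   # tilesets built elsewhere
finally:
    dispatcher.close()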