clean up observer code
This commit is contained in:
@@ -19,10 +19,6 @@ import multiprocessing.managers
|
||||
import cPickle as pickle
|
||||
import Queue
|
||||
import time
|
||||
import logging
|
||||
import progressbar
|
||||
import sys
|
||||
|
||||
from signals import Signal
|
||||
|
||||
class Dispatcher(object):
|
||||
@@ -32,7 +28,7 @@ class Dispatcher(object):
|
||||
possible to create a Dispatcher that distributes this work to many
|
||||
worker processes.
|
||||
"""
|
||||
def __init__(self, observer=None):
|
||||
def __init__(self):
|
||||
super(Dispatcher, self).__init__()
|
||||
|
||||
# list of (tileset, workitem) tuples
|
||||
@@ -42,9 +38,7 @@ class Dispatcher(object):
|
||||
# keeps track of jobs waiting to run after dependencies finish
|
||||
self._pending_jobs = []
|
||||
|
||||
self.observer_type = observer or LoggingObserver
|
||||
|
||||
def render_all(self, tilesetlist):
|
||||
def render_all(self, tilesetlist, observer):
|
||||
"""Render all of the tilesets in the given
|
||||
tilesetlist. status_callback is called periodically to update
|
||||
status. The callback should take the following arguments:
|
||||
@@ -78,54 +72,18 @@ class Dispatcher(object):
|
||||
break
|
||||
else:
|
||||
total_jobs += jobs_for_tileset
|
||||
finished_jobs = 0
|
||||
|
||||
self.observer = self.observer_type(total_jobs)
|
||||
|
||||
# do the first status update
|
||||
#self._status_update(status_callback, phase, finished_jobs, total_jobs, force=True)
|
||||
self.observer.start()
|
||||
self._status_update(phase, finished_jobs, True)
|
||||
|
||||
observer.start(total_jobs)
|
||||
# go through these iterators round-robin style
|
||||
for tileset, (workitem, deps) in util.roundrobin(work_iterators):
|
||||
self._pending_jobs.append((tileset, workitem, deps))
|
||||
finished_jobs += self._dispatch_jobs()
|
||||
self._status_update(phase, finished_jobs)
|
||||
observer.add(self._dispatch_jobs())
|
||||
|
||||
# after each phase, wait for the work to finish
|
||||
while len(self._pending_jobs) > 0 or len(self._running_jobs) > 0:
|
||||
finished_jobs += self._dispatch_jobs()
|
||||
self._status_update(phase, finished_jobs)
|
||||
observer.add(self._dispatch_jobs())
|
||||
|
||||
self.observer.finish()
|
||||
|
||||
def _status_update(self, phase, completed, force=False):
|
||||
if force or completed - self.observer.get_current_value() > \
|
||||
self.observer.MIN_UPDATE_INTERVAL:
|
||||
self.observer.update(completed)
|
||||
|
||||
"""
|
||||
def _status_update(self, callback, phase, completed, total, force=False):
|
||||
# always called with force=True at the beginning, so that can
|
||||
# be used to set up state. After that, it is called after
|
||||
# every _dispatch_jobs() often; this function is used to
|
||||
# decide how often the actual status callback should be
|
||||
# called.
|
||||
if force:
|
||||
self._last_status_update = completed
|
||||
if callback:
|
||||
callback(phase, completed, total)
|
||||
return
|
||||
|
||||
if callback is None:
|
||||
return
|
||||
|
||||
update_interval = 100 # XXX arbitrary
|
||||
if self._last_status_update < 0 or completed >= self._last_status_update + update_interval or completed < self._last_status_update:
|
||||
self._last_status_update = completed
|
||||
callback(phase, completed, total)
|
||||
"""
|
||||
observer.finish()
|
||||
|
||||
def _dispatch_jobs(self):
|
||||
# helper function to dispatch pending jobs when their
|
||||
@@ -309,12 +267,12 @@ class MultiprocessingDispatcher(Dispatcher):
|
||||
"""A subclass of Dispatcher that spawns worker processes and
|
||||
distributes jobs to them to speed up processing.
|
||||
"""
|
||||
def __init__(self, local_procs=-1, address=None, authkey=None, observer=None):
|
||||
def __init__(self, local_procs=-1, address=None, authkey=None):
|
||||
"""Creates the dispatcher. local_procs should be the number of
|
||||
worker processes to spawn. If it's omitted (or negative)
|
||||
the number of available CPUs is used instead.
|
||||
"""
|
||||
super(MultiprocessingDispatcher, self).__init__(observer=observer)
|
||||
super(MultiprocessingDispatcher, self).__init__()
|
||||
|
||||
# automatic local_procs handling
|
||||
if local_procs < 0:
|
||||
@@ -422,91 +380,3 @@ class MultiprocessingDispatcher(Dispatcher):
|
||||
m.connect()
|
||||
p = MultiprocessingDispatcherProcess(m)
|
||||
p.run()
|
||||
|
||||
class Observer(object):
|
||||
"""
|
||||
"""
|
||||
|
||||
MIN_UPDATE_INTERVAL = 100
|
||||
|
||||
def __init__(self, max_value):
|
||||
self._current_value = None
|
||||
self._max_value = max_value
|
||||
self.start_time = None
|
||||
self.end_time = None
|
||||
|
||||
def start(self):
|
||||
self.start_time = time.time()
|
||||
self.update(0)
|
||||
return self
|
||||
|
||||
def is_started(self):
|
||||
return self.start_time is not None
|
||||
|
||||
def finish(self):
|
||||
self.end_time = time.time()
|
||||
|
||||
def is_finished(self):
|
||||
return self.end_time is not None
|
||||
|
||||
def is_running(self):
|
||||
return self.is_started() and not self.is_finished()
|
||||
|
||||
def add(self, amount):
|
||||
self.update(self.get_current_value() + amount)
|
||||
|
||||
def update(self, current_value):
|
||||
"""
|
||||
"""
|
||||
self._current_value = current_value
|
||||
|
||||
def get_percentage(self):
|
||||
if self.get_max_value() is 0:
|
||||
return 100.0
|
||||
else:
|
||||
return self.get_current_value() * 100.0 / self.get_max_value()
|
||||
|
||||
def get_current_value(self):
|
||||
return self._current_value
|
||||
|
||||
def get_max_value(self):
|
||||
return self._max_value
|
||||
|
||||
class LoggingObserver(Observer):
|
||||
"""
|
||||
"""
|
||||
def update(self, current_value):
|
||||
super(LoggingObserver, self).update(current_value)
|
||||
if self.get_max_value() is None:
|
||||
logging.info("Rendered %d tiles.", self.get_current_value())
|
||||
else:
|
||||
logging.info("Rendered %d of %d. %d%% complete",
|
||||
self.get_current_value(), self.get_max_value(),
|
||||
self.get_percentage())
|
||||
|
||||
default_widgets = ['Rendering: ', progressbar.FractionWidget(), ' (',
|
||||
progressbar.Percentage(), ') ', progressbar.Bar(left='[', right=']'),
|
||||
' ', progressbar.ETA()]
|
||||
class ProgressBarObserver(progressbar.ProgressBar):
|
||||
"""
|
||||
"""
|
||||
|
||||
MIN_UPDATE_INTERVAL = 100
|
||||
|
||||
def __init__(self, max_value, widgets=default_widgets, term_width=None,
|
||||
fd=sys.stderr):
|
||||
super(ProgressBarObserver, self).__init__(maxval=max_value,
|
||||
widgets=widgets, term_width=term_width, fd=fd)
|
||||
|
||||
def is_started(self):
|
||||
return self.start_time is not None
|
||||
|
||||
def finish(self):
|
||||
self._end_time = time.time()
|
||||
super(ProgressBarObserver, self).finish()
|
||||
|
||||
def get_current_value(self):
|
||||
return self.currval
|
||||
|
||||
def get_max_value(self):
|
||||
return self.maxval
|
||||
|
||||
Reference in New Issue
Block a user