0

dispatcher status callback

Right now it's called every 100 work items. This number is completely
arbitrary and should probably be tuned.
This commit is contained in:
Aaron Griffith
2011-12-26 10:36:52 -05:00
parent 740f5ee628
commit 24d8dc4b99
2 changed files with 59 additions and 4 deletions

View File

@@ -38,11 +38,14 @@ class Dispatcher(object):
# list of (tileset, workitem, dependencies) tuples
# keeps track of jobs waiting to run after dependencies finish
self._pending_jobs = []
def render_all(self, tilesetlist, status_callback):
"""Render all of the tilesets in the given
tilesetlist. status_callback is called periodically to update
status. """
status. The callback should take the following arguments:
(phase, items_completed, total_items), where total_items may
be none if there is no useful estimate.
"""
# TODO use status callback
# setup tilesetlist
@@ -63,14 +66,52 @@ class Dispatcher(object):
return ((tset, workitem) for workitem in tset.iterate_work_items(p))
work_iterators.append(make_work_iterator(tileset, phase))
# keep track of total jobs, and how many jobs are done
total_jobs = 0
for tileset, phases in zip(tilesetlist, num_phases):
if phase < phases:
jobs_for_tileset = tileset.get_phase_length(phase)
# if one is unknown, the total is unknown
if jobs_for_tileset is None:
total_jobs = None
break
else:
total_jobs += jobs_for_tileset
finished_jobs = 0
# do the first status update
self._status_update(status_callback, phase, finished_jobs, total_jobs, force=True)
# go through these iterators round-robin style
for tileset, (workitem, deps) in util.roundrobin(work_iterators):
self._pending_jobs.append((tileset, workitem, deps))
self._dispatch_jobs()
finished_jobs += self._dispatch_jobs()
self._status_update(status_callback, phase, finished_jobs, total_jobs)
# after each phase, wait for the work to finish
while len(self._pending_jobs) > 0 or len(self._running_jobs) > 0:
self._dispatch_jobs()
finished_jobs += self._dispatch_jobs()
self._status_update(status_callback, phase, finished_jobs, total_jobs)
def _status_update(self, callback, phase, completed, total, force=False):
# always called with force=True at the beginning, so that can
# be used to set up state. After that, it is called after
# every _dispatch_jobs() often; this function is used to
# decide how often the actual status callback should be
# called.
if force:
self._last_status_update = completed
if callback:
callback(phase, completed, total)
return
if callback is None:
return
update_interval = 100 # XXX arbitrary
if self._last_status_update < 0 or completed >= self._last_status_update + update_interval or completed < self._last_status_update:
self._last_status_update = completed
callback(phase, completed, total)
def _dispatch_jobs(self):
# helper function to dispatch pending jobs when their
@@ -104,6 +145,8 @@ class Dispatcher(object):
self._running_jobs.remove(job)
for job in dispatched_jobs:
self._pending_jobs.remove(job)
return len(finished_jobs)
def close(self):
"""Close the Dispatcher. This should be called when you are