0

saner timeout handling on MultiprocessingDispatcher

_handle_messages() now *doesn't* block if it's being called simply to
see what jobs are done, but still blocks and times out when it's
*waiting* on jobs to be done.
This commit is contained in:
Aaron Griffith
2011-12-21 06:32:55 -05:00
parent 2863876589
commit e8683fd0bc

View File

@@ -307,22 +307,21 @@ class MultiprocessingDispatcher(Dispatcher):
self.outstanding_jobs += 1 self.outstanding_jobs += 1
# make sure the queue doesn't fill up too much # make sure the queue doesn't fill up too much
finished_jobs = self._handle_messages() finished_jobs = self._handle_messages(timeout=0.0)
while self.outstanding_jobs > self.num_workers * 10: while self.outstanding_jobs > self.num_workers * 10:
finished_jobs += self._handle_messages() finished_jobs += self._handle_messages()
return finished_jobs return finished_jobs
def finish_work(self): def finish_work(self):
# empty the queue # empty the queue
self._handle_messages() self._handle_messages(timeout=0.0)
while self.outstanding_jobs > 0: while self.outstanding_jobs > 0:
self._handle_messages() self._handle_messages()
def _handle_messages(self): def _handle_messages(self, timeout=1.0):
# work function: takes results out of the result queue and # work function: takes results out of the result queue and
# keeps track of how many outstanding jobs remain # keeps track of how many outstanding jobs remain
finished_jobs = [] finished_jobs = []
timeout = 1.0
result_empty = False result_empty = False
signal_empty = False signal_empty = False
@@ -343,7 +342,10 @@ class MultiprocessingDispatcher(Dispatcher):
result_empty = True result_empty = True
if not signal_empty: if not signal_empty:
try: try:
if timeout > 0.0:
name, args, kwargs = self.signal_queue.get(True, timeout) name, args, kwargs = self.signal_queue.get(True, timeout)
else:
name, args, kwargs = self.signal_queue.get(False)
# timeout should only apply once # timeout should only apply once
timeout = 0.0 timeout = 0.0