0

dispatcher: code style fixes

This commit is contained in:
Nicolas F
2019-07-24 09:29:13 +02:00
parent da80e50022
commit cf74500efa

View File

@@ -13,14 +13,16 @@
# You should have received a copy of the GNU General Public License along # You should have received a copy of the GNU General Public License along
# with the Overviewer. If not, see <http://www.gnu.org/licenses/>. # with the Overviewer. If not, see <http://www.gnu.org/licenses/>.
from . import util
import multiprocessing import multiprocessing
import multiprocessing.managers import multiprocessing.managers
import queue import queue
import time import time
from . import util
from .signals import Signal from .signals import Signal
class Dispatcher(object):
class Dispatcher:
"""This class coordinates the work of all the TileSet objects """This class coordinates the work of all the TileSet objects
among one worker process. By subclassing this class and among one worker process. By subclassing this class and
implementing setup_tilesets(), dispatch(), and close(), it is implementing setup_tilesets(), dispatch(), and close(), it is
@@ -79,7 +81,7 @@ class Dispatcher(object):
observer.add(self._dispatch_jobs()) observer.add(self._dispatch_jobs())
# after each phase, wait for the work to finish # after each phase, wait for the work to finish
while len(self._pending_jobs) > 0 or len(self._running_jobs) > 0: while self._pending_jobs or self._running_jobs:
observer.add(self._dispatch_jobs()) observer.add(self._dispatch_jobs())
observer.finish() observer.finish()
@@ -108,7 +110,7 @@ class Dispatcher(object):
# make sure to at least get finished jobs, even if we don't # make sure to at least get finished jobs, even if we don't
# submit any new ones... # submit any new ones...
if len(dispatched_jobs) == 0: if not dispatched_jobs:
finished_jobs += self.dispatch(None, None) finished_jobs += self.dispatch(None, None)
# clean out the appropriate lists # clean out the appropriate lists
@@ -139,11 +141,12 @@ class Dispatcher(object):
that have completed since the last call. If tileset is None, that have completed since the last call. If tileset is None,
then returning completed jobs is all this function should do. then returning completed jobs is all this function should do.
""" """
if not tileset is None: if tileset is not None:
tileset.do_work(workitem) tileset.do_work(workitem)
return [(tileset, workitem),] return [(tileset, workitem)]
return [] return []
class MultiprocessingDispatcherManager(multiprocessing.managers.BaseManager): class MultiprocessingDispatcherManager(multiprocessing.managers.BaseManager):
"""This multiprocessing manager is responsible for giving worker """This multiprocessing manager is responsible for giving worker
processes access to the communication Queues, and also gives processes access to the communication Queues, and also gives
@@ -151,10 +154,13 @@ class MultiprocessingDispatcherManager(multiprocessing.managers.BaseManager):
""" """
def _get_job_queue(self): def _get_job_queue(self):
return self.job_queue return self.job_queue
def _get_results_queue(self): def _get_results_queue(self):
return self.result_queue return self.result_queue
def _get_signal_queue(self): def _get_signal_queue(self):
return self.signal_queue return self.signal_queue
def _get_tileset_data(self): def _get_tileset_data(self):
return self.tileset_data return self.tileset_data
@@ -170,7 +176,8 @@ class MultiprocessingDispatcherManager(multiprocessing.managers.BaseManager):
self.register("get_job_queue", callable=self._get_job_queue) self.register("get_job_queue", callable=self._get_job_queue)
self.register("get_result_queue", callable=self._get_results_queue) self.register("get_result_queue", callable=self._get_results_queue)
self.register("get_signal_queue", callable=self._get_signal_queue) self.register("get_signal_queue", callable=self._get_signal_queue)
self.register("get_tileset_data", callable=self._get_tileset_data, proxytype=multiprocessing.managers.ListProxy) self.register("get_tileset_data", callable=self._get_tileset_data,
proxytype=multiprocessing.managers.ListProxy)
super(MultiprocessingDispatcherManager, self).__init__(address=address, authkey=authkey) super(MultiprocessingDispatcherManager, self).__init__(address=address, authkey=authkey)
@@ -243,7 +250,7 @@ class MultiprocessingDispatcherProcess(multiprocessing.Process):
while True: while True:
try: try:
job = self.job_queue.get(True, timeout) job = self.job_queue.get(True, timeout)
if job == None: if job is None:
# this is a end-of-jobs sentinel # this is a end-of-jobs sentinel
return return
@@ -264,6 +271,7 @@ class MultiprocessingDispatcherProcess(multiprocessing.Process):
except KeyboardInterrupt: except KeyboardInterrupt:
return return
class MultiprocessingDispatcher(Dispatcher): class MultiprocessingDispatcher(Dispatcher):
"""A subclass of Dispatcher that spawns worker processes and """A subclass of Dispatcher that spawns worker processes and
distributes jobs to them to speed up processing. distributes jobs to them to speed up processing.
@@ -344,7 +352,7 @@ class MultiprocessingDispatcher(Dispatcher):
try: try:
result = self.result_queue.get(False) result = self.result_queue.get(False)
if result != None: if result is not None:
# completed job # completed job
ti, workitem, ret = result ti, workitem, ret = result
finished_jobs.append((self.manager.tilesets[ti], workitem)) finished_jobs.append((self.manager.tilesets[ti], workitem))