Merge remote-tracking branch 'origin/rewrite' into anvil
Conflicts: overviewer_core/world.py
This commit is contained in:
@@ -121,7 +121,7 @@ class Dispatcher(object):
|
|||||||
dispatched_jobs = []
|
dispatched_jobs = []
|
||||||
finished_jobs = []
|
finished_jobs = []
|
||||||
|
|
||||||
pending_jobs_nodeps = map(lambda j: (j[0], j[1]), self._pending_jobs)
|
pending_jobs_nodeps = [(j[0], j[1]) for j in self._pending_jobs]
|
||||||
|
|
||||||
for pending_job in self._pending_jobs:
|
for pending_job in self._pending_jobs:
|
||||||
tileset, workitem, deps = pending_job
|
tileset, workitem, deps = pending_job
|
||||||
@@ -180,6 +180,15 @@ class MultiprocessingDispatcherManager(multiprocessing.managers.BaseManager):
|
|||||||
processes access to the communication Queues, and also gives
|
processes access to the communication Queues, and also gives
|
||||||
workers access to the current tileset list.
|
workers access to the current tileset list.
|
||||||
"""
|
"""
|
||||||
|
def _get_job_queue(self):
|
||||||
|
return self.job_queue
|
||||||
|
def _get_results_queue(self):
|
||||||
|
return self.result_queue
|
||||||
|
def _get_signal_queue(self):
|
||||||
|
return self.signal_queue
|
||||||
|
def _get_tileset_data(self):
|
||||||
|
return self.tileset_data
|
||||||
|
|
||||||
def __init__(self, address=None, authkey=None):
|
def __init__(self, address=None, authkey=None):
|
||||||
self.job_queue = multiprocessing.Queue()
|
self.job_queue = multiprocessing.Queue()
|
||||||
self.result_queue = multiprocessing.Queue()
|
self.result_queue = multiprocessing.Queue()
|
||||||
@@ -189,13 +198,19 @@ class MultiprocessingDispatcherManager(multiprocessing.managers.BaseManager):
|
|||||||
self.tileset_version = 0
|
self.tileset_version = 0
|
||||||
self.tileset_data = [[], 0]
|
self.tileset_data = [[], 0]
|
||||||
|
|
||||||
self.register("get_job_queue", callable=lambda: self.job_queue)
|
self.register("get_job_queue", callable=self._get_job_queue)
|
||||||
self.register("get_result_queue", callable=lambda: self.result_queue)
|
self.register("get_result_queue", callable=self._get_results_queue)
|
||||||
self.register("get_signal_queue", callable=lambda: self.signal_queue)
|
self.register("get_signal_queue", callable=self._get_signal_queue)
|
||||||
self.register("get_tileset_data", callable=lambda: self.tileset_data, proxytype=multiprocessing.managers.ListProxy)
|
self.register("get_tileset_data", callable=self._get_tileset_data, proxytype=multiprocessing.managers.ListProxy)
|
||||||
|
|
||||||
super(MultiprocessingDispatcherManager, self).__init__(address=address, authkey=authkey)
|
super(MultiprocessingDispatcherManager, self).__init__(address=address, authkey=authkey)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_address(cls, address, authkey, serializer):
|
||||||
|
"Required to be implemented to make multiprocessing happy"
|
||||||
|
c = cls(address=address, authkey=authkey)
|
||||||
|
return c
|
||||||
|
|
||||||
def set_tilesets(self, tilesets):
|
def set_tilesets(self, tilesets):
|
||||||
"""This is used in MultiprocessingDispatcher.setup_tilesets to
|
"""This is used in MultiprocessingDispatcher.setup_tilesets to
|
||||||
update the tilesets each worker has access to. It also
|
update the tilesets each worker has access to. It also
|
||||||
@@ -209,11 +224,6 @@ class MultiprocessingDispatcherManager(multiprocessing.managers.BaseManager):
|
|||||||
data[0] = self.tilesets
|
data[0] = self.tilesets
|
||||||
data[1] = self.tileset_version
|
data[1] = self.tileset_version
|
||||||
|
|
||||||
def get_tilesets(self):
|
|
||||||
"""This returns a (tilesetlist, tileset_version) tuple when
|
|
||||||
called from a worker process.
|
|
||||||
"""
|
|
||||||
return self.get_tileset_data()._getvalue()
|
|
||||||
|
|
||||||
class MultiprocessingDispatcherProcess(multiprocessing.Process):
|
class MultiprocessingDispatcherProcess(multiprocessing.Process):
|
||||||
"""This class represents a single worker process. It is created
|
"""This class represents a single worker process. It is created
|
||||||
@@ -227,16 +237,16 @@ class MultiprocessingDispatcherProcess(multiprocessing.Process):
|
|||||||
created in MultiprocessingDispatcher.
|
created in MultiprocessingDispatcher.
|
||||||
"""
|
"""
|
||||||
super(MultiprocessingDispatcherProcess, self).__init__()
|
super(MultiprocessingDispatcherProcess, self).__init__()
|
||||||
self.manager = manager
|
|
||||||
self.job_queue = manager.get_job_queue()
|
self.job_queue = manager.get_job_queue()
|
||||||
self.result_queue = manager.get_result_queue()
|
self.result_queue = manager.get_result_queue()
|
||||||
self.signal_queue = manager.get_signal_queue()
|
self.signal_queue = manager.get_signal_queue()
|
||||||
|
self.tileset_proxy = manager.get_tileset_data()
|
||||||
|
|
||||||
def update_tilesets(self):
|
def update_tilesets(self):
|
||||||
"""A convenience function to update our local tilesets to the
|
"""A convenience function to update our local tilesets to the
|
||||||
current version in use by the MultiprocessingDispatcher.
|
current version in use by the MultiprocessingDispatcher.
|
||||||
"""
|
"""
|
||||||
self.tilesets, self.tileset_version = self.manager.get_tilesets()
|
self.tilesets, self.tileset_version = self.tileset_proxy._getvalue()
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
"""The main work loop. Jobs are pulled from the job queue and
|
"""The main work loop. Jobs are pulled from the job queue and
|
||||||
@@ -302,11 +312,10 @@ class MultiprocessingDispatcher(Dispatcher):
|
|||||||
self.outstanding_jobs = 0
|
self.outstanding_jobs = 0
|
||||||
self.num_workers = 0
|
self.num_workers = 0
|
||||||
self.manager = MultiprocessingDispatcherManager(address=address, authkey=authkey)
|
self.manager = MultiprocessingDispatcherManager(address=address, authkey=authkey)
|
||||||
self.job_queue = self.manager.job_queue
|
|
||||||
self.result_queue = self.manager.result_queue
|
|
||||||
self.signal_queue = self.manager.signal_queue
|
|
||||||
|
|
||||||
self.manager.start()
|
self.manager.start()
|
||||||
|
self.job_queue = self.manager.get_job_queue()
|
||||||
|
self.result_queue = self.manager.get_result_queue()
|
||||||
|
self.signal_queue = self.manager.get_signal_queue()
|
||||||
|
|
||||||
# create and fill the pool
|
# create and fill the pool
|
||||||
self.pool = []
|
self.pool = []
|
||||||
|
|||||||
@@ -111,8 +111,8 @@ class World(object):
|
|||||||
# just scan the directory heirarchy to find a directory with .mca
|
# just scan the directory heirarchy to find a directory with .mca
|
||||||
# files.
|
# files.
|
||||||
for root, dirs, files in os.walk(self.worlddir):
|
for root, dirs, files in os.walk(self.worlddir):
|
||||||
# any .mca files in this directory?
|
# any .mcr files in this directory?
|
||||||
mcas = filter(lambda x: x.endswith(".mca"), files)
|
mcas = [x for x in files if x.endswith(".mca")]
|
||||||
if mcas:
|
if mcas:
|
||||||
# construct a regionset object for this
|
# construct a regionset object for this
|
||||||
rset = RegionSet(root)
|
rset = RegionSet(root)
|
||||||
@@ -255,11 +255,11 @@ class RegionSet(object):
|
|||||||
this regionset. Either "nether", "end" or "overworld"
|
this regionset. Either "nether", "end" or "overworld"
|
||||||
"""
|
"""
|
||||||
# path will be normalized in __init__
|
# path will be normalized in __init__
|
||||||
if self.regiondir.endswith("/DIM-1/region"):
|
if self.regiondir.endswith(os.path.normpath("/DIM-1/region")):
|
||||||
return "nether"
|
return "nether"
|
||||||
elif self.regiondir.endswith("/DIM1/region"):
|
elif self.regiondir.endswith(os.path.normpath("/DIM1/region")):
|
||||||
return "end"
|
return "end"
|
||||||
elif self.regiondir.endswith("/region"):
|
elif self.regiondir.endswith(os.path.normpath("/region")):
|
||||||
return "overworld"
|
return "overworld"
|
||||||
else:
|
else:
|
||||||
raise Exception("Woah, what kind of dimension is this! %r" % self.regiondir)
|
raise Exception("Woah, what kind of dimension is this! %r" % self.regiondir)
|
||||||
|
|||||||
Reference in New Issue
Block a user