From 9309fd6c9687ae86f18f242b5d0e14f4b03b0f46 Mon Sep 17 00:00:00 2001 From: Andrew Brown Date: Sat, 11 Sep 2010 13:21:13 -0400 Subject: [PATCH] Uses a shared semaphore to spawn new processes when needed. This more effectively utilizes as many cores as you tell it. It should now spawn a new process whenever an old branch of the recursive tree finishes, to always use as many processes as you specify. --- world.py | 50 +++++++++++++++++++++++++++++++------------------- 1 file changed, 31 insertions(+), 19 deletions(-) diff --git a/world.py b/world.py index 5a077a2..79aed63 100644 --- a/world.py +++ b/world.py @@ -349,9 +349,10 @@ def generate_quadtree(chunkmap, colstart, colend, rowstart, rowend, prefix, proc # procs is -1 here since the main process always runs as well, only spawn # procs-1 /new/ processes - quadtree_recurse(chunkmap, colstart, colend, rowstart, rowend, prefix, "base", procs-1) + sem = multiprocessing.BoundedSemaphore(procs-1) + quadtree_recurse(chunkmap, colstart, colend, rowstart, rowend, prefix, "base", sem) -def quadtree_recurse(chunkmap, colstart, colend, rowstart, rowend, prefix, quadrant, procs): +def quadtree_recurse(chunkmap, colstart, colend, rowstart, rowend, prefix, quadrant, sem): """Recursive method that generates a quadtree. A single call generates, saves, and returns an image with the range specified by colstart,colend,rowstart, and rowend. @@ -386,6 +387,13 @@ def quadtree_recurse(chunkmap, colstart, colend, rowstart, rowend, prefix, quadr Each tile outputted is always 384 by 384 pixels. + The last parameter, sem, should be a multiprocessing.Semaphore or + BoundedSemaphore object. Before each recursive call, the semaphore is + acquired without blocking. If the acquire is successful, the recursive call + will spawn a new process. If it is not successful, the recursive call is + run in the same thread. The semaphore is passed to each recursive call, so + any call could spawn new processes if another one exits at some point. + The return from this function is (path, hash) where path is the path to the file saved, and hash is a byte string that depends on the tile's contents. If the tile is blank, path will be None, but hash will still be valid. @@ -481,34 +489,31 @@ def quadtree_recurse(chunkmap, colstart, colend, rowstart, rowend, prefix, quadr # Recurse to generate each quadrant of images # Quadrent 1: - if procs > 0: + if sem.acquire(False): Procobj = ReturnableProcess - procs -= 1 else: Procobj = FakeProcess - quad0result = Procobj(target=quadtree_recurse, - args=(chunkmap, colstart, colmid, rowstart, rowmid, newprefix, "0", 0) + quad0result = Procobj(sem, target=quadtree_recurse, + args=(chunkmap, colstart, colmid, rowstart, rowmid, newprefix, "0", sem) ) quad0result.start() - if procs > 0: + if sem.acquire(False): Procobj = ReturnableProcess - procs -= 1 else: Procobj = FakeProcess - quad1result = Procobj(target=quadtree_recurse, - args=(chunkmap, colmid, colend, rowstart, rowmid, newprefix, "1", 0) + quad1result = Procobj(sem, target=quadtree_recurse, + args=(chunkmap, colmid, colend, rowstart, rowmid, newprefix, "1", sem) ) quad1result.start() - if procs > 0: + if sem.acquire(False): Procobj = ReturnableProcess - procs -= 1 else: Procobj = FakeProcess - quad2result = Procobj(target=quadtree_recurse, - args=(chunkmap, colstart, colmid, rowmid, rowend, newprefix, "2", 0) + quad2result = Procobj(sem, target=quadtree_recurse, + args=(chunkmap, colstart, colmid, rowmid, rowend, newprefix, "2", sem) ) quad2result.start() @@ -516,7 +521,7 @@ def quadtree_recurse(chunkmap, colstart, colend, rowstart, rowend, prefix, quadr # since we're just going to turn around and wait for it. quad3file, hash3 = quadtree_recurse(chunkmap, colmid, colend, rowmid, rowend, - newprefix, "3", 0) + newprefix, "3", sem) quad0file, hash0 = quad0result.get() quad1file, hash1 = quad1result.get() @@ -603,10 +608,17 @@ def remove_tile(prefix, quadrent): class ReturnableProcess(multiprocessing.Process): """Like the standard multiprocessing.Process class, but the return value of - the target method is available by calling get()""" + the target method is available by calling get(). + + The given semaphore is released when the target finishes running""" + def __init__(self, semaphore, *args, **kwargs): + self.__sem = semaphore + multiprocessing.Process.__init__(self, *args, **kwargs) + def run(self): results = self._target(*self._args, **self._kwargs) self._respipe_in.send(results) + self.__sem.release() def get(self): self.join() @@ -621,11 +633,11 @@ class FakeProcess(object): Used to make the code simpler in quadtree_recurse """ - def __init__(self, target, args=None, kwargs=None): + def __init__(self, semaphore, target, args=None, kwargs=None): self._target = target self._args = args if args else () self._kwargs = kwargs if kwargs else {} def start(self): - return + self.ret = self._target(*self._args, **self._kwargs) def get(self): - return self._target(*self._args, **self._kwargs) + return self.ret