0

Uses a shared semaphore to spawn new processes when needed.

This more effectively utilizes as many cores as you tell it. It should
now spawn a new process whenever an old branch of the recursive tree
finishes, to always use as many processes as you specify.
This commit is contained in:
Andrew Brown
2010-09-11 13:21:13 -04:00
parent be146385e4
commit 9309fd6c96

View File

@@ -349,9 +349,10 @@ def generate_quadtree(chunkmap, colstart, colend, rowstart, rowend, prefix, proc
# procs is -1 here since the main process always runs as well, only spawn
# procs-1 /new/ processes
quadtree_recurse(chunkmap, colstart, colend, rowstart, rowend, prefix, "base", procs-1)
sem = multiprocessing.BoundedSemaphore(procs-1)
quadtree_recurse(chunkmap, colstart, colend, rowstart, rowend, prefix, "base", sem)
def quadtree_recurse(chunkmap, colstart, colend, rowstart, rowend, prefix, quadrant, procs):
def quadtree_recurse(chunkmap, colstart, colend, rowstart, rowend, prefix, quadrant, sem):
"""Recursive method that generates a quadtree.
A single call generates, saves, and returns an image with the range
specified by colstart,colend,rowstart, and rowend.
@@ -386,6 +387,13 @@ def quadtree_recurse(chunkmap, colstart, colend, rowstart, rowend, prefix, quadr
Each tile outputted is always 384 by 384 pixels.
The last parameter, sem, should be a multiprocessing.Semaphore or
BoundedSemaphore object. Before each recursive call, the semaphore is
acquired without blocking. If the acquire is successful, the recursive call
will spawn a new process. If it is not successful, the recursive call is
run in the same thread. The semaphore is passed to each recursive call, so
any call could spawn new processes if another one exits at some point.
The return from this function is (path, hash) where path is the path to the
file saved, and hash is a byte string that depends on the tile's contents.
If the tile is blank, path will be None, but hash will still be valid.
@@ -481,34 +489,31 @@ def quadtree_recurse(chunkmap, colstart, colend, rowstart, rowend, prefix, quadr
# Recurse to generate each quadrant of images
# Quadrent 1:
if procs > 0:
if sem.acquire(False):
Procobj = ReturnableProcess
procs -= 1
else:
Procobj = FakeProcess
quad0result = Procobj(target=quadtree_recurse,
args=(chunkmap, colstart, colmid, rowstart, rowmid, newprefix, "0", 0)
quad0result = Procobj(sem, target=quadtree_recurse,
args=(chunkmap, colstart, colmid, rowstart, rowmid, newprefix, "0", sem)
)
quad0result.start()
if procs > 0:
if sem.acquire(False):
Procobj = ReturnableProcess
procs -= 1
else:
Procobj = FakeProcess
quad1result = Procobj(target=quadtree_recurse,
args=(chunkmap, colmid, colend, rowstart, rowmid, newprefix, "1", 0)
quad1result = Procobj(sem, target=quadtree_recurse,
args=(chunkmap, colmid, colend, rowstart, rowmid, newprefix, "1", sem)
)
quad1result.start()
if procs > 0:
if sem.acquire(False):
Procobj = ReturnableProcess
procs -= 1
else:
Procobj = FakeProcess
quad2result = Procobj(target=quadtree_recurse,
args=(chunkmap, colstart, colmid, rowmid, rowend, newprefix, "2", 0)
quad2result = Procobj(sem, target=quadtree_recurse,
args=(chunkmap, colstart, colmid, rowmid, rowend, newprefix, "2", sem)
)
quad2result.start()
@@ -516,7 +521,7 @@ def quadtree_recurse(chunkmap, colstart, colend, rowstart, rowend, prefix, quadr
# since we're just going to turn around and wait for it.
quad3file, hash3 = quadtree_recurse(chunkmap,
colmid, colend, rowmid, rowend,
newprefix, "3", 0)
newprefix, "3", sem)
quad0file, hash0 = quad0result.get()
quad1file, hash1 = quad1result.get()
@@ -603,10 +608,17 @@ def remove_tile(prefix, quadrent):
class ReturnableProcess(multiprocessing.Process):
"""Like the standard multiprocessing.Process class, but the return value of
the target method is available by calling get()"""
the target method is available by calling get().
The given semaphore is released when the target finishes running"""
def __init__(self, semaphore, *args, **kwargs):
self.__sem = semaphore
multiprocessing.Process.__init__(self, *args, **kwargs)
def run(self):
results = self._target(*self._args, **self._kwargs)
self._respipe_in.send(results)
self.__sem.release()
def get(self):
self.join()
@@ -621,11 +633,11 @@ class FakeProcess(object):
Used to make the code simpler in quadtree_recurse
"""
def __init__(self, target, args=None, kwargs=None):
def __init__(self, semaphore, target, args=None, kwargs=None):
self._target = target
self._args = args if args else ()
self._kwargs = kwargs if kwargs else {}
def start(self):
return
self.ret = self._target(*self._args, **self._kwargs)
def get(self):
return self._target(*self._args, **self._kwargs)
return self.ret