0

regionTrimmer.py Python3 refactor

This commit is contained in:
Ben Steadman
2019-03-26 07:45:46 +00:00
parent ced67cf317
commit 9121a4b080
2 changed files with 180 additions and 95 deletions

View File

@@ -1,40 +1,51 @@
#!/usr/bin/env python #!/usr/bin/env python3
"""Deletes outlying and unconnected regions""" """Deletes outlying and unconnected regions"""
import argparse
import logging import logging
import os from pathlib import Path
import sys
import glob
import networkx import networkx
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG) logger.setLevel(logging.DEBUG)
def get_region_file_from_node(regionset_path, node): def get_region_file_from_node(regionset_path, node):
return os.path.join(regionset_path, 'r.%d.%d.mca' % node) return regionset_path / ('r.%d.%d.mca' % node)
def get_nodes(regionset_path): def get_nodes(regionset_path):
return [tuple(map(int, r.split('.')[1:3])) \ return [
for r in glob.glob(os.path.join(regionset_path, 'r.*.*.mca'))] tuple(int(x) for x in r.stem.split('.')[1:3])
for r in regionset_path.glob('r.*.*.mca')
]
def generate_edges(graph): def generate_edges(graph):
offsets = (-1, 1) offsets = (-1, 1)
nodes = graph.nodes() nodes = graph.nodes()
for node in nodes: for node in nodes:
for offset in offsets: for offset in offsets:
graph.add_edges_from((node, offset_node) for offset_node in \ graph.add_edges_from(
[(node[0] + offset, node[1]), (node[0], node[1] + offset), \ (node, offset_node)
(node[0] + offset, node[1] + offset)] \ for offset_node in [
if offset_node in nodes) (node[0] + offset, node[1]),
(node[0], node[1] + offset),
(node[0] + offset, node[1] + offset),
]
if offset_node in nodes
)
return graph return graph
def generate_subgraphs(nodes): def generate_subgraphs(nodes):
graph = networkx.Graph() graph = networkx.Graph()
graph.add_nodes_from(nodes) graph.add_nodes_from(nodes)
generate_edges(graph) generate_edges(graph)
return graph, networkx.connected_component_subgraphs(graph) return graph, [graph.subgraph(c) for c in networkx.connected_components(graph)]
def get_graph_bounds(graph): def get_graph_bounds(graph):
nodes = graph.nodes() nodes = graph.nodes()
@@ -45,114 +56,130 @@ def get_graph_bounds(graph):
min(n[1] for n in nodes), min(n[1] for n in nodes),
) )
def get_graph_center_by_bounds(bounds): def get_graph_center_by_bounds(bounds):
dx = bounds[0] - bounds[1] dx = bounds[0] - bounds[1]
dy = bounds[2] - bounds[3] dy = bounds[2] - bounds[3]
return (dx / 2 + bounds[1], dy / 2 + bounds[3]) return (dx // 2 + bounds[1], dy // 2 + bounds[3])
def main(*args, **options):
if len(args) < 1: def trim_regions(graph, regions_path, dry_run=True, filter_func=lambda n: True):
logger.error('Missing region directory argument') regions = [
return (n, get_region_file_from_node(regions_path, n))
for path in args: for n in graph.nodes()
logger.info('Using regionset path: %s', path) if filter_func(n)
]
logger.info("Trimming regions: %s", ", ".join(x[1] for x in regions))
for n, region_file in regions:
graph.remove_node(n)
if dry_run is False:
unlink_file(region_file)
def is_outside_main(center, main_section_bounds):
return center[0] <= main_section_bounds[0] and center[0] >= main_section_bounds[1] and \
center[1] <= main_section_bounds[2] and center[1] >= main_section_bounds[3]
def is_outside_bounds(node, trim_center, trim_bounds):
return node[0] >= trim_center[0] + trim_bounds[0] or \
node[0] <= trim_center[0] - trim_bounds[0] or \
node[1] >= trim_center[1] + trim_bounds[1] or \
node[1] <= trim_center[1] - trim_bounds[1]
def unlink_file(path):
try:
path.unlink()
except OSError as err:
logger.warn("Unable to delete file: %s", path)
logger.warn("Error recieved was: %s", err)
def main(args):
for path in args.paths:
logger.info("Using regionset path: %s", path)
nodes = get_nodes(path) nodes = get_nodes(path)
if not len(nodes): if not len(nodes):
logger.error('Found no nodes, are you sure there are .mca files in %s ?', logger.error("Found no nodes, are you sure there are .mca files in %s ?",
path) path)
return return
logger.info('Found %d nodes', len(nodes)) logger.info("Found %d nodes", len(nodes))
logger.info('Generating graphing nodes...') logger.info("Generating graphing nodes...")
graph, subgraphs = generate_subgraphs(nodes) graph, subgraphs = generate_subgraphs(nodes)
assert len(graph.nodes()) == sum(len(sg.nodes()) for sg in subgraphs) assert len(graph.nodes()) == sum(len(sg.nodes()) for sg in subgraphs)
if len(subgraphs) == 1: if len(subgraphs) == 1:
logger.warn('All regions are contiguous, the needful is done!') logger.warn("All regions are contiguous, the needful is done!")
return return
logger.info('Found %d discrete region sections', len(subgraphs)) logger.info("Found %d discrete region sections", len(subgraphs))
subgraphs = sorted(subgraphs, key=lambda sg: len(sg), reverse=True) subgraphs = sorted(subgraphs, key=lambda sg: len(sg), reverse=True)
for i, sg in enumerate(subgraphs): for i, sg in enumerate(subgraphs):
logger.info('Region section #%02d: %04d nodes', i+1, len(sg.nodes())) logger.info("Region section #%02d: %04d nodes", i + 1, len(sg.nodes()))
bounds = get_graph_bounds(sg) bounds = get_graph_bounds(sg)
logger.info('Bounds: %d <-> %d x %d <-> %d', *get_graph_bounds(sg)) logger.info("Bounds: %d <-> %d x %d <-> %d", *get_graph_bounds(sg))
center = get_graph_center_by_bounds(bounds) center = get_graph_center_by_bounds(bounds)
logger.info('Center: %d x %d', *center) logger.info("Center: %d x %d", *center)
main_section = subgraphs[0] main_section = subgraphs[0]
main_section_bounds = get_graph_bounds(main_section) main_section_bounds = get_graph_bounds(main_section)
main_section_center = get_graph_center_by_bounds(main_section_bounds) main_section_center = get_graph_center_by_bounds(main_section_bounds)
logger.info('Using %d node graph as main section,', len(main_section.nodes())) logger.info("Using %d node graph as main section,", len(main_section.nodes()))
satellite_sections = subgraphs[1:] satellite_sections = subgraphs[1:]
for ss in satellite_sections: for ss in satellite_sections:
bounds = get_graph_bounds(ss) bounds = get_graph_bounds(ss)
center = get_graph_center_by_bounds(bounds) center = get_graph_center_by_bounds(bounds)
logger.info('Checking satellite section with %d nodes, %d <-> %d x %d <-> %d bounds and %d x %d center', logger.info(("Checking satellite section with %d nodes, "
len(ss.nodes()), *(bounds + center)) "%d <-> %d x %d <-> %d bounds and %d x %d center"),
if options['trim_disconnected']: len(ss.nodes()), *(bounds + center))
logger.info('Trimming regions: %s', ', '.join(
get_region_file_from_node(path, n) for n in ss.nodes())) if args.trim_disconnected:
for n, region_file in ((n, get_region_file_from_node(path, n)) \ trim_regions(ss, path, dry_run=args.dry_run)
for n in ss.nodes()):
ss.remove_node(n) if args.trim_outside_main:
if not options['dry_run']: if is_outside_main(ss, center, main_section_bounds):
unlink_file(region_file) logger.info("Section is outside main section bounds")
if options['trim_outside_main']: trim_regions(ss, path, dry_run=args.dry_run)
if center[0] <= main_section_bounds[0] and center[0] >= main_section_bounds[1] and \
center[1] <= main_section_bounds[2] and center[1] >= main_section_bounds[3]:
logger.info('Section falls inside main section bounds, ignoring')
else: else:
logger.info('Section is outside main section bounds') logger.info("Section falls inside main section bounds, ignoring")
logger.info('Trimming regions: %s', ', '.join(
get_region_file_from_node(path, n) for n in ss.nodes()))
for n, region_file in ((n, get_region_file_from_node(path, n)) \
for n in ss.nodes()):
ss.remove_node(n)
if not options['dry_run']:
unlink_file(region_file)
if options['trim_outside_bounds']:
x = map(int, options['trim_outside_bounds'].split(','))
if len(x) == 4:
trim_center = x[:2]
trim_bounds = x[2:]
elif len(x) == 2:
trim_center = main_section_center
trim_bounds = x
else:
logger.error('Invalid center/bound value: %s',
options['trim_outside_bounds'])
continue
for node in ss.nodes():
if node[0] >= trim_center[0] + trim_bounds[0] or \
node[0] <= trim_center[0] - trim_bounds[0] or \
node[1] >= trim_center[1] + trim_bounds[1] or \
node[1] <= trim_center[1] - trim_bounds[1]:
region_file = get_region_file_from_node(path, node)
logger.info('Region falls outside specified bounds, trimming: %s',
region_file)
ss.remove_node(node)
if not options['dry_run']:
unlink_file(region_file)
def unlink_file(path): if args.trim_outside_bounds:
try: logger.info("Checking regions outside specified bounds")
os.unlink(path) trim_center = args.trim_outside_bounds.get("center", main_section_center)
except OSError as err: trim_bounds = args.trim_outside_bounds["bounds"]
logger.warn('Unable to delete file: %s', path) trim_regions(ss, path, dry_run=args.dry_run,
logger.warn('Error recieved was: %s', err) filter_func=lambda n: is_outside_bounds(n, trim_center, trim_bounds))
if __name__ == '__main__': def dir_path(path):
import optparse p = Path(path)
if not p.is_dir():
raise argparse.ArgumentTypeError("Not a valid directory path")
return p
def center_bound(value):
x = [int(v) for v in value.split(",")]
if len(x) == 4:
return {"center": x[:2], "bounds": x[2:]}
elif len(x) == 2:
return {"bounds": x}
else:
raise argparse.ArgumentTypeError("Invalid center/bound value")
if __name__ == "__main__":
logging.basicConfig() logging.basicConfig()
parser = optparse.OptionParser(
usage='Usage: %prog [options] <path/to/region/directory>') parser = argparse.ArgumentParser(description=__doc__)
parser.add_option('-D', '--trim-disconnected', action='store_true', default=False, parser.add_argument("paths", metavar="<path/to/region/directory>", nargs="+", type=dir_path)
help='Trim all disconnected regions') parser.add_argument("-D", "--trim-disconnected", action="store_true",
parser.add_option('-M', '--trim-outside-main', action='store_true', default=False, default=False, help="Trim all disconnected regions")
help='Trim disconnected regions outside main section bounds') parser.add_argument("-M", "--trim-outside-main", action="store_true",
parser.add_option('-B', '--trim-outside-bounds', default=False, default=False, help="Trim disconnected regions outside main section bounds")
metavar='[center_X,center_Y,]bound_X,bound_Y', parser.add_argument("-B", "--trim-outside-bounds",
help='Trim outside given bounds (given as [center_X,center_Y,]bound_X,bound_Y)') metavar="[center_X,center_Y,]bound_X,bound_Y", type=center_bound,
parser.add_option('-n', '--dry-run', action='store_true', default=False, help=("Trim outside given bounds "
help='Don\'t actually delete anything') "(given as [center_X,center_Y,]bound_X,bound_Y)"))
opts, args = parser.parse_args() parser.add_argument("-n", "--dry-run", action="store_true", default=False,
main(*args, **vars(opts)) help="Don't actually delete anything")
args = parser.parse_args()
main(args)

View File

@@ -0,0 +1,58 @@
import unittest
from pathlib import Path
from tempfile import TemporaryDirectory
import networkx
import contrib.regionTrimmer as region_trimmer
class TestRegionTrimmer(unittest.TestCase):
def test_get_nodes(self):
coords = [(0, 0), (0, -1), (-1, 0), (-1, -1)]
with TemporaryDirectory() as tmpdirname:
region_file = Path(tmpdirname)
for x, z in coords:
region_fname = "r.{x}.{z}.mca".format(x=x, z=z)
(region_file / region_fname).touch()
nodes = region_trimmer.get_nodes(region_file)
self.assertListEqual(sorted(nodes), sorted(coords))
def test_get_nodes_returns_empty_list_when_no_region_files(self):
with TemporaryDirectory() as tmpdirname:
region_file = Path(tmpdirname)
(region_file / "not_region_file.txt").touch()
nodes = region_trimmer.get_nodes(region_file)
self.assertListEqual(nodes, [])
def test_get_region_file_from_node(self):
node = (0, 0)
regionset_path = Path('/path/to/regions')
self.assertEqual(region_trimmer.get_region_file_from_node(
regionset_path, node), Path('/path/to/regions/r.0.0.mca'))
def test_get_graph_bounds(self):
""" Should return (max_x, min_x, max_z, min_z) of all nodes
"""
graph = networkx.Graph()
graph.add_nodes_from([(0, 0), (0, -1), (-1, 0), (-1, -1)])
self.assertEqual(region_trimmer.get_graph_bounds(graph), (0, -1, 0, -1))
def test_get_graph_center_by_bounds(self):
self.assertEqual(region_trimmer.get_graph_center_by_bounds((0, -1, 0, -1)), (-1, -1))
def test_generate_edges(self):
graph = networkx.Graph()
graph.add_nodes_from([(0, 0), (0, -1), (-1, 0), (-1, -1)])
graph = region_trimmer.generate_edges(graph)
expected = [((-1, 0), (-1, -1)),
((0, -1), (-1, -1)),
((0, 0), (-1, -1)),
((0, 0), (-1, 0)),
((0, 0), (0, -1))]
self.assertListEqual(sorted(list(graph.edges)), expected)