diff --git a/opendm/concurrency.py b/opendm/concurrency.py
index 53dea176..910cf6be 100644
--- a/opendm/concurrency.py
+++ b/opendm/concurrency.py
@@ -63,8 +63,10 @@ def parallel_map(func, items, max_workers=1):
             t.start()
             threads.append(t)
 
+        i = 1
         for t in items:
             pq.put((i, t.copy()))
+            i += 1
 
         def stop_workers():
             for i in range(len(threads)):
diff --git a/opendm/context.py b/opendm/context.py
index 60963d79..2e2f4010 100644
--- a/opendm/context.py
+++ b/opendm/context.py
@@ -13,9 +13,13 @@ tests_path = os.path.join(root_path, 'tests')
 tests_data_path = os.path.join(root_path, 'tests/test_data')
 
 # add opencv,opensfm to python path
-sys.path.append(os.path.join(superbuild_path, 'install/lib/python3.6/dist-packages'))
-sys.path.append(os.path.join(superbuild_path, 'install/lib/python3/dist-packages'))
-sys.path.append(os.path.join(superbuild_path, 'src/opensfm'))
+python_packages_paths = [os.path.join(superbuild_path, p) for p in [
+    'install/lib/python3.6/dist-packages',
+    'install/lib/python3/dist-packages',
+    'src/opensfm'
+]]
+for p in python_packages_paths:
+    sys.path.append(p)
 
 # define opensfm path
 
diff --git a/opendm/dem/commands.py b/opendm/dem/commands.py
index 0d4683d6..5b2269a5 100755
--- a/opendm/dem/commands.py
+++ b/opendm/dem/commands.py
@@ -9,7 +9,7 @@ from opendm.system import run
 from opendm import point_cloud
 from opendm import io
 from opendm import system
-from opendm.concurrency import get_max_memory
+from opendm.concurrency import get_max_memory, parallel_map
 from scipy import ndimage
 from datetime import datetime
 from opendm import log
@@ -81,8 +81,6 @@ def create_dem(input_point_cloud, dem_type, output_type='max', radiuses=['0.56']
                apply_smoothing=True):
     """ Create DEM from multiple radii, and optionally gapfill """
 
-    # TODO: refactor to use concurrency.parallel_map
-
     global error
     error = None
 
@@ -164,7 +162,7 @@ def create_dem(input_point_cloud, dem_type, output_type='max', radiuses=['0.56']
     # Sort tiles by increasing radius
     tiles.sort(key=lambda t: float(t['radius']), reverse=True)
 
-    def process_one(q):
+    def process_tile(q):
         log.ODM_INFO("Generating %s (%s, radius: %s, resolution: %s)" % (q['filename'], output_type, q['radius'], resolution))
 
         d = pdal.json_gdal_base(q['filename'], output_type, q['radius'], resolution, q['bounds'])
@@ -178,63 +176,7 @@ def create_dem(input_point_cloud, dem_type, output_type='max', radiuses=['0.56']
         pdal.json_add_readers(d, [input_point_cloud])
         pdal.run_pipeline(d, verbose=verbose)
 
-    def worker():
-        global error
-
-        while True:
-            (num, q) = pq.get()
-            if q is None or error is not None:
-                pq.task_done()
-                break
-
-            try:
-                process_one(q)
-            except Exception as e:
-                error = e
-            finally:
-                pq.task_done()
-
-    if max_workers > 1:
-        use_single_thread = False
-        pq = queue.PriorityQueue()
-        threads = []
-        for i in range(max_workers):
-            t = threading.Thread(target=worker)
-            t.start()
-            threads.append(t)
-
-        for t in tiles:
-            pq.put((i, t.copy()))
-
-        def stop_workers():
-            for i in range(len(threads)):
-                pq.put((-1, None))
-            for t in threads:
-                t.join()
-
-        # block until all tasks are done
-        try:
-            while pq.unfinished_tasks > 0:
-                time.sleep(0.5)
-        except KeyboardInterrupt:
-            print("CTRL+C terminating...")
-            stop_workers()
-            sys.exit(1)
-
-        stop_workers()
-
-        if error is not None:
-            # Try to reprocess using a single thread
-            # in case this was a memory error
-            log.ODM_WARNING("DEM processing failed with multiple threads, let's retry with a single thread...")
-            use_single_thread = True
-    else:
-        use_single_thread = True
-
-    if use_single_thread:
-        # Boring, single thread processing
-        for q in tiles:
-            process_one(q)
+    parallel_map(process_tile, tiles, max_workers)
 
     output_file = "%s.tif" % dem_type
     output_path = os.path.abspath(os.path.join(outdir, output_file))
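
Why the `i = 1` / `i += 1` fix in `concurrency.py` matters: the old code enqueued every item with whatever value `i` happened to hold after the thread-spawning loop, so all priorities were equal and Python 3's `PriorityQueue` would fall back to comparing the dict payloads, raising `TypeError: '<' not supported between instances of 'dict' and 'dict'`. The block deleted from `create_dem` above is the same worker-pool pattern that `parallel_map` now provides. Below is a condensed sketch of that pattern, not the exact `parallel_map` body; in particular, the sentinel priorities are made distinct here so heap comparisons stay well defined:

```python
import queue
import threading

def parallel_map_sketch(func, items, max_workers=1):
    """Worker-pool pattern sketched from the code removed above.
    In ODM, items are dicts (tile descriptors), hence item.copy()."""
    if max_workers <= 1:
        # Boring, single thread processing
        for item in items:
            func(item)
        return

    pq = queue.PriorityQueue()
    error = None

    def worker():
        nonlocal error
        while True:
            num, item = pq.get()
            try:
                # Skip remaining work once any worker has failed
                if item is not None and error is None:
                    func(item)
            except Exception as e:
                error = e
            finally:
                pq.task_done()
            if item is None:  # sentinel: shut this worker down
                return

    threads = [threading.Thread(target=worker) for _ in range(max_workers)]
    for t in threads:
        t.start()

    # Distinct, increasing priorities (the fix above): equal priorities
    # would make the heap compare the dict payloads, which Python 3
    # cannot order.
    for i, item in enumerate(items, start=1):
        pq.put((i, item.copy()))

    pq.join()  # block until every work item has been marked done

    # One sentinel per worker; distinct priorities for the same reason.
    for n in range(len(threads)):
        pq.put((-1 - n, None))
    for t in threads:
        t.join()

    if error is not None:
        raise error  # ODM instead retries once on a single thread
```

With the helper in place, the sixty-odd lines in `create_dem` collapse to the single call `parallel_map(process_tile, tiles, max_workers)`.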
diff --git a/opendm/dem/pdal.py b/opendm/dem/pdal.py
index 11d9d31e..cea98c86 100644
--- a/opendm/dem/pdal.py
+++ b/opendm/dem/pdal.py
@@ -148,7 +148,7 @@ def run_pipeline(json, verbose=False):
     f, jsonfile = tempfile.mkstemp(suffix='.json')
     if verbose:
         log.ODM_INFO('Pipeline file: %s' % jsonfile)
-    os.write(f, jsonlib.dumps(json))
+    os.write(f, jsonlib.dumps(json).encode('utf8'))
     os.close(f)
 
     cmd = [
diff --git a/opendm/point_cloud.py b/opendm/point_cloud.py
index 5ac13164..60e787ec 100644
--- a/opendm/point_cloud.py
+++ b/opendm/point_cloud.py
@@ -16,7 +16,7 @@ def ply_info(input_ply):
     has_normals = False
     vertex_count = 0
 
-    with open(input_ply, 'r') as f:
+    with open(input_ply, 'r', errors='ignore') as f:
         line = f.readline().strip().lower()
         i = 0
         while line != "end_header":
@@ -251,10 +251,10 @@ def fast_merge_ply(input_point_cloud_files, output_file):
     vertex_count = sum([ply_info(pcf)['vertex_count'] for pcf in input_point_cloud_files])
     master_file = input_point_cloud_files[0]
     with open(output_file, "wb") as out:
-        with open(master_file, "r") as fhead:
+        with open(master_file, "r", errors="ignore") as fhead:
             # Copy header
             line = fhead.readline()
-            out.write(line)
+            out.write(line.encode('utf8'))
 
             i = 0
             while line.strip().lower() != "end_header":
@@ -262,9 +262,9 @@ def fast_merge_ply(input_point_cloud_files, output_file):
 
                 # Intercept element vertex field
                 if line.lower().startswith("element vertex "):
-                    out.write("element vertex %s\n" % vertex_count)
-                else:
-                    out.write(line)
+                    out.write(("element vertex %s\n" % vertex_count).encode('utf8'))
+                else:
+                    out.write(line.encode('utf8'))
 
                 i += 1
                 if i > 100:
@@ -275,7 +275,7 @@ def fast_merge_ply(input_point_cloud_files, output_file):
         with open(ipc, "rb") as fin:
             # Skip header
             line = fin.readline()
-            while line.strip().lower() != "end_header":
+            while line.strip().lower() != b"end_header":
                 line = fin.readline()
                 i += 1
 
diff --git a/opendm/progress.py b/opendm/progress.py
index bd9975d2..264db82d 100644
--- a/opendm/progress.py
+++ b/opendm/progress.py
@@ -32,8 +32,9 @@ class Broadcaster:
                 global_progress = 100
 
             try:
-                sock.sendto("PGUP/{}/{}/{}".format(self.pid, self.project_name, float(global_progress)), (UDP_IP, self.port))
-            except:
-                log.ODM_WARNING("Failed to broadcast progress update on UDP port %s" % str(self.port))
+                sock.sendto("PGUP/{}/{}/{}".format(self.pid, self.project_name, float(global_progress)).encode('utf8'),
+                            (UDP_IP, self.port))
+            except Exception as e:
+                log.ODM_WARNING("Failed to broadcast progress update on UDP port %s (%s)" % (str(self.port), str(e)))
 
 progressbc = Broadcaster(PROGRESS_BROADCAST_PORT)
\ No newline at end of file
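
The `pdal.py`, `point_cloud.py`, and `progress.py` hunks above are all the same Python 3 migration fix: `os.write()`, `socket.sendto()`, and files opened in binary mode traffic in `bytes`, while text-mode reads return `str` (with `errors='ignore'` tolerating the stray non-UTF-8 bytes a PLY header can contain). A minimal standalone illustration of those boundaries; the file contents and the port number are placeholders, not ODM's:

```python
import os
import socket
import tempfile

# os.write() takes a raw file descriptor and bytes, so str payloads
# must be encoded first (the pdal.py fix).
fd, path = tempfile.mkstemp(suffix='.json')
os.write(fd, '{"pipeline": []}'.encode('utf8'))
os.close(fd)

# UDP datagram payloads are bytes too (the progress.py fix).
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.sendto("PGUP/1234/project/42.0".encode('utf8'), ("127.0.0.1", 5678))
sock.close()

# Text mode yields str lines (errors='ignore' skips undecodable bytes);
# binary mode yields bytes, which only compare equal to b"..." literals,
# hence the b"end_header" comparison above.
with open(path, 'r', errors='ignore') as f:
    assert isinstance(f.readline(), str)
with open(path, 'rb') as f:
    assert isinstance(f.readline(), bytes)

os.remove(path)
```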
env["PYTHONPATH"] = env.get("PYTHONPATH", "") + ":" + ":".join(packages_paths) + for k in env_vars: env[k] = str(env_vars[k]) diff --git a/stages/dataset.py b/stages/dataset.py index 93b7f651..0d747d4c 100644 --- a/stages/dataset.py +++ b/stages/dataset.py @@ -12,7 +12,7 @@ from opendm import progress def save_images_database(photos, database_file): with open(database_file, 'w') as f: - f.write(json.dumps(map(lambda p: p.__dict__, photos))) + f.write(json.dumps([p.__dict__ for p in photos])) log.ODM_INFO("Wrote images database: %s" % database_file)