LRE hang fix, medfilt fallback, favor local processing

Former-commit-id: 217f9f7844
pull/1161/head
Piero Toffanin 2019-05-13 07:18:03 -04:00
rodzic ad1c63f2f8
commit 387aad2992
3 zmienionych plików z 40 dodań i 10 usunięć

Wyświetl plik

@@ -275,8 +275,12 @@ def post_process(geotiff_path, output_path, smoothing_iterations=1):
# these edge cases, but it's slower.
for i in range(smoothing_iterations):
log.ODM_INFO("Smoothing iteration %s" % str(i + 1))
arr = signal.medfilt(arr, 5)
try:
arr = signal.medfilt(arr, 5)
except MemoryError:
log.ODM_WARNING("medfilt ran out of memory, switching to slower median_filter")
arr = ndimage.median_filter(arr, size=5)
# Fill corner points with nearest value
if arr.shape >= (4, 4):
arr[0][:2] = arr[1][0] = arr[1][1]

Wyświetl plik

@@ -63,6 +63,7 @@ class LocalRemoteExecutor:
class nonloc:
error = None
semaphore = None
local_processing = False
calculate_task_limit_lock = threading.Lock()
finished_tasks = AtomicCounter(0)
@@ -87,12 +88,12 @@ class LocalRemoteExecutor:
log.ODM_INFO("LRE: No remote tasks to cleanup")
for task in self.params['tasks']:
log.ODM_DEBUG("Removing remote task %s... %s" % (task.uuid, 'OK' if remove_task_safe(task) else 'NO'))
log.ODM_DEBUG("LRE: Removing remote task %s... %s" % (task.uuid, 'OK' if remove_task_safe(task) else 'NO'))
def handle_result(task, local, error = None, partial=False):
def cleanup_remote():
if not partial and task.remote_task:
log.ODM_DEBUG("Cleaning up remote task (%s)... %s" % (task.remote_task.uuid, 'OK' if remove_task_safe(task.remote_task) else 'NO'))
log.ODM_DEBUG("LRE: Cleaning up remote task (%s)... %s" % (task.remote_task.uuid, 'OK' if remove_task_safe(task.remote_task) else 'NO'))
self.params['tasks'].remove(task.remote_task)
task.remote_task = None
@@ -113,7 +114,7 @@ class LocalRemoteExecutor:
for t in self.params['tasks']:
try:
info = t.info()
if info.status == TaskStatus.RUNNING:
if info.status == TaskStatus.RUNNING and info.processing_time >= 0:
node_task_limit += 1
except exceptions.OdmError:
pass
@@ -150,7 +151,7 @@ class LocalRemoteExecutor:
if not local and not partial and nonloc.semaphore: nonloc.semaphore.release()
if not partial: q.task_done()
def local_worker():
while True:
# Block until a new queue item is available
@@ -162,9 +163,12 @@ class LocalRemoteExecutor:
# Process local
try:
nonloc.local_processing = True
task.process(True, handle_result)
except Exception as e:
handle_result(task, True, e)
finally:
nonloc.local_processing = False
def remote_worker():
@@ -182,14 +186,23 @@ class LocalRemoteExecutor:
q.task_done()
if nonloc.semaphore: nonloc.semaphore.release()
break
# Special case in which we've just created a semaphore
if not had_semaphore and nonloc.semaphore:
log.ODM_INFO("Just found semaphore, sending %s back to the queue" % task)
log.ODM_INFO("LRE: Just found semaphore, sending %s back to the queue" % task)
q.put(task)
q.task_done()
continue
# Yield to local processing
if not nonloc.local_processing:
log.ODM_DEBUG("LRE: Yielding to local processing, sending %s back to the queue" % task)
q.put(task)
q.task_done()
if nonloc.semaphore: nonloc.semaphore.release()
time.sleep(0.05)
continue
# Process remote
try:
task.process(False, handle_result)
@@ -210,7 +223,7 @@ class LocalRemoteExecutor:
# block until all tasks are done (or CTRL+C)
try:
while finished_tasks.value < len(self.project_paths):
while finished_tasks.value < len(self.project_paths) and nonloc.error is None:
time.sleep(0.5)
except KeyboardInterrupt:
log.ODM_WARNING("LRE: CTRL+C")

Wyświetl plik

@@ -2,7 +2,7 @@ import time
import unittest
import threading
from opendm.remote import LocalRemoteExecutor, Task, NodeTaskLimitReachedException
from pyodm import Node
from pyodm import Node, exceptions
from pyodm.types import TaskStatus
class TestRemote(unittest.TestCase):
@@ -28,6 +28,7 @@ class TestRemote(unittest.TestCase):
class nonloc:
local_task_check = False
remote_queue = 1
should_fail = False
class OdmTaskMock:
def __init__(self, running, queue_num):
@@ -38,6 +39,7 @@ class TestRemote(unittest.TestCase):
def info(self):
class StatusMock:
status = TaskStatus.RUNNING if self.running else TaskStatus.QUEUED
processing_time = 1
return StatusMock()
def remove(self):
@@ -54,6 +56,11 @@ class TestRemote(unittest.TestCase):
self.remote_task = OdmTaskMock(nonloc.remote_queue <= MAX_QUEUE, nonloc.remote_queue)
self.params['tasks'].append(self.remote_task)
if nonloc.should_fail:
if self.project_path.endswith("0006"):
raise exceptions.TaskFailedError("FAIL #6")
nonloc.remote_queue += 1
# Upload successful
@@ -80,3 +87,9 @@ class TestRemote(unittest.TestCase):
self.lre.run(TaskMock)
self.assertTrue(nonloc.local_task_check)
nonloc.should_fail = True
with self.assertRaises(exceptions.TaskFailedError):
self.lre.run(TaskMock)
if __name__ == '__main__':
unittest.main()