LRE improvements (semaphore --> atomic counter with retries)

Former-commit-id: ab9a94723c
pull/1161/head
Piero Toffanin 2019-05-21 10:33:48 -04:00
rodzic 055c7a7d1f
commit 0b98e6b397
2 zmienionych plików z 28 dodań i 32 usunięć

Wyświetl plik

@ -62,11 +62,12 @@ class LocalRemoteExecutor:
# Shared variables across threads
class nonloc:
error = None
semaphore = None
local_processing = False
max_remote_tasks = None
calculate_task_limit_lock = threading.Lock()
finished_tasks = AtomicCounter(0)
remote_running_tasks = AtomicCounter(0)
# Create queue
q = queue.Queue()
@ -85,7 +86,7 @@ class LocalRemoteExecutor:
if self.params['tasks']:
log.ODM_WARNING("LRE: Attempting to cleanup remote tasks")
else:
log.ODM_INFO("LRE: No remote tasks to cleanup")
log.ODM_INFO("LRE: No remote tasks left to cleanup")
for task in self.params['tasks']:
log.ODM_DEBUG("LRE: Removing remote task %s... %s" % (task.uuid, 'OK' if remove_task_safe(task) else 'NO'))
@ -106,12 +107,12 @@ class LocalRemoteExecutor:
if str(error) == "Child was terminated by signal 15":
system.exit_gracefully()
created_semaphore = False
if isinstance(error, NodeTaskLimitReachedException):
task_limit_reached = isinstance(error, NodeTaskLimitReachedException)
if task_limit_reached:
# Estimate the maximum number of tasks based on how many tasks
# are currently running
with calculate_task_limit_lock:
if not nonloc.semaphore:
if nonloc.max_remote_tasks is None:
node_task_limit = 0
for t in self.params['tasks']:
try:
@ -121,39 +122,36 @@ class LocalRemoteExecutor:
except exceptions.OdmError:
pass
sem_value = max(1, node_task_limit)
nonloc.semaphore = threading.Semaphore(sem_value)
log.ODM_DEBUG("LRE: Node task limit reached. Setting semaphore to %s" % sem_value)
for i in range(sem_value):
nonloc.semaphore.acquire()
created_semaphore = True
nonloc.max_remote_tasks = max(1, node_task_limit)
log.ODM_DEBUG("LRE: Node task limit reached. Setting max remote tasks to %s" % node_task_limit)
# Retry, but only if the error is not related to a task failure
if task.retries < task.max_retries and not isinstance(error, exceptions.TaskFailedError):
# Put task back in queue
# Don't increment the retry counter if this task simply reached the task
# limit count.
if not isinstance(error, NodeTaskLimitReachedException):
if not task_limit_reached:
task.retries += 1
task.wait_until = datetime.datetime.now() + datetime.timedelta(seconds=task.retries * task.retry_timeout)
cleanup_remote()
q.task_done()
log.ODM_DEBUG("LRE: Re-queueing %s (retries: %s)" % (task, task.retries))
if not created_semaphore and nonloc.semaphore: nonloc.semaphore.release()
q.put(task)
if not local: remote_running_tasks.increment(-1)
return
else:
nonloc.error = error
finished_tasks.increment()
if not local: remote_running_tasks.increment(-1)
else:
if not partial:
log.ODM_INFO("LRE: %s finished successfully" % task)
finished_tasks.increment()
if not local: remote_running_tasks.increment(-1)
cleanup_remote()
if not local and not partial and nonloc.semaphore: nonloc.semaphore.release()
if not partial: q.task_done()
def local_worker():
@ -177,38 +175,32 @@ class LocalRemoteExecutor:
def remote_worker():
while True:
had_semaphore = bool(nonloc.semaphore)
# If we've found an estimate of the limit on the maximum number of tasks
# a node can process, we block until some tasks have completed
if nonloc.semaphore: nonloc.semaphore.acquire()
# Block until a new queue item is available
task = q.get()
if task is None or nonloc.error is not None:
q.task_done()
if nonloc.semaphore: nonloc.semaphore.release()
break
# Special case in which we've just created a semaphore
if not had_semaphore and nonloc.semaphore:
log.ODM_INFO("LRE: Just found semaphore, sending %s back to the queue" % task)
q.put(task)
q.task_done()
continue
# Yield to local processing
if not nonloc.local_processing:
log.ODM_DEBUG("LRE: Yielding to local processing, sending %s back to the queue" % task)
q.put(task)
q.task_done()
if nonloc.semaphore: nonloc.semaphore.release()
time.sleep(0.05)
continue
# If we've found an estimate of the limit on the maximum number of tasks
# a node can process, we block until some tasks have completed
if nonloc.max_remote_tasks is not None and remote_running_tasks.value >= nonloc.max_remote_tasks:
q.put(task)
q.task_done()
time.sleep(2)
continue
# Process remote
try:
remote_running_tasks.increment()
task.process(False, handle_result)
except Exception as e:
handle_result(task, False, e)
@ -234,7 +226,6 @@ class LocalRemoteExecutor:
system.exit_gracefully()
# stop workers
if nonloc.semaphore: nonloc.semaphore.release()
q.put(None)
if self.node_online:
q.put(None)

Wyświetl plik

@ -8,7 +8,7 @@ from pyodm.types import TaskStatus
class TestRemote(unittest.TestCase):
def setUp(self):
self.lre = LocalRemoteExecutor('http://invalid-host:3000')
self.lre = LocalRemoteExecutor('http://localhost:9001')
projects = []
for i in range(9):
@ -48,6 +48,11 @@ class TestRemote(unittest.TestCase):
def process_local(self):
# First task should be 0000 or 0001
if not nonloc.local_task_check: nonloc.local_task_check = self.project_path.endswith("0000") or self.project_path.endswith("0001")
if nonloc.should_fail:
if self.project_path.endswith("0006"):
raise exceptions.TaskFailedError("FAIL #6")
time.sleep(1)
def process_remote(self, done):