diff --git a/examples/local-example.py b/examples/local-example.py
index 6099be4..10a5a77 100755
--- a/examples/local-example.py
+++ b/examples/local-example.py
@@ -9,25 +9,24 @@
 Run main job (process_task.sh) with the input argument
 When done, return the exit code to the token
 Attach the logs to the token
-
-
 '''
-#python imports
+import logging
 import os
 import time
 import couchdb
 import picasconfig
 
-#picas imports
 from picas.actors import RunActor
 from picas.clients import CouchDB
+from picas.executers import execute
 from picas.iterators import TaskViewIterator
 from picas.iterators import EndlessViewIterator
 from picas.modifiers import BasicTokenModifier
-from picas.executers import execute
 from picas.util import Timer
 
+log = logging.getLogger(__name__)
+
 
 class ExampleActor(RunActor):
     """
     The ExampleActor is the custom implementation of a RunActor that the user needs for the processing.
@@ -36,7 +35,7 @@ class ExampleActor(RunActor):
     def __init__(self, db, modifier, view="todo", **viewargs):
         super(ExampleActor, self).__init__(db, view=view, **viewargs)
         self.timer = Timer()
-        self.iterator = EndlessViewIterator(self.iterator, stop_callback=self.time_elapsed) # overwrite default iterator from super().init()
+        self.iterator = EndlessViewIterator(self.iterator)
         self.modifier = modifier
         self.client = db
 
@@ -52,12 +51,12 @@ def process_task(self, token):
         # /usr/bin/time -v ./process_task.sh [input] [tokenid] 2> logs_[token_id].err 1> logs_[token_id].out
         command = "/usr/bin/time -v ./process_task.sh " + "\"" +token['input'] + "\" " + token['_id'] + " 2> logs_" + str(token['_id']) + ".err 1> logs_" + str(token['_id']) + ".out"
-        out = execute(command,shell=True)
+        out = execute(command, shell=True)
+        self.subprocess = out[0]
 
-        ## Get the job exit code in the token
-        token['exit_code'] = out[0]
+        # Get the job exit code and done in the token
+        token['exit_code'] = out[1]
         token = self.modifier.close(token)
-        #self.client.db[token['_id']] = token # necessary?
 
         # Attach logs in token
         curdate = time.strftime("%d/%m/%Y_%H:%M:%S_")
@@ -93,7 +92,7 @@ def main():
     # Create actor
     actor = ExampleActor(client, modifier)
     # Start work!
-    actor.run()
+    actor.run(max_tasks=2, stop_function=actor.time_elapsed, elapsed=11)
 
 if __name__ == '__main__':
     main()
diff --git a/examples/pushTokens.py b/examples/pushTokens.py
index c531f77..55bb956 100644
--- a/examples/pushTokens.py
+++ b/examples/pushTokens.py
@@ -42,7 +42,7 @@ def loadTokens(db):
             'exit_code': ''
         }
         tokens.append(token)
-        i = i +1
+        i = i + 1
 
     db.update(tokens)
 
 def get_db():
diff --git a/picas/actors.py b/picas/actors.py
index f3944b3..c7b3b18 100644
--- a/picas/actors.py
+++ b/picas/actors.py
@@ -5,91 +5,217 @@
 @author: Jan Bot, Joris Borgdorff
 """
 
-from couchdb.http import ResourceConflict
+import logging
+import signal
+import subprocess
 
 from .util import Timer
-from .iterators import TaskViewIterator
-from .picaslogger import picaslogger
+from .iterators import TaskViewIterator, EndlessViewIterator
+
+from couchdb.http import ResourceConflict
 
+logging.basicConfig(level=logging.INFO)
+log = logging.getLogger(__name__)
 
-class RunActor:
-    """Executor class to be overwritten in the client implementation.
+
+class AbstractRunActor(object):
+    """
+    Executor class to be overwritten in the client implementation.
     """
 
-    def __init__(self, db, iterator=None, view='todo', **view_params):
+    def __init__(self, db, iterator=None, view='todo', token_reset_values=[0, 0], **view_params):
         """
         @param database: the database to get the tasks from.
+        @param token_reset_values: values to use in the token when PiCaS is terminated, defaults to values of 'todo' ([0, 0])
         """
         if db is None:
             raise ValueError("Database must be initialized")
 
         self.db = db
+        self.iterator = iterator
+        self.token_reset_values = token_reset_values
+
+        # current task is needed to reset it when PiCaS is killed
+        self.current_task = None
+        # the subprocess running the token code is necessary s.t. the handler can cleanly kill it
+        self.subprocess = None
         self.tasks_processed = 0
 
-        self.iterator = iterator
         if iterator is None:
             self.iterator = TaskViewIterator(self.db, view, **view_params)
         else:
             self.iterator = iterator
 
-    def run(self, maxtime=None, avg_time_factor=0.0):
-        """Run method of the actor, executes the application code by iterating
+    def _run(self, task):
+        """
+        Execution of the work on the iterator used in the run method.
+        """
+        self.prepare_run()
+        # current task is set s.t. the handler can reset the task that is being worked on
+        self.current_task = task
+
+        try:
+            self.process_task(task)
+        except Exception as ex:
+            msg = ("Exception {0} occurred during processing: {1}"
+                   .format(type(ex), ex))
+            task.error(msg, exception=ex)
+            log.info(msg)
+
+        while True:
+            try:
+                self.db.save(task)
+                break
+            except ResourceConflict:
+                # simply overwrite changes - model results are more
+                # important
+                new_task = self.db.get(task.id)
+                task['_rev'] = new_task.rev
+
+        self.cleanup_run()
+        self.tasks_processed += 1
+
+    def run(self):
+        """
+        Run method of the actor, executes the application code by iterating
         over the available tasks in CouchDB.
         """
-        time = Timer()
+        # The error handler for when SLURM (or other scheduler / user) kills PiCaS, to reset the
+        # token back to 'todo' state (or other state defined through the token_reset_values)
+        self.setup_handler()
+
+        self.time = Timer()
         self.prepare_env()
         try:
             for task in self.iterator:
-                self.prepare_run()
-
-                try:
-                    self.process_task(task)
-                except Exception as ex:
-                    msg = f"Exception {type(ex)} occurred during processing: {ex}"
-                    task.error(msg, exception=ex)
-                    picaslogger.info(msg)
-
-                while True:
-                    try:
-                        self.db.save(task)
-                        break
-                    except ResourceConflict:
-                        # simply overwrite changes - model results are more
-                        # important
-                        new_task = self.db.get(task.id)
-                        task['_rev'] = new_task.rev
-
-                self.cleanup_run()
-                self.tasks_processed += 1
-
-                if maxtime is not None:
-                    will_elapse = ((avg_time_factor + self.tasks_processed) *
-                                   time.elapsed() / self.tasks_processed)
-                    if will_elapse > maxtime:
-                        break
+                self._run(task)
+            self.current_task = None  # set to None so the handler leaves the token alone when picas is killed
         finally:
             self.cleanup_env()
 
+    def handler(self, signum, frame):
+        """
+        Signal handler method. It sets the token's 'lock' and 'done' fields to the values
+        passed to token_reset_values. This method ensures that when PiCaS is killed by the
+        scheduler or user, it automatically resets the token that was being worked on back to some
+        state (default: the 'todo' state).
+
+        @param signum: signal to listen to and act upon
+        @param frame: stack frame, defaults to None, see https://docs.python.org/3/library/signal.html#signal.signal
+        """
+        log.info(f'PiCaS shutting down, called with signal {signum}')
+
+        # gracefully kill the process running the token code, it needs to stop before we update the token state
+        if self.subprocess and self.subprocess.poll() is None:
+            log.info('Terminating execution of token')
+            self.subprocess.terminate()
+            try:
+                self.subprocess.communicate(timeout=30)  # wait 30 seconds for termination, value chosen to allow complex processes to stop
+            except subprocess.TimeoutExpired:
+                log.info('Killing subprocess')
+                self.subprocess.kill()
+                self.subprocess.communicate()
+
+        # update the token state; if the reset value is None, do nothing.
+        if self.current_task and self.token_reset_values is not None:
+            self.current_task['lock'] = self.token_reset_values[0]
+            self.current_task['done'] = self.token_reset_values[1]
+            self.db.save(self.current_task)
+
+        self.cleanup_env()
+        exit(0)
+
+    def setup_handler(self):
+        """
+        Method to set up the signal handler, shared by the run methods to avoid duplication.
+        """
+        log.info('Setting up signal handlers')
+        signal.signal(signal.SIGTERM, self.handler)
+        signal.signal(signal.SIGINT, self.handler)
+
     def prepare_env(self, *args, **kwargs):
-        """Method to be called to prepare the environment to run the
+        """
+        Method to be called to prepare the environment to run the
         application.
         """
 
     def prepare_run(self, *args, **kwargs):
-        """Code to run before a task gets processed. Used e.g. for fetching
+        """
+        Code to run before a task gets processed. Used e.g. for fetching
         inputs.
         """
 
     def process_task(self, task):
-        """The function to override, which processes the tasks themselves.
+        """
+        The function to override, which processes the tasks themselves.
 
         @param task: the task to process
         """
         raise NotImplementedError
 
     def cleanup_run(self, *args, **kwargs):
-        """Code to run after a task has been processed.
+        """
+        Code to run after a task has been processed.
         """
 
     def cleanup_env(self, *args, **kwargs):
-        """Method which gets called after the run method has completed.
         """
+        Method which gets called after the run method has completed.
+        """
+
+
+class RunActor(AbstractRunActor):
+    """
+    RunActor class with added stopping functionality.
+    """
+
+    def run(self, max_time=None, avg_time_factor=0.0, max_tasks=0, stop_function=None, **stop_function_args):
+        """
+        Run method of the actor, executes the application code by iterating
+        over the available tasks in CouchDB, including stop logic. The stop
+        logic is also extended into the EndlessViewIterator to break it when
+        the condition is met, otherwise it never stops.
+
+        @param max_time: maximum time to run picas before stopping
+        @param avg_time_factor: used for estimating when to stop with `max_time`,
+               value is the average time per token to run
+        @param max_tasks: number of tasks that are performed before stopping
+        @param stop_function: custom function to stop the execution, must return bool
+        @param stop_function_args: kwargs to supply to stop_function
+        """
+        self.time = Timer()
+        self.prepare_env()
+
+        # handler needs to be set up in the overwritten method
+        self.setup_handler()
+
+        # Special case to break the while loop of the EndlessViewIterator:
+        # The while loop can't reach the stop condition in the for loop below,
+        # so pass the condition into the stop mechanism of the EVI, then the
+        # iterator is stopped from the EVI and not from this run method
+        if isinstance(self.iterator, EndlessViewIterator):
+            self.iterator.stop_callback = stop_function
+            self.iterator.stop_callback_args = stop_function_args
+
+        try:
+            for task in self.iterator:
+                self._run(task)
+
+                log.debug("Tasks executed: %s", self.tasks_processed)
+
+                if (stop_function is not None and
+                        stop_function(**stop_function_args)):
+                    break
+
+                # break if the number of processed tasks reaches max_tasks
+                if max_tasks and self.tasks_processed == max_tasks:
+                    break
+
+                if max_time is not None:
+                    # for a large number of tokens the avg time will be better (due to statistics)
+                    # resulting in a better estimate of whether time.elapsed + avg_time (what will
+                    # be added on the next iteration) is larger than the max_time.
+                    will_elapse = (self.time.elapsed() + avg_time_factor)
+                    if will_elapse > max_time:
+                        break
+            self.current_task = None  # set to None so the handler leaves the token alone when picas is killed
+        finally:
+            self.cleanup_env()
diff --git a/picas/documents.py b/picas/documents.py
index 5961a90..b642239 100644
--- a/picas/documents.py
+++ b/picas/documents.py
@@ -186,6 +186,7 @@ class Task(Document):
         'hostname': '',
         'scrub_count': 0,
         'input': {},
+        'exit_code': '',
         'output': {},
         'uploads': {},
         'error': [],
diff --git a/picas/executers.py b/picas/executers.py
index 310c26e..70d7a9f 100644
--- a/picas/executers.py
+++ b/picas/executers.py
@@ -18,11 +18,12 @@ def execute(args, shell=False):
     """
     with Popen(args, stdout=PIPE, stderr=PIPE, shell=shell) as proc:
         (stdout, stderr) = proc.communicate()
-    return (proc.returncode, stdout, stderr)
+    return (proc, proc.returncode, stdout, stderr)
+
 
 def execute_old(cmd):
-    """Helper functino to execute an external application.
+    """Helper function to execute an external application.
 
     @param cmd: the command to be executed.
     @return the exit code of the executed program.
""" diff --git a/picas/srm.py b/picas/srm.py index b618898..a775cb0 100644 --- a/picas/srm.py +++ b/picas/srm.py @@ -120,7 +120,8 @@ def remote_exists(self, loc): surl = self.srm_host + loc cmd = ['srmls', surl] picaslogger.info(" ".join(cmd)) - (returncode, stdout, _) = execute(cmd) + (proc, returncode, stdout, _) = execute(cmd) + if returncode == 0: bn = path.basename(loc) lines = stdout.split("\n") @@ -150,7 +151,7 @@ def upload(self, local_file, srm_dir, check=False): cmd = ['srmcp', '-2', '-server_mode=passive', 'file:///' + local_file, srm_url] picaslogger.info(cmd) - (returncode, _, _) = execute(cmd) + (proc, returncode, _, _) = execute(cmd) if returncode == 0: pass else: diff --git a/tests/test_actors.py b/tests/test_actors.py index 8cca2bf..890fd52 100644 --- a/tests/test_actors.py +++ b/tests/test_actors.py @@ -1,17 +1,122 @@ +import pytest +import signal +import subprocess +import time import unittest -from test_mock import MockDB, MockRun +from test_mock import MockDB, MockRun, MockRunWithStop +from unittest.mock import patch + +from picas import actors +from picas.documents import Task class TestRun(unittest.TestCase): def _callback(self, task): + """ + _callback is used to process tasks: for mock tasks it just increases the counter. + """ self.assertTrue(task.id in [t['_id'] for t in MockDB.TASKS]) self.assertTrue(task['lock'] > 0) self.count += 1 + task['exit_code'] = 0 def test_run(self): + """ + Test the run function, which iterates over the iterator class, where __next__ calls + the claim_task method which calls _claim_task that locks the token. + The callback function is applied through MockRun.process_task. + This locking is the test here. + """ self.count = 0 runner = MockRun(self._callback) runner.run() self.assertEqual(self.count, len(MockDB.TASKS)) + + def test_stop_max_tasks(self): + """ + Test to stop after 1 task is performed. + """ + self.count = 0 + self.test_number = 1 + runner = MockRunWithStop(self._callback) + runner.run(max_tasks=self.test_number) + self.assertEqual(self.count, self.test_number) + + def stop_fn(self, run_obj, id): + return run_obj.current_task['_id'] == id + + def test_stop_fn(self): + """ + Test to stop when a function is true. + """ + self.count = 0 + self.stop_fn_arg = "a" + runner = MockRunWithStop(self._callback) + runner.run(stop_function=self.stop_fn, run_obj=runner, id=self.stop_fn_arg) + self.assertEqual(runner.current_task['_id'], self.stop_fn_arg) + + def _callback_timer(self, task): + """ + Callback function for processing tokens that sleeps, s.t. the max time of + processing can expire and stop the processing. + """ + self.assertTrue(task.id in [t['_id'] for t in MockDB.TASKS]) + self.assertTrue(task['lock'] > 0) + self.count += 1 + time.sleep(0.5) # force one token to "take" 0.5 s + task['exit_code'] = 0 + + def test_max_time(self): + """ + Test to stop running when the max time is about to be reached. + """ + self.count = 0 + self.max_time = 0.5 # one token takes 0.5, so it quits after 1 token + self.avg_time_fac = 0.5 + self.test_number = 1 + runner = MockRunWithStop(self._callback_timer) + runner.run(max_time=self.max_time, avg_time_factor=self.avg_time_fac) + self.assertEqual(self.count, self.test_number) + + @patch('picas.actors.log') + @patch('signal.signal') + def test_setup_handler(self, sig, log): + """ + Test the setting up of the handler. 
+ """ + actor = actors.RunActor(MockDB()) + actor.setup_handler() + + sig.assert_any_call(signal.SIGTERM, actor.handler) + sig.assert_any_call(signal.SIGINT, actor.handler) + log.info.assert_any_call('Setting up signal handlers') + + +class TestHandler(unittest.TestCase): + + def setUp(self): + self.lock_code = 2 + self.done_code = 2 + self.actor = actors.RunActor(MockDB(), token_reset_values=[self.lock_code, self.done_code]) + self.actor.subprocess = subprocess.Popen(['sleep', '10']) # ensure the actor is busy + self.actor.current_task = Task({'_id': 'c', 'lock': None, 'done': None}) + + def test_signal_handling(self): + """ + Test the handler response: proper exiting and updating of task codes. + """ + self.actor.setup_handler() + + # to handle the exit code from the handler, pytest can catch it + with pytest.raises(SystemExit) as handler_exit_code: + self.actor.handler(signal.SIGTERM, None) + self.assertEqual(handler_exit_code.value.code, 0) + + with pytest.raises(SystemExit) as handler_exit_code: + self.actor.handler(signal.SIGINT, None) + self.assertEqual(handler_exit_code.value.code, 0) + + self.assertEqual(self.actor.current_task['lock'], self.lock_code) + self.assertEqual(self.actor.current_task['done'], self.done_code) diff --git a/tests/test_executers.py b/tests/test_executers.py index 127b0b1..7d57dd2 100644 --- a/tests/test_executers.py +++ b/tests/test_executers.py @@ -6,7 +6,7 @@ class TestExecutors(unittest.TestCase): def test_run_command(self): - returncode, stdout, stderr = execute(["echo", "'hello world'"]) + proc, returncode, stdout, stderr = execute(["echo", "'hello world'"]) self.assertEqual(returncode, 0) self.assertEqual(stdout, b"'hello world'\n") diff --git a/tests/test_mock.py b/tests/test_mock.py index 20eb9f8..aa3d34a 100644 --- a/tests/test_mock.py +++ b/tests/test_mock.py @@ -1,5 +1,5 @@ import random -from picas.actors import RunActor +from picas.actors import AbstractRunActor, RunActor from picas.documents import Document @@ -40,7 +40,7 @@ def save(self, doc): return doc -class MockRun(RunActor): +class MockRun(AbstractRunActor): def __init__(self, callback): db = MockDB() @@ -50,3 +50,15 @@ def __init__(self, callback): def process_task(self, task): self.callback(task) + + +class MockRunWithStop(RunActor): + + def __init__(self, callback): + db = MockDB() + super(MockRunWithStop, self).__init__(db) + + self.callback = callback + + def process_task(self, task): + self.callback(task)