Skip to content

Commit

Permalink
Merge pull request #16 from sara-nl/SPD-410
Browse files Browse the repository at this point in the history
SPD-410: stop functionality
  • Loading branch information
natalieda authored Sep 17, 2024
2 parents 714da43 + 68f7898 commit f9577d8
Show file tree
Hide file tree
Showing 9 changed files with 307 additions and 62 deletions.
21 changes: 10 additions & 11 deletions examples/local-example.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,25 +9,24 @@
Run main job (process_task.sh) with the input argument
When done, return the exit code to the token
Attach the logs to the token
'''

#python imports
import logging
import os
import time
import couchdb
import picasconfig

#picas imports
from picas.actors import RunActor
from picas.clients import CouchDB
from picas.executers import execute
from picas.iterators import TaskViewIterator
from picas.iterators import EndlessViewIterator
from picas.modifiers import BasicTokenModifier
from picas.executers import execute
from picas.util import Timer

log = logging.getLogger(__name__)

class ExampleActor(RunActor):
"""
The ExampleActor is the custom implementation of a RunActor that the user needs for the processing.
Expand All @@ -36,7 +35,7 @@ class ExampleActor(RunActor):
def __init__(self, db, modifier, view="todo", **viewargs):
super(ExampleActor, self).__init__(db, view=view, **viewargs)
self.timer = Timer()
self.iterator = EndlessViewIterator(self.iterator, stop_callback=self.time_elapsed) # overwrite default iterator from super().init()
self.iterator = EndlessViewIterator(self.iterator)
self.modifier = modifier
self.client = db

Expand All @@ -52,12 +51,12 @@ def process_task(self, token):
# /usr/bin/time -v ./process_task.sh [input] [tokenid] 2> logs_[token_id].err 1> logs_[token_id].out
command = "/usr/bin/time -v ./process_task.sh " + "\"" +token['input'] + "\" " + token['_id'] + " 2> logs_" + str(token['_id']) + ".err 1> logs_" + str(token['_id']) + ".out"

out = execute(command,shell=True)
out = execute(command, shell=True)
self.subprocess = out[0]

## Get the job exit code in the token
token['exit_code'] = out[0]
# Get the job exit code and done in the token
token['exit_code'] = out[1]
token = self.modifier.close(token)
#self.client.db[token['_id']] = token # necessary?

# Attach logs in token
curdate = time.strftime("%d/%m/%Y_%H:%M:%S_")
Expand Down Expand Up @@ -93,7 +92,7 @@ def main():
# Create actor
actor = ExampleActor(client, modifier)
# Start work!
actor.run()
actor.run(max_tasks=2, stop_function=actor.time_elapsed, elapsed=11)

if __name__ == '__main__':
main()
2 changes: 1 addition & 1 deletion examples/pushTokens.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def loadTokens(db):
'exit_code': ''
}
tokens.append(token)
i = i +1
i = i + 1
db.update(tokens)

def get_db():
Expand Down
210 changes: 168 additions & 42 deletions picas/actors.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,91 +5,217 @@
@author: Jan Bot, Joris Borgdorff
"""

from couchdb.http import ResourceConflict
import logging
import signal
import subprocess

from .util import Timer
from .iterators import TaskViewIterator
from .picaslogger import picaslogger
from .iterators import TaskViewIterator, EndlessViewIterator

from couchdb.http import ResourceConflict

logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)

class RunActor:

"""Executor class to be overwritten in the client implementation.
class AbstractRunActor(object):
"""
Executor class to be overwritten in the client implementation.
"""

def __init__(self, db, iterator=None, view='todo', **view_params):
def __init__(self, db, iterator=None, view='todo', token_reset_values=[0, 0], **view_params):
"""
@param database: the database to get the tasks from.
@param token_reset_values: values to use in the token when PiCaS is terminated, defaults to values of 'todo' ([0,0])
"""
if db is None:
raise ValueError("Database must be initialized")
self.db = db
self.iterator = iterator
self.token_reset_values = token_reset_values

# current task is needed to reset it when PiCaS is killed
self.current_task = None
# the subprocess running the token code is necessary s.t. the handler can cleanly kill it
self.subprocess = None
self.tasks_processed = 0

self.iterator = iterator
if iterator is None:
self.iterator = TaskViewIterator(self.db, view, **view_params)
else:
self.iterator = iterator

def run(self, maxtime=None, avg_time_factor=0.0):
"""Run method of the actor, executes the application code by iterating
def _run(self, task):
    """
    Execute the application code for one task and persist the outcome.

    Runs prepare_run(), marks the task as the one in flight (so the
    signal handler can reset it if PiCaS is killed), processes it, then
    writes it back to CouchDB — retrying on revision conflicts. Finally
    runs cleanup_run() and bumps the processed-task counter.
    @param task: the task (token document) to process
    """
    self.prepare_run()
    # Record the in-flight token so handler() can reset it on shutdown.
    self.current_task = task

    try:
        self.process_task(task)
    except Exception as err:
        msg = "Exception {0} occurred during processing: {1}".format(type(err), err)
        task.error(msg, exception=err)
        log.info(msg)

    saved = False
    while not saved:
        try:
            self.db.save(task)
            saved = True
        except ResourceConflict:
            # Simply overwrite remote changes — the freshly computed
            # model results are more important than the stored revision.
            latest = self.db.get(task.id)
            task['_rev'] = latest.rev

    self.cleanup_run()
    self.tasks_processed += 1

def run(self):
"""
Run method of the actor, executes the application code by iterating
over the available tasks in CouchDB.
"""
time = Timer()
# The error handler for when SLURM (or other scheduler / user) kills PiCaS, to reset the
# token back to 'todo' state (or other state defined through the token_reset_values)
self.setup_handler()

self.time = Timer()
self.prepare_env()
try:
for task in self.iterator:
self.prepare_run()

try:
self.process_task(task)
except Exception as ex:
msg = f"Exception {type(ex)} occurred during processing: {ex}"
task.error(msg, exception=ex)
picaslogger.info(msg)

while True:
try:
self.db.save(task)
break
except ResourceConflict:
# simply overwrite changes - model results are more
# important
new_task = self.db.get(task.id)
task['_rev'] = new_task.rev

self.cleanup_run()
self.tasks_processed += 1

if maxtime is not None:
will_elapse = ((avg_time_factor + self.tasks_processed) *
time.elapsed() / self.tasks_processed)
if will_elapse > maxtime:
break
self._run(task)
self.current_task = None # set to None so the handler leaves the token alone when picas is killed
finally:
self.cleanup_env()

def handler(self, signum, frame):
    """
    Signal handler: shut PiCaS down gracefully when it is killed.

    Sets the token's 'lock' and 'done' fields to the token_reset_values so
    that, when PiCaS is terminated by the scheduler or the user, the token
    being worked on is automatically returned to a known state (default:
    'todo', i.e. [0, 0]).
    @param signum: the signal number that was received
    @param frame: current stack frame (may be None), see
                  https://docs.python.org/3/library/signal.html#signal.signal
    """
    log.info(f'PiCaS shutting down, called with signal {signum}')

    # Gracefully stop the process running the token code: it must be down
    # before the token state is updated, so it cannot keep writing output.
    if self.subprocess and self.subprocess.poll() is None:
        log.info('Terminating execution of token')
        self.subprocess.terminate()
        try:
            # wait 30 seconds for termination, value chosen to allow complex processes to stop
            self.subprocess.communicate(timeout=30)
        except subprocess.TimeoutExpired:
            log.info('Killing subprocess')
            self.subprocess.kill()
            self.subprocess.communicate()

    # Update the token state; if the reset values are None, do nothing.
    if self.current_task and self.token_reset_values is not None:
        self.current_task['lock'] = self.token_reset_values[0]
        self.current_task['done'] = self.token_reset_values[1]
        self.db.save(self.current_task)

    self.cleanup_env()
    # Raise SystemExit directly instead of calling exit(): the exit()
    # builtin is injected by the site module for interactive use and is
    # not guaranteed to exist (e.g. under `python -S`).
    raise SystemExit(0)

def setup_handler(self):
    """
    Register self.handler for the termination signals.

    Called from the run methods so that both SIGTERM (scheduler kill)
    and SIGINT (Ctrl-C) route through the same token-resetting shutdown
    path, without repeating the registration code.
    """
    log.info('Setting up signal handlers')
    for sig in (signal.SIGTERM, signal.SIGINT):
        signal.signal(sig, self.handler)

def prepare_env(self, *args, **kwargs):
"""Method to be called to prepare the environment to run the
"""
Method to be called to prepare the environment to run the
application.
"""

def prepare_run(self, *args, **kwargs):
"""Code to run before a task gets processed. Used e.g. for fetching
"""
Code to run before a task gets processed. Used e.g. for fetching
inputs.
"""

def process_task(self, task):
"""The function to override, which processes the tasks themselves.
"""
The function to override, which processes the tasks themselves.
@param task: the task to process
"""
raise NotImplementedError

def cleanup_run(self, *args, **kwargs):
"""Code to run after a task has been processed.
"""
Code to run after a task has been processed.
"""

def cleanup_env(self, *args, **kwargs):
"""Method which gets called after the run method has completed.
"""
Method which gets called after the run method has completed.
"""


class RunActor(AbstractRunActor):
    """
    RunActor class with added stopping functionality.
    """

    def run(self, max_time=None, avg_time_factor=0.0, max_tasks=0, stop_function=None, **stop_function_args):
        """
        Run method of the actor, executes the application code by iterating
        over the available tasks in CouchDB, including stop logic. The stop
        logic is also extended into the EndlessViewIterator to break it when
        the condition is met, otherwise it never stops.
        @param max_time: maximum time to run picas before stopping
        @param avg_time_factor: used for estimating when to stop with `max_time`,
               value is average time per token to run
        @param max_tasks: number of tasks that are performed before stopping
               (0 means: no task limit)
        @param stop_function: custom function to stop the execution, must return bool
        @param stop_function_args: kwargs to supply to stop_function
        """
        self.time = Timer()
        self.prepare_env()

        # The signal handler must be installed in every run method, so a
        # kill still resets the in-flight token (see AbstractRunActor).
        self.setup_handler()

        # Special case to break the while loop of the EndlessViewIterator:
        # its internal while loop can't reach the stop conditions checked in
        # the for loop below, so pass the condition into the stop mechanism
        # of the EVI; the iteration is then ended from inside the iterator.
        if isinstance(self.iterator, EndlessViewIterator):
            self.iterator.stop_callback = stop_function
            self.iterator.stop_callback_args = stop_function_args

        try:
            for task in self.iterator:
                self._run(task)

                # Lazy %-formatting: the previous call passed the counter as
                # a positional arg with no placeholder, which makes the
                # logging module raise a formatting error at DEBUG level.
                logging.debug("Tasks executed: %s", self.tasks_processed)

                if (stop_function is not None and
                        stop_function(**stop_function_args)):
                    break

                # Break once the number of processed tasks reaches the limit;
                # >= is safer than == in case the counter ever skips past it.
                if max_tasks and self.tasks_processed >= max_tasks:
                    break

                if max_time is not None:
                    # for a large number of tokens the avg time will be better
                    # (due to statistics) resulting in a better estimate of
                    # whether time.elapsed + avg_time (what will be added on
                    # the next iteration) is larger than the max_time.
                    will_elapse = (self.time.elapsed() + avg_time_factor)
                    if will_elapse > max_time:
                        break
            # Set to None so the handler leaves the token alone when picas is
            # killed after the loop has finished cleanly.
            self.current_task = None
        finally:
            self.cleanup_env()
1 change: 1 addition & 0 deletions picas/documents.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ class Task(Document):
'hostname': '',
'scrub_count': 0,
'input': {},
'exit_code': '',
'output': {},
'uploads': {},
'error': [],
Expand Down
5 changes: 3 additions & 2 deletions picas/executers.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@ def execute(args, shell=False):
"""
with Popen(args, stdout=PIPE, stderr=PIPE, shell=shell) as proc:
(stdout, stderr) = proc.communicate()
return (proc.returncode, stdout, stderr)
return (proc, proc.returncode, stdout, stderr)



def execute_old(cmd):
"""Helper functino to execute an external application.
"""Helper function to execute an external application.
@param cmd: the command to be executed.
@return the exit code of the executed program.
"""
Expand Down
5 changes: 3 additions & 2 deletions picas/srm.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ def remote_exists(self, loc):
surl = self.srm_host + loc
cmd = ['srmls', surl]
picaslogger.info(" ".join(cmd))
(returncode, stdout, _) = execute(cmd)
(proc, returncode, stdout, _) = execute(cmd)

if returncode == 0:
bn = path.basename(loc)
lines = stdout.split("\n")
Expand Down Expand Up @@ -150,7 +151,7 @@ def upload(self, local_file, srm_dir, check=False):
cmd = ['srmcp', '-2', '-server_mode=passive',
'file:///' + local_file, srm_url]
picaslogger.info(cmd)
(returncode, _, _) = execute(cmd)
(proc, returncode, _, _) = execute(cmd)
if returncode == 0:
pass
else:
Expand Down
Loading

0 comments on commit f9577d8

Please sign in to comment.