Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SPD-409 Scrubbing added #17

Merged
merged 15 commits into from
Oct 11, 2024
2 changes: 1 addition & 1 deletion examples/local-example.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def main():
# Create actor
actor = ExampleActor(client, modifier)
# Start work!
actor.run(max_tasks=2, stop_function=actor.time_elapsed, elapsed=11)
actor.run()

if __name__ == '__main__':
main()
81 changes: 81 additions & 0 deletions examples/scrub-example.py
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having the scrub_count functionality in a example files (scrub-example.py and scrub-timer-example.py) creates duplicate code (i.e. more difficult to maintain) and makes it more difficult for the users as they would need to adapt more code themselves.

I think it's better to have the scrub_count functionality in actors.py together with the stop functionality (RunActor.run). Also the scrub_count needs to be increased in case of the handler function is called (i.e. when Picas is killed).

Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import logging
import os
import time
import couchdb
import picasconfig

from picas.actors import RunActorWithStop
from picas.clients import CouchDB
from picas.iterators import EndlessViewIterator
from picas.modifiers import BasicTokenModifier
from picas.executers import execute
from picas.util import Timer

log = logging.getLogger("Scrub example")

class ExampleActor(RunActorWithStop):
"""
The ExampleActor is the custom implementation of a RunActor that the user needs for the processing.
Example for scrubbing tokens and rerunning them. Scrubbing is done when a token fails to finish
properly and the user wants to rerun it.
"""
def __init__(self, db, modifier, view="todo", scrub_count=2, **viewargs):
super(ExampleActor, self).__init__(db, view=view, **viewargs)
self.timer = Timer()
self.iterator = EndlessViewIterator(self.iterator)
self.modifier = modifier
self.client = db
# scrub limit is the amount of retries
self.scrub_limit = scrub_count

def process_task(self, token):
# Print token information
print("-----------------------")
print("Working on token: " +token['_id'])
for key, value in token.doc.items():
print(key, value)
print("-----------------------")

# Start running the main job
# /usr/bin/time -v ./process_task.sh [input] [tokenid] 2> logs_[token_id].err 1> logs_[token_id].out
command = "/usr/bin/time -v ./process_task.sh " + "\"" +token['input'] + "\" " + token['_id'] + " 2> logs_" + str(token['_id']) + ".err 1> logs_" + str(token['_id']) + ".out"

out = execute(command, shell=True)
self.subprocess = out[0]

# Get the job exit code and done in the token
token['exit_code'] = out[1]
token = self.modifier.close(token)

# Scrub the token N times if it failed, scrubbing puts it back in 'todo' state
if (token['scrub_count'] < self.scrub_limit) and (out[1] != 0):
log.info(f"Scrubbing token {token['_id']}")
token = self.modifier.unclose(token)
token.scrub()

# Attach logs in token
curdate = time.strftime("%d/%m/%Y_%H:%M:%S_")
try:
logsout = "logs_" + str(token['_id']) + ".out"
log_handle = open(logsout, 'rb')
token.put_attachment(logsout, log_handle.read())

logserr = "logs_" + str(token['_id']) + ".err"
log_handle = open(logserr, 'rb')
token.put_attachment(logserr, log_handle.read())
except:
pass

def main():
# setup connection to db
client = CouchDB(url=picasconfig.PICAS_HOST_URL, db=picasconfig.PICAS_DATABASE, username=picasconfig.PICAS_USERNAME, password=picasconfig.PICAS_PASSWORD)
print("Connected to the database %s sucessfully. Now starting work..." %(picasconfig.PICAS_DATABASE))
# Create token modifier
modifier = BasicTokenModifier()
# Create actor
actor = ExampleActor(client, modifier, scrub_count=2)
# Start work!
actor.run()

if __name__ == '__main__':
main()
81 changes: 81 additions & 0 deletions examples/scrub-timer-example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import logging
import os
import time
import couchdb
import picasconfig

from picas.actors import RunActorWithStop
from picas.clients import CouchDB
from picas.iterators import EndlessViewIterator
from picas.modifiers import BasicTokenModifier
from picas.executers import execute
from picas.util import Timer

log = logging.getLogger("Scrub with timer example")

class ExampleActor(RunActorWithStop):
"""
The ExampleActor is the custom implementation of a RunActor that the user needs for the processing.
Example for scrubbing tokens and rerunning them. Scrubbing is done when a token fails to finish
within a given time limit and the user wants to rerun it.
"""
def __init__(self, db, modifier, view="todo", time_limit=1, scrub_count=0, **viewargs):
super(ExampleActor, self).__init__(db, view=view, **viewargs)
self.timer = Timer()
self.iterator = EndlessViewIterator(self.iterator)
self.modifier = modifier
self.client = db
self.time_limit = time_limit
self.scrub_limit = scrub_count

def process_task(self, token):
# Print token information
print("-----------------------")
print("Working on token: " +token['_id'])
for key, value in token.doc.items():
print(key, value)
print("-----------------------")

# Start running the main job
# /usr/bin/time -v ./process_task.sh [input] [tokenid] 2> logs_[token_id].err 1> logs_[token_id].out
command = "/usr/bin/time -v ./process_task.sh " + "\"" +token['input'] + "\" " + token['_id'] + " 2> logs_" + str(token['_id']) + ".err 1> logs_" + str(token['_id']) + ".out"

out = execute(command, shell=True)
self.subprocess = out[0]

# Get the job exit code and done in the token
token['exit_code'] = out[1]
token = self.modifier.close(token)

# Scrub the token N times if it went over time, scrubbing puts it back in 'todo' state
if (self.time_limit < self.timer.elapsed()) and (token['scrub_count'] < self.scrub_limit):
log.info(f"Scrubbing token {token['_id']}")
token = self.modifier.unclose(token)
token.scrub()

# Attach logs in token
curdate = time.strftime("%d/%m/%Y_%H:%M:%S_")
try:
logsout = "logs_" + str(token['_id']) + ".out"
log_handle = open(logsout, 'rb')
token.put_attachment(logsout, log_handle.read())

logserr = "logs_" + str(token['_id']) + ".err"
log_handle = open(logserr, 'rb')
token.put_attachment(logserr, log_handle.read())
except:
pass

def main():
# setup connection to db
client = CouchDB(url=picasconfig.PICAS_HOST_URL, db=picasconfig.PICAS_DATABASE, username=picasconfig.PICAS_USERNAME, password=picasconfig.PICAS_PASSWORD)
print("Connected to the database %s sucessfully. Now starting work..." %(picasconfig.PICAS_DATABASE))
# Create token modifier
modifier = BasicTokenModifier()
# Create actor
actor = ExampleActor(client, modifier, time_limit=3, scrub_count=2)
# Start work!
actor.run()

if __name__ == '__main__':
main()
3 changes: 3 additions & 0 deletions examples/scrubExample.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
sleep 10
exit(1)
sleep 10
3 changes: 3 additions & 0 deletions examples/scrubTimerExample.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
sleep 5
sleep 5
sleep 5