SPD-409 Scrubbing added #17
Merged
Changes from 6 commits
Commits
e80adfe Add scrubbing example (lnauta)
f9caf9e Clean scrub example (lnauta)
73e9fdb Add scrub after time expires example (lnauta)
6813625 Add notes to scrub examples (lnauta)
ff36c5f Clean local example (lnauta)
274fa1e Merge remote-tracking branch 'origin/master' into SPD-409 (lnauta)
9961c7b Add stopit fnality to stop single token (lnauta)
73d46c7 Add scrubbing to the handler (lnauta)
90e5291 Add stopit to CICD (lnauta)
d9b37ea Add decorator to tests b/c inheritance (lnauta)
01bbc4a Add setuptools for p3.12 (lnauta)
941efba Improve setup requirements (lnauta)
4d4de45 SPD-409 fixes (#23) (hailihu)
c0178a1 Merge branch 'master' into SPD-409 (lnauta)
ea948ea Change errorcode for Exception from -1 to 99 to fix visibility in Pic… (hailihu)
@@ -0,0 +1,81 @@
import logging
import os
import time
import couchdb
import picasconfig

from picas.actors import RunActorWithStop
from picas.clients import CouchDB
from picas.iterators import EndlessViewIterator
from picas.modifiers import BasicTokenModifier
from picas.executers import execute
from picas.util import Timer

log = logging.getLogger("Scrub example")


class ExampleActor(RunActorWithStop):
    """
    The ExampleActor is the custom implementation of a RunActor that the user needs for the processing.
    Example for scrubbing tokens and rerunning them. Scrubbing is done when a token fails to finish
    properly and the user wants to rerun it.
    """
    def __init__(self, db, modifier, view="todo", scrub_count=2, **viewargs):
        super(ExampleActor, self).__init__(db, view=view, **viewargs)
        self.timer = Timer()
        self.iterator = EndlessViewIterator(self.iterator)
        self.modifier = modifier
        self.client = db
        # The scrub limit is the maximum number of retries per token
        self.scrub_limit = scrub_count

    def process_task(self, token):
        # Print token information
        print("-----------------------")
        print("Working on token: " + token['_id'])
        for key, value in token.doc.items():
            print(key, value)
        print("-----------------------")

        # Start running the main job
        # /usr/bin/time -v ./process_task.sh [input] [tokenid] 2> logs_[token_id].err 1> logs_[token_id].out
        command = (
            f"/usr/bin/time -v ./process_task.sh \"{token['input']}\" {token['_id']} "
            f"2> logs_{token['_id']}.err 1> logs_{token['_id']}.out"
        )

        out = execute(command, shell=True)
        self.subprocess = out[0]

        # Store the job exit code in the token and mark the token as done
        token['exit_code'] = out[1]
        token = self.modifier.close(token)

        # Scrub the token up to N times if it failed; scrubbing puts it back in the 'todo' state
        if (token['scrub_count'] < self.scrub_limit) and (out[1] != 0):
            log.info(f"Scrubbing token {token['_id']}")
            token = self.modifier.unclose(token)
            token.scrub()

        # Attach the stdout and stderr logs to the token
        curdate = time.strftime("%d/%m/%Y_%H:%M:%S_")
        try:
            logsout = "logs_" + str(token['_id']) + ".out"
            with open(logsout, 'rb') as log_handle:
                token.put_attachment(logsout, log_handle.read())

            logserr = "logs_" + str(token['_id']) + ".err"
            with open(logserr, 'rb') as log_handle:
                token.put_attachment(logserr, log_handle.read())
        except Exception as exc:
            # Attaching logs is best effort; report the problem instead of hiding it
            log.warning(f"Could not attach logs to token {token['_id']}: {exc}")


def main():
    # Set up the connection to the database
    client = CouchDB(url=picasconfig.PICAS_HOST_URL, db=picasconfig.PICAS_DATABASE, username=picasconfig.PICAS_USERNAME, password=picasconfig.PICAS_PASSWORD)
    print("Connected to the database %s successfully. Now starting work..." % (picasconfig.PICAS_DATABASE))
    # Create token modifier
    modifier = BasicTokenModifier()
    # Create actor
    actor = ExampleActor(client, modifier, scrub_count=2)
    # Start work!
    actor.run()


if __name__ == '__main__':
    main()
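The retry rule in `process_task` above can be read as a small pure function. The sketch below is only an illustration of that rule, not part of the PR: `should_scrub` and `simulate_runs` are hypothetical names, a plain dict stands in for a PiCaS token, and incrementing `scrub_count` by hand is an assumption about what `token.scrub()` does.

```python
def should_scrub(exit_code, scrub_count, scrub_limit):
    """Return True when a failed token still has retries left."""
    return exit_code != 0 and scrub_count < scrub_limit


def simulate_runs(exit_codes, scrub_limit=2):
    """Count how often a token is processed for a sequence of exit codes.

    The dict stands in for a PiCaS token. Incrementing 'scrub_count' here
    imitates what token.scrub() is assumed to do.
    """
    token = {"scrub_count": 0}
    runs = 0
    for exit_code in exit_codes:
        runs += 1
        token["exit_code"] = exit_code
        if not should_scrub(exit_code, token["scrub_count"], scrub_limit):
            break  # the token stays in its final state
        token["scrub_count"] += 1  # token goes back to 'todo' for another run
    return runs, token


if __name__ == "__main__":
    # A job that always fails is processed once plus scrub_limit retries.
    print(simulate_runs([1, 1, 1, 1], scrub_limit=2))
    # A job that succeeds on the second attempt stops retrying there.
    print(simulate_runs([1, 0, 0], scrub_limit=2))
```

With `scrub_limit=2`, a token whose job keeps failing is therefore processed three times in total before it is left in its final state.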
@@ -0,0 +1,81 @@
import logging
import os
import time
import couchdb
import picasconfig

from picas.actors import RunActorWithStop
from picas.clients import CouchDB
from picas.iterators import EndlessViewIterator
from picas.modifiers import BasicTokenModifier
from picas.executers import execute
from picas.util import Timer

log = logging.getLogger("Scrub with timer example")


class ExampleActor(RunActorWithStop):
    """
    The ExampleActor is the custom implementation of a RunActor that the user needs for the processing.
    Example for scrubbing tokens and rerunning them. Scrubbing is done when a token fails to finish
    within a given time limit and the user wants to rerun it.
    """
    def __init__(self, db, modifier, view="todo", time_limit=1, scrub_count=0, **viewargs):
        super(ExampleActor, self).__init__(db, view=view, **viewargs)
        self.timer = Timer()
        self.iterator = EndlessViewIterator(self.iterator)
        self.modifier = modifier
        self.client = db
        # The time limit is compared against self.timer.elapsed()
        self.time_limit = time_limit
        # The scrub limit is the maximum number of retries per token
        self.scrub_limit = scrub_count

    def process_task(self, token):
        # Print token information
        print("-----------------------")
        print("Working on token: " + token['_id'])
        for key, value in token.doc.items():
            print(key, value)
        print("-----------------------")

        # Start running the main job
        # /usr/bin/time -v ./process_task.sh [input] [tokenid] 2> logs_[token_id].err 1> logs_[token_id].out
        command = (
            f"/usr/bin/time -v ./process_task.sh \"{token['input']}\" {token['_id']} "
            f"2> logs_{token['_id']}.err 1> logs_{token['_id']}.out"
        )

        out = execute(command, shell=True)
        self.subprocess = out[0]

        # Store the job exit code in the token and mark the token as done
        token['exit_code'] = out[1]
        token = self.modifier.close(token)

        # Scrub the token up to N times if it went over time; scrubbing puts it back in the 'todo' state
        if (self.time_limit < self.timer.elapsed()) and (token['scrub_count'] < self.scrub_limit):
            log.info(f"Scrubbing token {token['_id']}")
            token = self.modifier.unclose(token)
            token.scrub()

        # Attach the stdout and stderr logs to the token
        curdate = time.strftime("%d/%m/%Y_%H:%M:%S_")
        try:
            logsout = "logs_" + str(token['_id']) + ".out"
            with open(logsout, 'rb') as log_handle:
                token.put_attachment(logsout, log_handle.read())

            logserr = "logs_" + str(token['_id']) + ".err"
            with open(logserr, 'rb') as log_handle:
                token.put_attachment(logserr, log_handle.read())
        except Exception as exc:
            # Attaching logs is best effort; report the problem instead of hiding it
            log.warning(f"Could not attach logs to token {token['_id']}: {exc}")


def main():
    # Set up the connection to the database
    client = CouchDB(url=picasconfig.PICAS_HOST_URL, db=picasconfig.PICAS_DATABASE, username=picasconfig.PICAS_USERNAME, password=picasconfig.PICAS_PASSWORD)
    print("Connected to the database %s successfully. Now starting work..." % (picasconfig.PICAS_DATABASE))
    # Create token modifier
    modifier = BasicTokenModifier()
    # Create actor
    actor = ExampleActor(client, modifier, time_limit=3, scrub_count=2)
    # Start work!
    actor.run()


if __name__ == '__main__':
    main()
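The timer variant above compares `time_limit` with `self.timer.elapsed()`. Whether `picas.util.Timer` is restarted for each token is not visible in this diff, so the sketch below makes that explicit under its own assumptions: `TaskTimer` and `over_time_and_retryable` are hypothetical names, not PiCaS API, and the timer is restarted right before each task so that the elapsed time covers one task only.

```python
import time


class TaskTimer:
    """Hypothetical per-task timer; not the picas.util.Timer implementation."""

    def __init__(self, clock=time.monotonic):
        # The clock is injectable so the timer can be exercised without sleeping.
        self._clock = clock
        self._start = clock()

    def start(self):
        """Restart the timer, e.g. right before a task is launched."""
        self._start = self._clock()

    def elapsed(self):
        """Return the seconds passed since the last start()."""
        return self._clock() - self._start


def over_time_and_retryable(elapsed, time_limit, scrub_count, scrub_limit):
    """Mirror the condition in process_task: too slow and retries left."""
    return time_limit < elapsed and scrub_count < scrub_limit


if __name__ == "__main__":
    timer = TaskTimer()
    timer.start()      # restart per task
    time.sleep(0.1)    # stands in for the real job
    print(over_time_and_retryable(timer.elapsed(), time_limit=3,
                                  scrub_count=0, scrub_limit=2))
```

Restarting the timer per task keeps the time spent on earlier tokens out of the comparison; if the actor is meant to enforce a limit on its total run time instead, a single timer started once would be the appropriate choice.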
@@ -0,0 +1,3 @@
sleep 10
exit 1
sleep 10
@@ -0,0 +1,3 @@
sleep 5
sleep 5
sleep 5
Having the scrub_count functionality in the example files (scrub-example.py and scrub-timer-example.py) duplicates code, which makes it harder to maintain, and it makes things harder for users because they would need to adapt more code themselves.
I think it is better to put the scrub_count functionality in actors.py, together with the stop functionality (RunActor.run). Also, scrub_count needs to be increased when the handler function is called (i.e. when Picas is killed).
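One way to read this suggestion is sketched below, assuming nothing about the actual actors.py: the retry rule lives once in a base class, subclasses only implement `process_task`, and the kill handler scrubs the token that is in flight. All names here (`FakeToken`, `ScrubbingActorSketch`, `run_one`, `handler`) are hypothetical stand-ins, and `FakeToken.scrub()` only imitates the assumed behaviour of a PiCaS token.

```python
class FakeToken(dict):
    """Hypothetical stand-in for a PiCaS token.

    scrub() is assumed to increment 'scrub_count' and to return the
    token to the 'todo' state.
    """

    def scrub(self):
        self["scrub_count"] = self.get("scrub_count", 0) + 1
        self["state"] = "todo"


class ScrubbingActorSketch:
    """Keeps the retry rule in one place instead of in every example."""

    def __init__(self, scrub_limit=2):
        self.scrub_limit = scrub_limit
        self.current_token = None

    def process_task(self, token):
        """Run the job and return its exit code; implemented by subclasses."""
        raise NotImplementedError

    def _scrub_if_allowed(self, token):
        # Only scrub while the token has retries left.
        if token.get("scrub_count", 0) < self.scrub_limit:
            token.scrub()
            return True
        return False

    def run_one(self, token):
        """Process a single token and scrub it when the job failed."""
        self.current_token = token
        token["state"] = "locked"
        exit_code = self.process_task(token)
        token["exit_code"] = exit_code
        token["state"] = "done"
        if exit_code != 0:
            self._scrub_if_allowed(token)
        self.current_token = None

    def handler(self, signum=None, frame=None):
        """Imitates a signal handler: scrub the token that was interrupted."""
        if self.current_token is not None:
            self._scrub_if_allowed(self.current_token)


class AlwaysFails(ScrubbingActorSketch):
    """Example subclass whose job always exits with code 1."""

    def process_task(self, token):
        return 1


if __name__ == "__main__":
    actor = AlwaysFails(scrub_limit=2)
    token = FakeToken()
    for _ in range(3):
        actor.run_one(token)
        print(token["state"], token["scrub_count"])
```

With this layout an example script would only override `process_task`, and the scrub count is updated on both the failure path and the kill path by the same helper.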