Skip to content

Commit

Permalink
Merge branch 'master' into SPD-417
Browse files Browse the repository at this point in the history
  • Loading branch information
lnauta committed Jan 2, 2025
2 parents 2a01cc4 + acddc3d commit 405ad7a
Show file tree
Hide file tree
Showing 15 changed files with 372 additions and 71 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/python-app.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: ["3.10", "3.11", "3.12"]
python-version: ["3.10", "3.11", "3.12", "3.13"]



Expand Down
278 changes: 216 additions & 62 deletions README.md

Large diffs are not rendered by default.

Binary file modified docs/picas-layers.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified docs/picas-layers.pptx
Binary file not shown.
Binary file added docs/picas-views.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
40 changes: 40 additions & 0 deletions examples/deleteTokens.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
'''
@helpdesk: SURFsara helpdesk <helpdesk@surfsara.nl>
usage: python deleteTokens.py [viewname]
e.g. python deleteTokens.py Monitor/error
description:
Connect to PiCaS server
Delete all the Tokens in the [viewname] View
'''

import sys

import couchdb
import picasconfig


def deleteDocs(db, viewname="Monitor/error"):
v = db.view(viewname)
for x in v:
document = db[x['key']]
db.delete(document)


def get_db():
server = couchdb.Server(picasconfig.PICAS_HOST_URL)
username = picasconfig.PICAS_USERNAME
pwd = picasconfig.PICAS_PASSWORD
server.resource.credentials = (username, pwd)
db = server[picasconfig.PICAS_DATABASE]
return db


if __name__ == '__main__':
# Create a connection to the server
db = get_db()
# Delete the Docs in [viewname]
viewname = str(sys.argv[1])
deleteDocs(db, viewname)
7 changes: 3 additions & 4 deletions examples/local_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class ExampleActor(RunActor):
def __init__(self, db, modifier, view="todo", **viewargs):
super(ExampleActor, self).__init__(db, view=view, **viewargs)
self.timer = Timer()
self.iterator = EndlessViewIterator(self.iterator)
# self.iterator = EndlessViewIterator(self.iterator)
self.modifier = modifier
self.client = db

Expand All @@ -48,8 +48,7 @@ def process_task(self, token):
print("-----------------------")

# Start running the main job, the logging is done internally and saved below
# /usr/bin/time -v ./process_task.sh [input] [tokenid]
command = ["/usr/bin/time", "-v", "./process_task.sh", token['input'], token['_id']]
command = ["/usr/bin/time", "./process_task.sh", token['input'], token['_id']]
out = execute(command)

logsout = f"logs_{token['_id']}.out"
Expand Down Expand Up @@ -88,7 +87,7 @@ def main():
# Create actor
actor = ExampleActor(client, modifier)
# Start work!
actor.run(max_token_time=10, max_total_time=100, max_tasks=10, max_scrub=2)
actor.run(max_token_time=1800, max_total_time=3600, max_tasks=10, max_scrub=2)

if __name__ == '__main__':
main()
3 changes: 3 additions & 0 deletions examples/process_task.sh
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ if [[ "$?" != "0" ]]; then
exit 1
fi

#Copy output to the remote storage, e.g.
#globus-url-copy file:///${PWD}/${OUTPUT} gsiftp://gridftp.grid.sara.nl:2811/pnfs/grid.sara.nl/data/lsgrid/homer/${OUTPUT}

echo `date`

exit 0
6 changes: 3 additions & 3 deletions examples/quickExample.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
echo 'this is token 1'
echo 'this is token 2'
echo 'this is token 3'
echo 'this is token A'
echo 'this is token B'
echo 'this is token C'
68 changes: 68 additions & 0 deletions examples/resetTokens.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
'''
@helpdesk: SURFsara helpdesk <helpdesk@surfsara.nl>
usage: python resetTokens.py [viewname] [locktime]
e.g. python resetTokens.py Monitor/locked 72
description:
Connect to PiCaS server
Reset all Tokens in [viewname] View that have been locked more than [hours] hours,
defaults to 0 hours to reset all tokens
'''

import sys

import couchdb
from time import time
import picasconfig

def resetDocs(db, viewname="Monitor/locked", locktime=0):
v = db.view(viewname)
max_age = locktime * 3600
to_update = []
for x in v:
document = db[x['key']]
age = time() - document["lock"]
print(age)
if (age > max_age):
document['lock'] = 0
document['done'] = 0
document['scrub_count'] += 1
document['hostname'] = ''
document['exit_code'] = ''
if '_attachments' in document:
del document["_attachments"]
to_update.append(document)
db.update(to_update)
print("Number of reset tokens: " + str(len(to_update)))


def get_db():
server = couchdb.Server(picasconfig.PICAS_HOST_URL)
username = picasconfig.PICAS_USERNAME
pwd = picasconfig.PICAS_PASSWORD
server.resource.credentials = (username, pwd)
db = server[picasconfig.PICAS_DATABASE]
return db


if __name__ == '__main__':
# Create a connection to the server
db = get_db()

if len(sys.argv)==1:
sys.exit("Error: No viewname provided. To reset all locked tokens: `python resetTokens.py Monitor/locked`")
elif len(sys.argv)>1:
# reset the Docs in [viewname]
viewname = str(sys.argv[1])
if len(sys.argv)==2:
print("Warning: No locktime provided. Will reset all tokens in view ", viewname)
input("Press Enter to continue or Ctrl+C to cancel.")
# default: reset all locked tokens
locktime=0
else:
locktime=float(sys.argv[2])


resetDocs(db, viewname, locktime)
20 changes: 19 additions & 1 deletion picas/actors.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
@author: Jan Bot, Joris Borgdorff
"""

import ssl
import logging
import signal
import subprocess
Expand Down Expand Up @@ -48,6 +49,10 @@ def __init__(self, db, iterator=None, view='todo', token_reset_values=[0, 0], **
else:
self.iterator = iterator

def reconnect(self):
self.db = self.db.copy()
self.iterator.reconnect(self.db)

def _run(self, task, timeout):
"""
Execution of the work on the iterator used in the run method.
Expand Down Expand Up @@ -77,8 +82,21 @@ def _run(self, task, timeout):
log.info(msg)
new_task = self.db.get(task.id)
task['_rev'] = new_task.rev
except ssl.SSLEOFError as ex:
# SSLEOFError can occur for long-lived connections, re-establish connection
msg = f"Warning: {type(ex)} occurred while saving task to database: " + \
"Trying ro reconnect to database"
log.info(msg)
self.reconnect()
try:
self.db.save(task)
except Exception as ex:
msg = f"Error: {type(ex)} occurred while saving task to database: " + \
"Not able to reconnect to database"
log.info(msg)
raise
except Exception as ex:
# re-raise Exception
# re-raise unknown exception, this will terminate the iterator
msg = f"Error: {type(ex)} occurred while saving task to database: {ex}"
log.info(msg)
raise
Expand Down
4 changes: 4 additions & 0 deletions picas/iterators.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ def claim_task(self):
"""Get the first available task from a view."""
raise NotImplementedError("claim_task function not implemented.")

def reconnect(self, database):
"""Reconnect to database"""
self.database = database


def _claim_task(database, view, allowed_failures=10, **view_params):
for _ in range(allowed_failures):
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ readme = "README.md"
python = "^3.10"
couchdb = "^1.2"
stopit = "^1.1.2"
setuptools = "^66"

[tool.poetry.group.test.dependencies]
pytest = "^8.3.3"
Expand Down
11 changes: 11 additions & 0 deletions tests/test_actors.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import subprocess
import time
import unittest
import ssl

from test_mock import MockDB, MockEmptyDB
from unittest.mock import patch
Expand Down Expand Up @@ -72,6 +73,16 @@ def test_run_resourceconflict(self, mock_save):
runner._run(task=Task({'_id': 'c', 'lock': None, 'done': None}), timeout=None)
self.assertEqual(runner.tasks_processed, 1)

@patch('test_mock.MockDB.save')
def test_run_ssleoferror(self, mock_save):
"""
Test the _run function, in case the DB throws a an SSLEOFError
"""
with pytest.raises(ssl.SSLEOFError):
mock_save.side_effect = ssl.SSLEOFError
runner = ExampleRun(self._callback)
runner._run(task=Task({'_id': 'c', 'lock': None, 'done': None}), timeout=None)

@patch('test_mock.MockDB.save')
def test_run_exception(self, mock_save):
"""
Expand Down
3 changes: 3 additions & 0 deletions tests/test_mock.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ def save(self, doc):

return doc

def copy(self):
return self


class MockEmptyDB(MockDB):
TASKS = []
Expand Down

0 comments on commit 405ad7a

Please sign in to comment.