Skip to content

Commit

Permalink
Add stopit fnality to stop single token
Browse files Browse the repository at this point in the history
  • Loading branch information
lnauta committed Sep 26, 2024
1 parent 274fa1e commit 9961c7b
Showing 1 changed file with 9 additions and 6 deletions.
15 changes: 9 additions & 6 deletions picas/actors.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from .iterators import TaskViewIterator, EndlessViewIterator

from couchdb.http import ResourceConflict
from stopit import threading_timeoutable as timeoutable

logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)
Expand Down Expand Up @@ -45,7 +46,7 @@ def __init__(self, db, iterator=None, view='todo', token_reset_values=[0, 0], **
else:
self.iterator = iterator

def _run(self, task):
def _run(self, task, timeout):
"""
Execution of the work on the iterator used in the run method.
"""
Expand All @@ -54,7 +55,7 @@ def _run(self, task):
self.current_task = task

try:
self.process_task(task)
self.process_task(task, timeout=timeout)
except Exception as ex:
msg = ("Exception {0} occurred during processing: {1}"
.format(type(ex), ex))
Expand All @@ -74,7 +75,7 @@ def _run(self, task):
self.cleanup_run()
self.tasks_processed += 1

def run(self):
def run(self, max_token_time=None):
"""
Run method of the actor, executes the application code by iterating
over the available tasks in CouchDB.
Expand All @@ -87,7 +88,7 @@ def run(self):
self.prepare_env()
try:
for task in self.iterator:
self._run(task)
self._run(task, timeout=max_token_time)
self.current_task = None # set to None so the handler leaves the token alone when picas is killed
finally:
self.cleanup_env()
Expand Down Expand Up @@ -144,6 +145,7 @@ def prepare_run(self, *args, **kwargs):
inputs.
"""

@timeoutable()
def process_task(self, task):
"""
The function to override, which processes the tasks themselves.
Expand All @@ -167,13 +169,14 @@ class RunActor(AbstractRunActor):
RunActor class with added stopping functionality.
"""

def run(self, max_time=None, avg_time_factor=0.0, max_tasks=0, stop_function=None, **stop_function_args):
def run(self, max_token_time=None, max_time=None, avg_time_factor=0.0, max_tasks=0, stop_function=None, **stop_function_args):
"""
Run method of the actor, executes the application code by iterating
over the available tasks in CouchDB, including stop logic. The stop
logic is also extended into the EndlessViewIterator to break it when
the condition is met, otherwise it never stops.
@param max_token_time: maximum time to run a single token before stopping
@param max_time: maximum time to run picas before stopping
@param avg_time_factor: used for estimating when to stop with `max_time`,
value is average time per token to run
Expand All @@ -197,7 +200,7 @@ def run(self, max_time=None, avg_time_factor=0.0, max_tasks=0, stop_function=Non

try:
for task in self.iterator:
self._run(task)
self._run(task, timeout=max_token_time)

logging.debug("Tasks executed: ", self.tasks_processed)

Expand Down

0 comments on commit 9961c7b

Please sign in to comment.