From b6ee3297515970c136161f59713afbf155c9aaea Mon Sep 17 00:00:00 2001 From: Franck Plazanet Date: Mon, 8 Jul 2024 14:56:12 +0000 Subject: [PATCH] Refactor: use method activity --- polling/infrequent/activities.py | 15 ++++++++------- polling/infrequent/run_worker.py | 6 +++--- polling/infrequent/workflows.py | 6 +++--- polling/test_service.py | 4 ++-- 4 files changed, 16 insertions(+), 15 deletions(-) diff --git a/polling/infrequent/activities.py b/polling/infrequent/activities.py index be48b94e..6f5434f8 100644 --- a/polling/infrequent/activities.py +++ b/polling/infrequent/activities.py @@ -10,11 +10,12 @@ class ComposeGreetingInput: greeting: str name: str +class ComposeGreeting: + def __init__(self, test_service: TestService = None): + self.test_service = test_service or TestService() -@activity.defn -async def compose_greeting(input: ComposeGreetingInput) -> str: - attempt = activity.info().attempt - 1 - test_service = TestService(attempt=attempt) - # If this raises an exception because it's not done yet, the activity will - # continually be scheduled for retry - return await test_service.get_service_result(input) + @activity.defn + async def compose_greeting(self, input: ComposeGreetingInput) -> str: + # If this raises an exception because it's not done yet, the activity will + # continually be scheduled for retry + return await self.test_service.get_service_result(input) diff --git a/polling/infrequent/run_worker.py b/polling/infrequent/run_worker.py index f600b949..020c04a9 100644 --- a/polling/infrequent/run_worker.py +++ b/polling/infrequent/run_worker.py @@ -3,18 +3,18 @@ from temporalio.client import Client from temporalio.worker import Worker -from polling.infrequent.activities import compose_greeting +from polling.infrequent.activities import ComposeGreeting from polling.infrequent.workflows import GreetingWorkflow async def main(): client = await Client.connect("localhost:7233") - + activities = ComposeGreeting() worker = Worker( client, task_queue="infrequent-activity-retry-task-queue", workflows=[GreetingWorkflow], - activities=[compose_greeting], + activities=[activities.compose_greeting], ) await worker.run() diff --git a/polling/infrequent/workflows.py b/polling/infrequent/workflows.py index 35769573..a3ae7418 100644 --- a/polling/infrequent/workflows.py +++ b/polling/infrequent/workflows.py @@ -4,15 +4,15 @@ from temporalio.common import RetryPolicy with workflow.unsafe.imports_passed_through(): - from polling.infrequent.activities import ComposeGreetingInput, compose_greeting + from polling.infrequent.activities import ComposeGreetingInput, ComposeGreeting @workflow.defn class GreetingWorkflow: @workflow.run async def run(self, name: str) -> str: - return await workflow.execute_activity( - compose_greeting, + return await workflow.execute_activity_method( + ComposeGreeting.compose_greeting, ComposeGreetingInput("Hello", name), start_to_close_timeout=timedelta(seconds=2), retry_policy=RetryPolicy( diff --git a/polling/test_service.py b/polling/test_service.py index 96fed5a0..c9d6674f 100644 --- a/polling/test_service.py +++ b/polling/test_service.py @@ -1,6 +1,6 @@ class TestService: - def __init__(self, attempt=0, error_attempts=5): - self.try_attempts = attempt + def __init__(self, error_attempts=5): + self.try_attempts = 0 self.error_attempts = error_attempts async def get_service_result(self, input):