From af2c5c24eb342b1db7060f848fba5a16fd5ace5d Mon Sep 17 00:00:00 2001 From: Franck Plazanet Date: Mon, 8 Jul 2024 09:40:33 +0000 Subject: [PATCH 1/3] Refactor: Correct polling infrequent usecase --- polling/infrequent/activities.py | 3 ++- polling/test_service.py | 6 +++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/polling/infrequent/activities.py b/polling/infrequent/activities.py index 2bd71587..be48b94e 100644 --- a/polling/infrequent/activities.py +++ b/polling/infrequent/activities.py @@ -13,7 +13,8 @@ class ComposeGreetingInput: @activity.defn async def compose_greeting(input: ComposeGreetingInput) -> str: - test_service = TestService() + attempt = activity.info().attempt - 1 + test_service = TestService(attempt=attempt) # If this raises an exception because it's not done yet, the activity will # continually be scheduled for retry return await test_service.get_service_result(input) diff --git a/polling/test_service.py b/polling/test_service.py index 3744998a..96fed5a0 100644 --- a/polling/test_service.py +++ b/polling/test_service.py @@ -1,7 +1,7 @@ class TestService: - def __init__(self): - self.try_attempts = 0 - self.error_attempts = 5 + def __init__(self, attempt=0, error_attempts=5): + self.try_attempts = attempt + self.error_attempts = error_attempts async def get_service_result(self, input): print( From ac73803d4e6afc51d577cd1833f0acb5e66cd0a6 Mon Sep 17 00:00:00 2001 From: Franck Plazanet Date: Mon, 8 Jul 2024 14:56:12 +0000 Subject: [PATCH 2/3] Refactor: use method activity --- polling/infrequent/activities.py | 16 +++++++++------- polling/infrequent/run_worker.py | 6 +++--- polling/infrequent/workflows.py | 6 +++--- polling/test_service.py | 6 +++--- 4 files changed, 18 insertions(+), 16 deletions(-) diff --git a/polling/infrequent/activities.py b/polling/infrequent/activities.py index be48b94e..b21223b8 100644 --- a/polling/infrequent/activities.py +++ b/polling/infrequent/activities.py @@ -11,10 +11,12 @@ class ComposeGreetingInput: name: str -@activity.defn -async def compose_greeting(input: ComposeGreetingInput) -> str: - attempt = activity.info().attempt - 1 - test_service = TestService(attempt=attempt) - # If this raises an exception because it's not done yet, the activity will - # continually be scheduled for retry - return await test_service.get_service_result(input) +class ComposeGreeting: + def __init__(self): + self.test_service = TestService() + + @activity.defn + async def compose_greeting(self, input: ComposeGreetingInput) -> str: + # If this raises an exception because it's not done yet, the activity will + # continually be scheduled for retry + return await self.test_service.get_service_result(input) diff --git a/polling/infrequent/run_worker.py b/polling/infrequent/run_worker.py index f600b949..020c04a9 100644 --- a/polling/infrequent/run_worker.py +++ b/polling/infrequent/run_worker.py @@ -3,18 +3,18 @@ from temporalio.client import Client from temporalio.worker import Worker -from polling.infrequent.activities import compose_greeting +from polling.infrequent.activities import ComposeGreeting from polling.infrequent.workflows import GreetingWorkflow async def main(): client = await Client.connect("localhost:7233") - + activities = ComposeGreeting() worker = Worker( client, task_queue="infrequent-activity-retry-task-queue", workflows=[GreetingWorkflow], - activities=[compose_greeting], + activities=[activities.compose_greeting], ) await worker.run() diff --git a/polling/infrequent/workflows.py b/polling/infrequent/workflows.py index 35769573..05abe943 100644 --- a/polling/infrequent/workflows.py +++ b/polling/infrequent/workflows.py @@ -4,15 +4,15 @@ from temporalio.common import RetryPolicy with workflow.unsafe.imports_passed_through(): - from polling.infrequent.activities import ComposeGreetingInput, compose_greeting + from polling.infrequent.activities import ComposeGreeting, ComposeGreetingInput @workflow.defn class GreetingWorkflow: @workflow.run async def run(self, name: str) -> str: - return await workflow.execute_activity( - compose_greeting, + return await workflow.execute_activity_method( + ComposeGreeting.compose_greeting, ComposeGreetingInput("Hello", name), start_to_close_timeout=timedelta(seconds=2), retry_policy=RetryPolicy( diff --git a/polling/test_service.py b/polling/test_service.py index 96fed5a0..3744998a 100644 --- a/polling/test_service.py +++ b/polling/test_service.py @@ -1,7 +1,7 @@ class TestService: - def __init__(self, attempt=0, error_attempts=5): - self.try_attempts = attempt - self.error_attempts = error_attempts + def __init__(self): + self.try_attempts = 0 + self.error_attempts = 5 async def get_service_result(self, input): print( From 1f733dd7e91ac9395c4136d7e7b709db70ac6b2a Mon Sep 17 00:00:00 2001 From: Franck Plazanet Date: Mon, 8 Jul 2024 16:23:56 +0000 Subject: [PATCH 3/3] Refactor: change logic periodic_sequence to work according to README --- polling/periodic_sequence/activities.py | 14 +++++++++++--- polling/periodic_sequence/run_worker.py | 6 +++--- polling/periodic_sequence/workflows.py | 16 ++++++++++------ polling/test_service.py | 4 ++-- 4 files changed, 26 insertions(+), 14 deletions(-) diff --git a/polling/periodic_sequence/activities.py b/polling/periodic_sequence/activities.py index 1a1196c6..d858ac19 100644 --- a/polling/periodic_sequence/activities.py +++ b/polling/periodic_sequence/activities.py @@ -2,6 +2,8 @@ from temporalio import activity +from polling.test_service import TestService + @dataclass class ComposeGreetingInput: @@ -9,6 +11,12 @@ class ComposeGreetingInput: name: str -@activity.defn -async def compose_greeting(input: ComposeGreetingInput) -> str: - raise RuntimeError("Service is down") +class ComposeGreeting: + def __init__(self): + self.test_service = TestService(error_attempts=23) + + @activity.defn + async def compose_greeting(self, input: ComposeGreetingInput) -> str: + # If this raises an exception because it's not done yet, the activity will + # continually be scheduled for retry + return await self.test_service.get_service_result(input) diff --git a/polling/periodic_sequence/run_worker.py b/polling/periodic_sequence/run_worker.py index e04ac4dc..64772b54 100644 --- a/polling/periodic_sequence/run_worker.py +++ b/polling/periodic_sequence/run_worker.py @@ -3,18 +3,18 @@ from temporalio.client import Client from temporalio.worker import Worker -from polling.periodic_sequence.activities import compose_greeting +from polling.periodic_sequence.activities import ComposeGreeting from polling.periodic_sequence.workflows import ChildWorkflow, GreetingWorkflow async def main(): client = await Client.connect("localhost:7233") - + activities = ComposeGreeting() worker = Worker( client, task_queue="periodic-retry-task-queue", workflows=[GreetingWorkflow, ChildWorkflow], - activities=[compose_greeting], + activities=[activities.compose_greeting], ) await worker.run() diff --git a/polling/periodic_sequence/workflows.py b/polling/periodic_sequence/workflows.py index d38d41ce..c07c971f 100644 --- a/polling/periodic_sequence/workflows.py +++ b/polling/periodic_sequence/workflows.py @@ -7,10 +7,12 @@ with workflow.unsafe.imports_passed_through(): from polling.periodic_sequence.activities import ( + ComposeGreeting, ComposeGreetingInput, - compose_greeting, ) +MAX_RETRY_PER_CHILD_FLOW = 10 + @workflow.defn class GreetingWorkflow: @@ -26,10 +28,10 @@ async def run(self, name: str) -> str: class ChildWorkflow: @workflow.run async def run(self, name: str) -> str: - for i in range(10): + for i in range(MAX_RETRY_PER_CHILD_FLOW): try: - return await workflow.execute_activity( - compose_greeting, + return await workflow.execute_activity_method( + ComposeGreeting.compose_greeting, ComposeGreetingInput("Hello", name), start_to_close_timeout=timedelta(seconds=4), retry_policy=RetryPolicy( @@ -38,8 +40,10 @@ async def run(self, name: str) -> str: ) except ActivityError: - workflow.logger.error("Activity failed, retrying in 1 seconds") + workflow.logger.error( + f"Activity failed ({i}/{MAX_RETRY_PER_CHILD_FLOW}), retrying in 1 seconds" + ) await asyncio.sleep(1) - workflow.continue_as_new(name) + workflow.continue_as_new(name) raise Exception("Polling failed after all attempts") diff --git a/polling/test_service.py b/polling/test_service.py index 3744998a..005150d5 100644 --- a/polling/test_service.py +++ b/polling/test_service.py @@ -1,7 +1,7 @@ class TestService: - def __init__(self): + def __init__(self, error_attempts: int = 5): self.try_attempts = 0 - self.error_attempts = 5 + self.error_attempts = error_attempts async def get_service_result(self, input): print(