From 0f3bb8d821d6d7b49f92fcd493339ede0585102a Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Wed, 20 Nov 2024 16:19:41 -0800 Subject: [PATCH 1/3] Enabled speculative workflow task with command events --- .../replay/ReplayWorkflowRunTaskHandler.java | 2 +- .../replay/ReplayWorkflowTaskHandler.java | 6 +- .../statemachines/WorkflowStateMachines.java | 2 +- .../UpdateProtocolStateMachineTest.java | 2 +- .../updateTest/SpeculativeUpdateTest.java | 139 ++++++++++++++++++ 5 files changed, 147 insertions(+), 4 deletions(-) create mode 100644 temporal-sdk/src/test/java/io/temporal/workflow/updateTest/SpeculativeUpdateTest.java diff --git a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandler.java b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandler.java index e79c73d97f..190a6e6477 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandler.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandler.java @@ -219,7 +219,7 @@ public QueryResult handleDirectQueryWorkflowTask( @Override public void resetStartedEvenId(Long eventId) { - workflowStateMachines.resetStartedEvenId(eventId); + workflowStateMachines.resetStartedEventId(eventId); } private void handleWorkflowTaskImpl( diff --git a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowTaskHandler.java b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowTaskHandler.java index 4fb5da8822..5cc86051ba 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowTaskHandler.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowTaskHandler.java @@ -230,7 +230,11 @@ private Result createCompletedWFTRequest( .setNonfirstLocalActivityExecutionAttempts( result.getNonfirstLocalActivityAttempts()) .build()) - .setReturnNewWorkflowTask(result.isForceWorkflowTask()); + .setReturnNewWorkflowTask(result.isForceWorkflowTask()) + .setCapabilities( + RespondWorkflowTaskCompletedRequest.Capabilities.newBuilder() + .setDiscardSpeculativeWorkflowTaskWithEvents(true) + .build()); if (stickyTaskQueue != null && (stickyTaskQueueScheduleToStartTimeout == null diff --git a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java index f6463d59fe..260e3acf3a 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java @@ -222,7 +222,7 @@ public void setWorkflowStartedEventId(long workflowTaskStartedEventId) { this.workflowTaskStartedEventId = workflowTaskStartedEventId; } - public void resetStartedEvenId(long eventId) { + public void resetStartedEventId(long eventId) { // We must reset the last event we handled to be after the last WFT we really completed // + any command events (since the SDK "processed" those when it emitted the commands). This // is also equal to what we just processed in the speculative task, minus two, since we diff --git a/temporal-sdk/src/test/java/io/temporal/internal/statemachines/UpdateProtocolStateMachineTest.java b/temporal-sdk/src/test/java/io/temporal/internal/statemachines/UpdateProtocolStateMachineTest.java index c546baf11f..5f58424554 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/statemachines/UpdateProtocolStateMachineTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/statemachines/UpdateProtocolStateMachineTest.java @@ -547,7 +547,7 @@ protected void signal(HistoryEvent signalEvent, AsyncWorkflowBuilder build assertNotNull(rejection); assertEquals(request, rejection.getRejectedRequest()); // Simulate the server request to reset the workflow event ID - stateMachines.resetStartedEvenId(3); + stateMachines.resetStartedEventId(3); // Create a new history after the reset event ID /* 1: EVENT_TYPE_WORKFLOW_EXECUTION_STARTED diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/SpeculativeUpdateTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/SpeculativeUpdateTest.java new file mode 100644 index 0000000000..68401a6e76 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/SpeculativeUpdateTest.java @@ -0,0 +1,139 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.workflow.updateTest; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; + +import io.temporal.activity.ActivityOptions; +import io.temporal.api.common.v1.WorkflowExecution; +import io.temporal.client.*; +import io.temporal.testing.internal.SDKTestOptions; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.workflow.Async; +import io.temporal.workflow.CompletablePromise; +import io.temporal.workflow.Workflow; +import io.temporal.workflow.shared.TestActivities; +import io.temporal.workflow.shared.TestWorkflows.WorkflowWithUpdate; +import java.time.Duration; +import java.util.Optional; +import java.util.Random; +import java.util.UUID; +import org.junit.Rule; +import org.junit.Test; + +public class SpeculativeUpdateTest { + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkflowTypes(TestUpdateWorkflowImpl.class) + .setActivityImplementations(new TestActivities.TestActivitiesImpl()) + .setUseExternalService(true) + .build(); + + @Test(timeout = 60000) + public void speculativeUpdateRejected() { + String workflowId = UUID.randomUUID().toString(); + WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient(); + WorkflowOptions options = + SDKTestOptions.newWorkflowOptionsWithTimeouts(testWorkflowRule.getTaskQueue()).toBuilder() + .setWorkflowId(workflowId) + .build(); + WorkflowWithUpdate workflow = workflowClient.newWorkflowStub(WorkflowWithUpdate.class, options); + // To execute workflow client.execute() would do. But we want to start workflow and immediately + // return. + WorkflowExecution execution = WorkflowClient.start(workflow::execute); + + workflow.update(3, "test value"); + // This update is going to be rejected, the resulting workflow task will not appear in history + assertThrows(WorkflowUpdateException.class, () -> workflow.update(0, "reject")); + + assertThrows(WorkflowUpdateException.class, () -> workflow.update(0, "reject")); + // Create more events to make sure the server persists the workflow tasks + workflow.update(12, "test value"); + // This update is going to be rejected, the resulting workflow task will appear in history + assertThrows(WorkflowUpdateException.class, () -> workflow.update(0, "reject")); + + assertThrows(WorkflowUpdateException.class, () -> workflow.update(0, "reject")); + + workflow.complete(); + String result = + testWorkflowRule + .getWorkflowClient() + .newUntypedWorkflowStub(execution, Optional.empty()) + .getResult(String.class); + assertEquals("", result); + } + + public static class TestUpdateWorkflowImpl implements WorkflowWithUpdate { + String state = "initial"; + CompletablePromise promise = Workflow.newPromise(); + + private final TestActivities.VariousTestActivities activities = + Workflow.newActivityStub( + TestActivities.VariousTestActivities.class, + ActivityOptions.newBuilder() + .setScheduleToCloseTimeout(Duration.ofSeconds(200)) + .build()); + + @Override + public String execute() { + promise.get(); + return ""; + } + + @Override + public String getState() { + return state; + } + + @Override + public String update(Integer index, String value) { + Random random = Workflow.newRandom(); + for (int i = 0; i <= index; i++) { + int choice = random.nextInt(3); + if (choice == 0) { + Async.function(activities::sleepActivity, new Long(10000), 0); + } else if (choice == 1) { + Workflow.getVersion("test version " + i, Workflow.DEFAULT_VERSION, 1); + } else { + Workflow.newTimer(Duration.ofMillis(10)); + } + } + return value; + } + + @Override + public void updateValidator(Integer index, String value) { + if (value.equals("reject")) { + throw new RuntimeException("Rejecting update"); + } + } + + @Override + public void complete() { + promise.complete(null); + } + + @Override + public void completeValidator() {} + } +} From d712137b285e986c675372badc4ada81d600b778 Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Mon, 27 Jan 2025 15:09:16 -0800 Subject: [PATCH 2/3] add check for real server --- .../workflow/updateTest/SpeculativeUpdateTest.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/SpeculativeUpdateTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/SpeculativeUpdateTest.java index 68401a6e76..058728fb7c 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/SpeculativeUpdateTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/SpeculativeUpdateTest.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThrows; +import static org.junit.Assume.assumeTrue; import io.temporal.activity.ActivityOptions; import io.temporal.api.common.v1.WorkflowExecution; @@ -46,11 +47,14 @@ public class SpeculativeUpdateTest { SDKTestWorkflowRule.newBuilder() .setWorkflowTypes(TestUpdateWorkflowImpl.class) .setActivityImplementations(new TestActivities.TestActivitiesImpl()) - .setUseExternalService(true) .build(); @Test(timeout = 60000) public void speculativeUpdateRejected() { + assumeTrue( + "Test Server doesn't support speculative update yet", + SDKTestWorkflowRule.useExternalService); + String workflowId = UUID.randomUUID().toString(); WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient(); WorkflowOptions options = @@ -58,8 +62,6 @@ public void speculativeUpdateRejected() { .setWorkflowId(workflowId) .build(); WorkflowWithUpdate workflow = workflowClient.newWorkflowStub(WorkflowWithUpdate.class, options); - // To execute workflow client.execute() would do. But we want to start workflow and immediately - // return. WorkflowExecution execution = WorkflowClient.start(workflow::execute); workflow.update(3, "test value"); From 9f901f285bc63cdc84fbd54a622c73a97118466b Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Tue, 28 Jan 2025 07:32:56 -0800 Subject: [PATCH 3/3] even -> event --- .../temporal/internal/replay/ReplayWorkflowRunTaskHandler.java | 2 +- .../io/temporal/internal/replay/ReplayWorkflowTaskHandler.java | 2 +- .../io/temporal/internal/replay/WorkflowRunTaskHandler.java | 2 +- .../java/io/temporal/internal/sync/DeterministicRunnerTest.java | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandler.java b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandler.java index 190a6e6477..0fa1485c0c 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandler.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandler.java @@ -218,7 +218,7 @@ public QueryResult handleDirectQueryWorkflowTask( } @Override - public void resetStartedEvenId(Long eventId) { + public void resetStartedEventId(Long eventId) { workflowStateMachines.resetStartedEventId(eventId); } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowTaskHandler.java b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowTaskHandler.java index 5cc86051ba..7e381199f2 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowTaskHandler.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowTaskHandler.java @@ -139,7 +139,7 @@ private Result handleWorkflowTaskWithQuery( workflowTask.getWorkflowType().getName(), workflowTask, wftResult, - workflowRunTaskHandler::resetStartedEvenId); + workflowRunTaskHandler::resetStartedEventId); } if (useCache) { diff --git a/temporal-sdk/src/main/java/io/temporal/internal/replay/WorkflowRunTaskHandler.java b/temporal-sdk/src/main/java/io/temporal/internal/replay/WorkflowRunTaskHandler.java index e0bac382cf..f27eaafb07 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/replay/WorkflowRunTaskHandler.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/replay/WorkflowRunTaskHandler.java @@ -61,7 +61,7 @@ QueryResult handleDirectQueryWorkflowTask( * * @param eventId the event ID to reset the cached state to. */ - void resetStartedEvenId(Long eventId); + void resetStartedEventId(Long eventId); void close(); } diff --git a/temporal-sdk/src/test/java/io/temporal/internal/sync/DeterministicRunnerTest.java b/temporal-sdk/src/test/java/io/temporal/internal/sync/DeterministicRunnerTest.java index 1966128b87..9358b02fa7 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/sync/DeterministicRunnerTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/sync/DeterministicRunnerTest.java @@ -809,7 +809,7 @@ public QueryResult handleDirectQueryWorkflowTask( } @Override - public void resetStartedEvenId(Long event) {} + public void resetStartedEventId(Long event) {} @Override public void close() {