diff --git a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandler.java b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandler.java index e79c73d97f..0fa1485c0c 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandler.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandler.java @@ -218,8 +218,8 @@ public QueryResult handleDirectQueryWorkflowTask( } @Override - public void resetStartedEvenId(Long eventId) { - workflowStateMachines.resetStartedEvenId(eventId); + public void resetStartedEventId(Long eventId) { + workflowStateMachines.resetStartedEventId(eventId); } private void handleWorkflowTaskImpl( diff --git a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowTaskHandler.java b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowTaskHandler.java index 4fb5da8822..7e381199f2 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowTaskHandler.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowTaskHandler.java @@ -139,7 +139,7 @@ private Result handleWorkflowTaskWithQuery( workflowTask.getWorkflowType().getName(), workflowTask, wftResult, - workflowRunTaskHandler::resetStartedEvenId); + workflowRunTaskHandler::resetStartedEventId); } if (useCache) { @@ -230,7 +230,11 @@ private Result createCompletedWFTRequest( .setNonfirstLocalActivityExecutionAttempts( result.getNonfirstLocalActivityAttempts()) .build()) - .setReturnNewWorkflowTask(result.isForceWorkflowTask()); + .setReturnNewWorkflowTask(result.isForceWorkflowTask()) + .setCapabilities( + RespondWorkflowTaskCompletedRequest.Capabilities.newBuilder() + .setDiscardSpeculativeWorkflowTaskWithEvents(true) + .build()); if (stickyTaskQueue != null && (stickyTaskQueueScheduleToStartTimeout == null diff --git a/temporal-sdk/src/main/java/io/temporal/internal/replay/WorkflowRunTaskHandler.java b/temporal-sdk/src/main/java/io/temporal/internal/replay/WorkflowRunTaskHandler.java index e0bac382cf..f27eaafb07 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/replay/WorkflowRunTaskHandler.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/replay/WorkflowRunTaskHandler.java @@ -61,7 +61,7 @@ QueryResult handleDirectQueryWorkflowTask( * * @param eventId the event ID to reset the cached state to. */ - void resetStartedEvenId(Long eventId); + void resetStartedEventId(Long eventId); void close(); } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java index f6463d59fe..260e3acf3a 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java @@ -222,7 +222,7 @@ public void setWorkflowStartedEventId(long workflowTaskStartedEventId) { this.workflowTaskStartedEventId = workflowTaskStartedEventId; } - public void resetStartedEvenId(long eventId) { + public void resetStartedEventId(long eventId) { // We must reset the last event we handled to be after the last WFT we really completed // + any command events (since the SDK "processed" those when it emitted the commands). This // is also equal to what we just processed in the speculative task, minus two, since we diff --git a/temporal-sdk/src/test/java/io/temporal/internal/statemachines/UpdateProtocolStateMachineTest.java b/temporal-sdk/src/test/java/io/temporal/internal/statemachines/UpdateProtocolStateMachineTest.java index c546baf11f..5f58424554 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/statemachines/UpdateProtocolStateMachineTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/statemachines/UpdateProtocolStateMachineTest.java @@ -547,7 +547,7 @@ protected void signal(HistoryEvent signalEvent, AsyncWorkflowBuilder build assertNotNull(rejection); assertEquals(request, rejection.getRejectedRequest()); // Simulate the server request to reset the workflow event ID - stateMachines.resetStartedEvenId(3); + stateMachines.resetStartedEventId(3); // Create a new history after the reset event ID /* 1: EVENT_TYPE_WORKFLOW_EXECUTION_STARTED diff --git a/temporal-sdk/src/test/java/io/temporal/internal/sync/DeterministicRunnerTest.java b/temporal-sdk/src/test/java/io/temporal/internal/sync/DeterministicRunnerTest.java index 1966128b87..9358b02fa7 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/sync/DeterministicRunnerTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/sync/DeterministicRunnerTest.java @@ -809,7 +809,7 @@ public QueryResult handleDirectQueryWorkflowTask( } @Override - public void resetStartedEvenId(Long event) {} + public void resetStartedEventId(Long event) {} @Override public void close() { diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/SpeculativeUpdateTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/SpeculativeUpdateTest.java new file mode 100644 index 0000000000..058728fb7c --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/SpeculativeUpdateTest.java @@ -0,0 +1,141 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.workflow.updateTest; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; +import static org.junit.Assume.assumeTrue; + +import io.temporal.activity.ActivityOptions; +import io.temporal.api.common.v1.WorkflowExecution; +import io.temporal.client.*; +import io.temporal.testing.internal.SDKTestOptions; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.workflow.Async; +import io.temporal.workflow.CompletablePromise; +import io.temporal.workflow.Workflow; +import io.temporal.workflow.shared.TestActivities; +import io.temporal.workflow.shared.TestWorkflows.WorkflowWithUpdate; +import java.time.Duration; +import java.util.Optional; +import java.util.Random; +import java.util.UUID; +import org.junit.Rule; +import org.junit.Test; + +public class SpeculativeUpdateTest { + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkflowTypes(TestUpdateWorkflowImpl.class) + .setActivityImplementations(new TestActivities.TestActivitiesImpl()) + .build(); + + @Test(timeout = 60000) + public void speculativeUpdateRejected() { + assumeTrue( + "Test Server doesn't support speculative update yet", + SDKTestWorkflowRule.useExternalService); + + String workflowId = UUID.randomUUID().toString(); + WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient(); + WorkflowOptions options = + SDKTestOptions.newWorkflowOptionsWithTimeouts(testWorkflowRule.getTaskQueue()).toBuilder() + .setWorkflowId(workflowId) + .build(); + WorkflowWithUpdate workflow = workflowClient.newWorkflowStub(WorkflowWithUpdate.class, options); + WorkflowExecution execution = WorkflowClient.start(workflow::execute); + + workflow.update(3, "test value"); + // This update is going to be rejected, the resulting workflow task will not appear in history + assertThrows(WorkflowUpdateException.class, () -> workflow.update(0, "reject")); + + assertThrows(WorkflowUpdateException.class, () -> workflow.update(0, "reject")); + // Create more events to make sure the server persists the workflow tasks + workflow.update(12, "test value"); + // This update is going to be rejected, the resulting workflow task will appear in history + assertThrows(WorkflowUpdateException.class, () -> workflow.update(0, "reject")); + + assertThrows(WorkflowUpdateException.class, () -> workflow.update(0, "reject")); + + workflow.complete(); + String result = + testWorkflowRule + .getWorkflowClient() + .newUntypedWorkflowStub(execution, Optional.empty()) + .getResult(String.class); + assertEquals("", result); + } + + public static class TestUpdateWorkflowImpl implements WorkflowWithUpdate { + String state = "initial"; + CompletablePromise promise = Workflow.newPromise(); + + private final TestActivities.VariousTestActivities activities = + Workflow.newActivityStub( + TestActivities.VariousTestActivities.class, + ActivityOptions.newBuilder() + .setScheduleToCloseTimeout(Duration.ofSeconds(200)) + .build()); + + @Override + public String execute() { + promise.get(); + return ""; + } + + @Override + public String getState() { + return state; + } + + @Override + public String update(Integer index, String value) { + Random random = Workflow.newRandom(); + for (int i = 0; i <= index; i++) { + int choice = random.nextInt(3); + if (choice == 0) { + Async.function(activities::sleepActivity, new Long(10000), 0); + } else if (choice == 1) { + Workflow.getVersion("test version " + i, Workflow.DEFAULT_VERSION, 1); + } else { + Workflow.newTimer(Duration.ofMillis(10)); + } + } + return value; + } + + @Override + public void updateValidator(Integer index, String value) { + if (value.equals("reject")) { + throw new RuntimeException("Rejecting update"); + } + } + + @Override + public void complete() { + promise.complete(null); + } + + @Override + public void completeValidator() {} + } +}