From fa3867e87b5c99faa8a330d7df3ab623177b0218 Mon Sep 17 00:00:00 2001 From: Chad Retz Date: Fri, 12 Jul 2024 13:16:37 -0500 Subject: [PATCH] Support for workflow ID conflict policy (#304) Fixes #217 --- .../Schedules/ScheduleActionStartWorkflow.cs | 4 ++ .../Client/TemporalClient.Workflow.cs | 1 + src/Temporalio/Client/WorkflowOptions.cs | 9 +++ .../Worker/WorkflowWorkerTests.cs | 60 +++++++++++++++++++ 4 files changed, 74 insertions(+) diff --git a/src/Temporalio/Client/Schedules/ScheduleActionStartWorkflow.cs b/src/Temporalio/Client/Schedules/ScheduleActionStartWorkflow.cs index d2f9cc0f..4882a3af 100644 --- a/src/Temporalio/Client/Schedules/ScheduleActionStartWorkflow.cs +++ b/src/Temporalio/Client/Schedules/ScheduleActionStartWorkflow.cs @@ -122,6 +122,10 @@ internal static ScheduleActionStartWorkflow FromProto( { throw new ArgumentException("ID reuse policy cannot change from default for scheduled workflow"); } + if (Options.IdConflictPolicy != Api.Enums.V1.WorkflowIdConflictPolicy.Unspecified) + { + throw new ArgumentException("ID conflict policy cannot change from default for scheduled workflow"); + } if (Options.CronSchedule != null) { throw new ArgumentException("Cron schedule cannot be set on scheduled workflow"); diff --git a/src/Temporalio/Client/TemporalClient.Workflow.cs b/src/Temporalio/Client/TemporalClient.Workflow.cs index 5d0227a6..8c4d08e0 100644 --- a/src/Temporalio/Client/TemporalClient.Workflow.cs +++ b/src/Temporalio/Client/TemporalClient.Workflow.cs @@ -504,6 +504,7 @@ private async Task> StartWorkflowInternalAsyn Identity = Client.Connection.Options.Identity, RequestId = Guid.NewGuid().ToString(), WorkflowIdReusePolicy = input.Options.IdReusePolicy, + WorkflowIdConflictPolicy = input.Options.IdConflictPolicy, RetryPolicy = input.Options.RetryPolicy?.ToProto(), RequestEagerExecution = input.Options.RequestEagerStart, }; diff --git a/src/Temporalio/Client/WorkflowOptions.cs b/src/Temporalio/Client/WorkflowOptions.cs index c970967c..c585df75 100644 --- a/src/Temporalio/Client/WorkflowOptions.cs +++ b/src/Temporalio/Client/WorkflowOptions.cs @@ -62,6 +62,15 @@ public WorkflowOptions(string id, string taskQueue) /// public WorkflowIdReusePolicy IdReusePolicy { get; set; } = WorkflowIdReusePolicy.AllowDuplicate; + /// + /// Gets or sets how already-existing workflows of the same ID are treated. Default is + /// which effectively means + /// on the server. If this value is set, then + /// cannot be set to + /// . + /// + public WorkflowIdConflictPolicy IdConflictPolicy { get; set; } = WorkflowIdConflictPolicy.Unspecified; + /// /// Gets or sets the retry policy for the workflow. If unset, workflow never retries. /// diff --git a/tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs b/tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs index 3f47ed16..e018fdb2 100644 --- a/tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs +++ b/tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs @@ -5580,6 +5580,66 @@ await AssertWarnings( shouldWarn: false); } + [Workflow] + public class IdConflictWorkflow + { + // Just wait forever + [WorkflowRun] + public Task RunAsync() => Workflow.WaitConditionAsync(() => false); + } + + [Fact] + public async Task ExecuteWorkflowAsync_IdConflictPolicy_ProperlyApplies() + { + await ExecuteWorkerAsync(async worker => + { + // Start a workflow + var handle = await Env.Client.StartWorkflowAsync( + (IdConflictWorkflow wf) => wf.RunAsync(), + new(id: $"wf-{Guid.NewGuid()}", taskQueue: worker.Options.TaskQueue!)); + handle = handle with { RunId = handle.ResultRunId }; + + // Confirm another fails by default + await Assert.ThrowsAsync(() => + Env.Client.StartWorkflowAsync( + (IdConflictWorkflow wf) => wf.RunAsync(), + new(id: handle.Id, taskQueue: worker.Options.TaskQueue!))); + + // Confirm fails if explicitly given that option + await Assert.ThrowsAsync(() => + Env.Client.StartWorkflowAsync( + (IdConflictWorkflow wf) => wf.RunAsync(), + new(id: handle.Id, taskQueue: worker.Options.TaskQueue!) + { + IdConflictPolicy = WorkflowIdConflictPolicy.Fail, + })); + + // Confirm gives back same handle if requested + var newHandle = await Env.Client.StartWorkflowAsync( + (IdConflictWorkflow wf) => wf.RunAsync(), + new(id: handle.Id, taskQueue: worker.Options.TaskQueue!) + { + IdConflictPolicy = WorkflowIdConflictPolicy.UseExisting, + }); + newHandle = newHandle with { RunId = newHandle.ResultRunId }; + Assert.Equal(handle.RunId, newHandle.RunId); + Assert.Equal(WorkflowExecutionStatus.Running, (await handle.DescribeAsync()).Status); + Assert.Equal(WorkflowExecutionStatus.Running, (await newHandle.DescribeAsync()).Status); + + // Confirm terminates and starts new if requested + newHandle = await Env.Client.StartWorkflowAsync( + (IdConflictWorkflow wf) => wf.RunAsync(), + new(id: handle.Id, taskQueue: worker.Options.TaskQueue!) + { + IdConflictPolicy = WorkflowIdConflictPolicy.TerminateExisting, + }); + newHandle = newHandle with { RunId = newHandle.ResultRunId }; + Assert.NotEqual(handle.RunId, newHandle.RunId); + Assert.Equal(WorkflowExecutionStatus.Terminated, (await handle.DescribeAsync()).Status); + Assert.Equal(WorkflowExecutionStatus.Running, (await newHandle.DescribeAsync()).Status); + }); + } + internal static Task AssertTaskFailureContainsEventuallyAsync( WorkflowHandle handle, string messageContains) {