Skip to content

Commit

Permalink
Support for workflow ID conflict policy (temporalio#304)
Browse files Browse the repository at this point in the history
  • Loading branch information
cretz authored Jul 12, 2024
1 parent 1fceb0b commit fa3867e
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ internal static ScheduleActionStartWorkflow FromProto(
{
throw new ArgumentException("ID reuse policy cannot change from default for scheduled workflow");
}
if (Options.IdConflictPolicy != Api.Enums.V1.WorkflowIdConflictPolicy.Unspecified)
{
throw new ArgumentException("ID conflict policy cannot change from default for scheduled workflow");
}
if (Options.CronSchedule != null)
{
throw new ArgumentException("Cron schedule cannot be set on scheduled workflow");
Expand Down
1 change: 1 addition & 0 deletions src/Temporalio/Client/TemporalClient.Workflow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,7 @@ private async Task<WorkflowHandle<TWorkflow, TResult>> StartWorkflowInternalAsyn
Identity = Client.Connection.Options.Identity,
RequestId = Guid.NewGuid().ToString(),
WorkflowIdReusePolicy = input.Options.IdReusePolicy,
WorkflowIdConflictPolicy = input.Options.IdConflictPolicy,
RetryPolicy = input.Options.RetryPolicy?.ToProto(),
RequestEagerExecution = input.Options.RequestEagerStart,
};
Expand Down
9 changes: 9 additions & 0 deletions src/Temporalio/Client/WorkflowOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,15 @@ public WorkflowOptions(string id, string taskQueue)
/// </summary>
public WorkflowIdReusePolicy IdReusePolicy { get; set; } = WorkflowIdReusePolicy.AllowDuplicate;

/// <summary>
/// Gets or sets how already-existing workflows of the same ID are treated. Default is
/// <see cref="WorkflowIdConflictPolicy.Unspecified" /> which effectively means
/// <see cref="WorkflowIdConflictPolicy.Fail"/> on the server. If this value is set, then
/// <see cref="IdReusePolicy"/> cannot be set to
/// <see cref="WorkflowIdReusePolicy.TerminateIfRunning"/>.
/// </summary>
public WorkflowIdConflictPolicy IdConflictPolicy { get; set; } = WorkflowIdConflictPolicy.Unspecified;

/// <summary>
/// Gets or sets the retry policy for the workflow. If unset, workflow never retries.
/// </summary>
Expand Down
60 changes: 60 additions & 0 deletions tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5580,6 +5580,66 @@ await AssertWarnings(
shouldWarn: false);
}

[Workflow]
public class IdConflictWorkflow
{
// Just wait forever
[WorkflowRun]
public Task RunAsync() => Workflow.WaitConditionAsync(() => false);
}

[Fact]
public async Task ExecuteWorkflowAsync_IdConflictPolicy_ProperlyApplies()
{
await ExecuteWorkerAsync<IdConflictWorkflow>(async worker =>
{
// Start a workflow
var handle = await Env.Client.StartWorkflowAsync(
(IdConflictWorkflow wf) => wf.RunAsync(),
new(id: $"wf-{Guid.NewGuid()}", taskQueue: worker.Options.TaskQueue!));
handle = handle with { RunId = handle.ResultRunId };

// Confirm another fails by default
await Assert.ThrowsAsync<WorkflowAlreadyStartedException>(() =>
Env.Client.StartWorkflowAsync(
(IdConflictWorkflow wf) => wf.RunAsync(),
new(id: handle.Id, taskQueue: worker.Options.TaskQueue!)));

// Confirm fails if explicitly given that option
await Assert.ThrowsAsync<WorkflowAlreadyStartedException>(() =>
Env.Client.StartWorkflowAsync(
(IdConflictWorkflow wf) => wf.RunAsync(),
new(id: handle.Id, taskQueue: worker.Options.TaskQueue!)
{
IdConflictPolicy = WorkflowIdConflictPolicy.Fail,
}));

// Confirm gives back same handle if requested
var newHandle = await Env.Client.StartWorkflowAsync(
(IdConflictWorkflow wf) => wf.RunAsync(),
new(id: handle.Id, taskQueue: worker.Options.TaskQueue!)
{
IdConflictPolicy = WorkflowIdConflictPolicy.UseExisting,
});
newHandle = newHandle with { RunId = newHandle.ResultRunId };
Assert.Equal(handle.RunId, newHandle.RunId);
Assert.Equal(WorkflowExecutionStatus.Running, (await handle.DescribeAsync()).Status);
Assert.Equal(WorkflowExecutionStatus.Running, (await newHandle.DescribeAsync()).Status);

// Confirm terminates and starts new if requested
newHandle = await Env.Client.StartWorkflowAsync(
(IdConflictWorkflow wf) => wf.RunAsync(),
new(id: handle.Id, taskQueue: worker.Options.TaskQueue!)
{
IdConflictPolicy = WorkflowIdConflictPolicy.TerminateExisting,
});
newHandle = newHandle with { RunId = newHandle.ResultRunId };
Assert.NotEqual(handle.RunId, newHandle.RunId);
Assert.Equal(WorkflowExecutionStatus.Terminated, (await handle.DescribeAsync()).Status);
Assert.Equal(WorkflowExecutionStatus.Running, (await newHandle.DescribeAsync()).Status);
});
}

internal static Task AssertTaskFailureContainsEventuallyAsync(
WorkflowHandle handle, string messageContains)
{
Expand Down

0 comments on commit fa3867e

Please sign in to comment.