Skip to content

Commit

Permalink
Run wait-condition callbacks in workflow context (#242)
Browse files Browse the repository at this point in the history
Fixes #240
  • Loading branch information
cretz authored May 8, 2024
1 parent f40e78e commit 5cd6f59
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 23 deletions.
66 changes: 43 additions & 23 deletions src/Temporalio/Worker/WorkflowInstance.cs
Original file line number Diff line number Diff line change
Expand Up @@ -648,38 +648,58 @@ protected override bool TryDequeue(Task task)
private void RunOnce(bool checkConditions)
{
// Run as long as we have scheduled tasks
// TODO(cretz): Fix to run as long as any tasks not yielded on Temporal
while (scheduledTasks.Count > 0)
{
while (scheduledTasks.Count > 0)
// Run all tasks until empty
RunAllTasks();

// If there are any conditions, schedule a new condition-check task and the run all
// tasks again. This is a scheduled task instead of just run inline because it needs
// to be in the workflow context (i.e. current task scheduler) so the `Workflow`
// methods work properly.
if (checkConditions && conditions.Count > 0)
{
// Pop last
var task = scheduledTasks.Last!.Value;
scheduledTasks.RemoveLast();
scheduledTaskNodes.Remove(task);
_ = QueueNewTaskAsync(CheckConditionsAsync);
RunAllTasks();
}
}
}

// This should never return false
if (!TryExecuteTask(task))
{
logger.LogWarning("Task unexpectedly was unable to execute");
}
if (currentActivationException != null)
{
ExceptionDispatchInfo.Capture(currentActivationException).Throw();
}
private void RunAllTasks()
{
while (scheduledTasks.Count > 0)
{
// Pop last
var task = scheduledTasks.Last!.Value;
scheduledTasks.RemoveLast();
scheduledTaskNodes.Remove(task);

// This should never return false
if (!TryExecuteTask(task))
{
logger.LogWarning("Task unexpectedly was unable to execute");
}
if (currentActivationException != null)
{
ExceptionDispatchInfo.Capture(currentActivationException).Throw();
}
}
}

// Collect all condition sources to mark complete and then complete them. This
// avoids modify-during-iterate issues.
if (checkConditions)
private Task CheckConditionsAsync()
{
try
{
foreach (var source in conditions.Where(t => t.Item1()).Select(t => t.Item2))
{
var completeConditions = conditions.Where(tuple => tuple.Item1());
foreach (var source in conditions.Where(t => t.Item1()).Select(t => t.Item2))
{
source.TrySetResult(null);
}
source.TrySetResult(null);
}
}
catch (Exception e)
{
currentActivationException = e;
}
return Task.CompletedTask;
}

private void AddCommand(WorkflowCommand cmd)
Expand Down
39 changes: 39 additions & 0 deletions tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4373,6 +4373,45 @@ await ExecuteWorkerAsync<TickingWorkflow>(
});
}

[Workflow]
public class CallWorkflowInWaitConditionWorkflow
{
[WorkflowRun]
public async Task RunAsync() => await Workflow.WaitConditionAsync(
() => !string.IsNullOrEmpty(Workflow.Info.WorkflowId));
}

[Fact]
public async Task ExecuteWorkflowAsync_WaitConditionCallingWorkflow_WorksProperly()
{
await ExecuteWorkerAsync<CallWorkflowInWaitConditionWorkflow>(async worker =>
{
await Client.ExecuteWorkflowAsync(
(CallWorkflowInWaitConditionWorkflow wf) => wf.RunAsync(),
new(id: $"workflow-{Guid.NewGuid()}", taskQueue: worker.Options.TaskQueue!));
});
}

[Workflow]
public class WaitConditionExceptionWorkflow
{
[WorkflowRun]
public async Task RunAsync() => await Workflow.WaitConditionAsync(
() => throw new ApplicationFailureException("Intentional error"));
}

[Fact]
public async Task ExecuteWorkflowAsync_WaitConditionExceptionWorkflow_WorksProperly()
{
await ExecuteWorkerAsync<WaitConditionExceptionWorkflow>(async worker =>
{
var handle = await Client.StartWorkflowAsync(
(WaitConditionExceptionWorkflow wf) => wf.RunAsync(),
new(id: $"workflow-{Guid.NewGuid()}", taskQueue: worker.Options.TaskQueue!));
await AssertTaskFailureContainsEventuallyAsync(handle, "Intentional error");
});
}

internal static Task AssertTaskFailureContainsEventuallyAsync(
WorkflowHandle handle, string messageContains)
{
Expand Down

0 comments on commit 5cd6f59

Please sign in to comment.