Skip to content

Commit

Permalink
Change approach to effectively revert temporalio#242 and use AsyncLocal
Browse files Browse the repository at this point in the history
  • Loading branch information
cretz committed Jun 3, 2024
1 parent 049722c commit 9907527
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 51 deletions.
85 changes: 35 additions & 50 deletions src/Temporalio/Worker/WorkflowInstance.cs
Original file line number Diff line number Diff line change
Expand Up @@ -651,63 +651,48 @@ private void RunOnce(bool checkConditions)
while (scheduledTasks.Count > 0)
{
// Run all tasks until empty
RunAllTasks();

// If there are any conditions, run the condition-check as a task. This is a task
// instead of just run inline because it needs to be in the workflow context (i.e.
// current task scheduler) so the `Workflow` methods work properly. However, we make
// sure that we only run this task because we want the loop to continue with other
// scheduled tasks that the conditions caused to wake up. An original, naive form of
// this ran all tasks including conditions, but that didn't allow conditions that
// depended on each other to properly re-schedule earlier conditions.
if (checkConditions && conditions.Count > 0)
while (scheduledTasks.Count > 0)
{
_ = QueueNewTaskAsync(CheckConditionsAsync);
RunAllTasks(singleTaskOnly: true);
}
}
}

private void RunAllTasks(bool singleTaskOnly = false)
{
while (scheduledTasks.Count > 0)
{
// Pop last
var task = scheduledTasks.Last!.Value;
scheduledTasks.RemoveLast();
scheduledTaskNodes.Remove(task);
// Pop last
var task = scheduledTasks.Last!.Value;
scheduledTasks.RemoveLast();
scheduledTaskNodes.Remove(task);

// This should never return false
if (!TryExecuteTask(task))
{
logger.LogWarning("Task unexpectedly was unable to execute");
}
if (currentActivationException != null)
{
ExceptionDispatchInfo.Capture(currentActivationException).Throw();
}
// We return on single-task-only regardless of whether there are more tasks
if (singleTaskOnly)
{
return;
// This should never return false
if (!TryExecuteTask(task))
{
logger.LogWarning("Task unexpectedly was unable to execute");
}
if (currentActivationException != null)
{
ExceptionDispatchInfo.Capture(currentActivationException).Throw();
}
}
}
}

private Task CheckConditionsAsync()
{
try
{
foreach (var source in conditions.Where(t => t.Item1()).Select(t => t.Item2))
// Check conditions. It would be nice if we could run this in the task scheduler
// because then users could have access to the `Workflow` context in the condition
// callback. However, this cannot be done because even just running one task in the
// scheduler causes .NET to add more tasks to the scheduler. And you don't want to
// "run until empty" with the condition, because conditions may need to be retried
// based on each other. This sounds confusing but basically: can't run check
// conditions in the task scheduler comfortably but still need to access the static
// Workflow class, hence the context override.
if (checkConditions && conditions.Count > 0)
{
source.TrySetResult(null);
Workflow.OverrideContext.Value = this;
try
{
foreach (var source in conditions.Where(t => t.Item1()).Select(t => t.Item2))
{
source.TrySetResult(null);
}
}
finally
{
Workflow.OverrideContext.Value = null;
}
}
}
catch (Exception e)
{
currentActivationException = e;
}
return Task.CompletedTask;
}

private void AddCommand(WorkflowCommand cmd)
Expand Down
13 changes: 12 additions & 1 deletion src/Temporalio/Workflows/Workflow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,19 @@ public static WorkflowUpdateDefinition? DynamicUpdate
/// </remarks>
public static DateTime UtcNow => Context.UtcNow;

/// <summary>
/// Gets an async local to override the context.
/// </summary>
/// <remarks>
/// This was only made available so WaitConditionAsync callbacks could have access to the
/// workflow context without running inside the task scheduler.
/// </remarks>
internal static AsyncLocal<IWorkflowContext?> OverrideContext { get; } = new();

private static IWorkflowContext Context =>
TaskScheduler.Current as IWorkflowContext ?? throw new InvalidOperationException("Not in workflow");
TaskScheduler.Current as IWorkflowContext ??
OverrideContext.Value ??
throw new InvalidOperationException("Not in workflow");

/// <summary>
/// Create an exception via lambda invoking the run method that, when thrown out of the
Expand Down

0 comments on commit 9907527

Please sign in to comment.