From e771d5b3f2d09015ad5c451c7accb20f16449517 Mon Sep 17 00:00:00 2001 From: Chad Retz Date: Mon, 6 May 2024 09:05:02 -0500 Subject: [PATCH] Worker client replacement (#236) Fixes #235 --- src/Temporalio/Bridge/Interop/Interop.cs | 3 + src/Temporalio/Bridge/Worker.cs | 12 ++++ .../Bridge/include/temporal-sdk-bridge.h | 2 + src/Temporalio/Bridge/sdk-core | 2 +- src/Temporalio/Bridge/src/worker.rs | 8 +++ src/Temporalio/Worker/TemporalWorker.cs | 40 ++++++++++-- .../Worker/WorkflowWorkerTests.cs | 63 +++++++++++++++++++ 7 files changed, 125 insertions(+), 5 deletions(-) diff --git a/src/Temporalio/Bridge/Interop/Interop.cs b/src/Temporalio/Bridge/Interop/Interop.cs index 8944ceab..a65493a8 100644 --- a/src/Temporalio/Bridge/Interop/Interop.cs +++ b/src/Temporalio/Bridge/Interop/Interop.cs @@ -726,6 +726,9 @@ internal static unsafe partial class Methods [DllImport("temporal_sdk_bridge", CallingConvention = CallingConvention.Cdecl, ExactSpelling = true)] public static extern void worker_free([NativeTypeName("struct Worker *")] Worker* worker); + [DllImport("temporal_sdk_bridge", CallingConvention = CallingConvention.Cdecl, ExactSpelling = true)] + public static extern void worker_replace_client([NativeTypeName("struct Worker *")] Worker* worker, [NativeTypeName("struct Client *")] Client* new_client); + [DllImport("temporal_sdk_bridge", CallingConvention = CallingConvention.Cdecl, ExactSpelling = true)] public static extern void worker_poll_workflow_activation([NativeTypeName("struct Worker *")] Worker* worker, void* user_data, [NativeTypeName("WorkerPollCallback")] IntPtr callback); diff --git a/src/Temporalio/Bridge/Worker.cs b/src/Temporalio/Bridge/Worker.cs index b8888d4e..479cc2a2 100644 --- a/src/Temporalio/Bridge/Worker.cs +++ b/src/Temporalio/Bridge/Worker.cs @@ -88,6 +88,18 @@ internal Worker(Worker other) /// internal unsafe Interop.Worker* Ptr { get; private set; } + /// + /// Replace the client. + /// + /// New client. + public void ReplaceClient(Client client) + { + unsafe + { + Interop.Methods.worker_replace_client(Ptr, client.Ptr); + } + } + /// /// Poll for the next workflow activation. /// diff --git a/src/Temporalio/Bridge/include/temporal-sdk-bridge.h b/src/Temporalio/Bridge/include/temporal-sdk-bridge.h index e206b9b7..1aaf37c3 100644 --- a/src/Temporalio/Bridge/include/temporal-sdk-bridge.h +++ b/src/Temporalio/Bridge/include/temporal-sdk-bridge.h @@ -521,6 +521,8 @@ struct WorkerOrFail worker_new(struct Client *client, const struct WorkerOptions void worker_free(struct Worker *worker); +void worker_replace_client(struct Worker *worker, struct Client *new_client); + void worker_poll_workflow_activation(struct Worker *worker, void *user_data, WorkerPollCallback callback); diff --git a/src/Temporalio/Bridge/sdk-core b/src/Temporalio/Bridge/sdk-core index 409e74ec..f8593766 160000 --- a/src/Temporalio/Bridge/sdk-core +++ b/src/Temporalio/Bridge/sdk-core @@ -1 +1 @@ -Subproject commit 409e74ec8e80ae4c1f9043e8b413b1371b65f946 +Subproject commit f859376686e46c36607ea527e9fdceec481f549d diff --git a/src/Temporalio/Bridge/src/worker.rs b/src/Temporalio/Bridge/src/worker.rs index 0f4f23f3..64d26c34 100644 --- a/src/Temporalio/Bridge/src/worker.rs +++ b/src/Temporalio/Bridge/src/worker.rs @@ -134,6 +134,14 @@ pub extern "C" fn worker_free(worker: *mut Worker) { } } +#[no_mangle] +pub extern "C" fn worker_replace_client(worker: *mut Worker, new_client: *mut Client) { + let worker = unsafe { &*worker }; + let core_worker = worker.worker.as_ref().expect("missing worker").clone(); + let client = unsafe { &*new_client }; + core_worker.replace_client(client.core.get_client().clone()); +} + /// If success or fail are present, they must be freed. They will both be null /// if this is a result of a poll shutdown. type WorkerPollCallback = unsafe extern "C" fn( diff --git a/src/Temporalio/Worker/TemporalWorker.cs b/src/Temporalio/Worker/TemporalWorker.cs index c22d69d4..f3d53c33 100644 --- a/src/Temporalio/Worker/TemporalWorker.cs +++ b/src/Temporalio/Worker/TemporalWorker.cs @@ -15,9 +15,11 @@ namespace Temporalio.Worker /// public class TemporalWorker : IDisposable { + private readonly object clientLock = new(); private readonly ActivityWorker? activityWorker; private readonly WorkflowWorker? workflowWorker; private readonly bool workflowTracingEventListenerEnabled; + private IWorkerClient client; private int started; /// @@ -31,7 +33,7 @@ public class TemporalWorker : IDisposable /// Options for the worker. public TemporalWorker(IWorkerClient client, TemporalWorkerOptions options) { - Client = client; + this.client = client; // Clone the options to discourage mutation (but we aren't completely disabling mutation // on the Options field herein). Options = (TemporalWorkerOptions)options.Clone(); @@ -49,7 +51,7 @@ public TemporalWorker(IWorkerClient client, TemporalWorkerOptions options) // Interceptors are the client interceptors that implement IWorkerInterceptor followed // by the explicitly provided ones in options. - var interceptors = Client.Options.Interceptors?.OfType() ?? + var interceptors = client.Options.Interceptors?.OfType() ?? Enumerable.Empty(); if (Options.Interceptors != null) { @@ -102,9 +104,39 @@ public TemporalWorker(IWorkerClient client, TemporalWorkerOptions options) public TemporalWorkerOptions Options { get; private init; } /// - /// Gets the client this worker was created with. + /// Gets or sets the client for this worker. /// - internal IWorkerClient Client { get; private init; } + /// + /// When this property is set, it actually replaces the underlying client that is being used + /// by the worker. This means the next calls by the worker to Temporal (e.g. responding + /// task completion, activity heartbeat, etc) will be on this new client, but outstanding + /// calls will not be immediately interrupted. + /// + /// + /// When setting this value, the previous client will no longer apply for eager workflow + /// start. This new client will now be registered with this worker for eager workflow start. + /// + public IWorkerClient Client + { + get + { + lock (clientLock) + { + return client; + } + } + + set + { + var bridgeClient = value.BridgeClientProvider.BridgeClient ?? + throw new InvalidOperationException("Cannot use unconnected lazy client for worker"); + lock (clientLock) + { + BridgeWorker.ReplaceClient((Bridge.Client)bridgeClient); + client = value; + } + } + } /// /// Gets or sets the underlying bridge worker. diff --git a/tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs b/tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs index 0f1f6039..abacdd26 100644 --- a/tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs +++ b/tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs @@ -4310,6 +4310,69 @@ await ExecuteWorkerAsync(async worker => }); } + [Workflow] + public class TickingWorkflow + { + [WorkflowRun] + public async Task RunAsync() + { + // Just tick every 100ms for 10s + for (var i = 0; i < 100; i++) + { + await Workflow.DelayAsync(100); + } + } + } + + [Fact] + public async Task ExecuteWorkflowAsync_WorkerClientReplacement_UsesNewClient() + { + // We are going to start a second ephemeral server and then replace the client. So we will + // start a no-cache ticking workflow with the current client and confirm it has accomplished + // at least one task. Then we will start another on the other client, and confirm it gets + // started too. Then we will terminate both. We have to use a ticking workflow with only one + // poller to force a quick re-poll to recognize our client change quickly (as opposed to + // just waiting the minute for poll timeout). + await using var otherEnv = await Temporalio.Testing.WorkflowEnvironment.StartLocalAsync(); + + // Start both workflows on different servers + var taskQueue = $"tq-{Guid.NewGuid()}"; + var handle1 = await Client.StartWorkflowAsync( + (TickingWorkflow wf) => wf.RunAsync(), + new(id: $"workflow-{Guid.NewGuid()}", taskQueue)); + var handle2 = await otherEnv.Client.StartWorkflowAsync( + (TickingWorkflow wf) => wf.RunAsync(), + new(id: $"workflow-{Guid.NewGuid()}", taskQueue)); + + // Run the worker on the first env + await ExecuteWorkerAsync( + async worker => + { + // Confirm the first ticking workflow has completed a task but not the second workflow + await AssertHasEventEventuallyAsync(handle1, e => e.WorkflowTaskCompletedEventAttributes != null); + await foreach (var evt in handle2.FetchHistoryEventsAsync()) + { + Assert.Null(evt.WorkflowTaskCompletedEventAttributes); + } + + // Now replace the client, which should be used fairly quickly because we should have + // timer-done poll completions every 100ms + worker.Client = otherEnv.Client; + + // Now confirm the other workflow has started + await AssertHasEventEventuallyAsync(handle1, e => e.WorkflowTaskCompletedEventAttributes != null); + + // Terminate both + await handle1.TerminateAsync(); + await handle2.TerminateAsync(); + }, + new(taskQueue) + { + MaxCachedWorkflows = 0, + MaxConcurrentWorkflowTaskPolls = 1, + }); + } + internal static Task AssertTaskFailureContainsEventuallyAsync( WorkflowHandle handle, string messageContains) {