Skip to content

Commit

Permalink
Worker client replacement (temporalio#236)
Browse files Browse the repository at this point in the history
  • Loading branch information
cretz authored May 6, 2024
1 parent f1b8dde commit e771d5b
Show file tree
Hide file tree
Showing 7 changed files with 125 additions and 5 deletions.
3 changes: 3 additions & 0 deletions src/Temporalio/Bridge/Interop/Interop.cs
Original file line number Diff line number Diff line change
Expand Up @@ -726,6 +726,9 @@ internal static unsafe partial class Methods
[DllImport("temporal_sdk_bridge", CallingConvention = CallingConvention.Cdecl, ExactSpelling = true)]
public static extern void worker_free([NativeTypeName("struct Worker *")] Worker* worker);

[DllImport("temporal_sdk_bridge", CallingConvention = CallingConvention.Cdecl, ExactSpelling = true)]
public static extern void worker_replace_client([NativeTypeName("struct Worker *")] Worker* worker, [NativeTypeName("struct Client *")] Client* new_client);

[DllImport("temporal_sdk_bridge", CallingConvention = CallingConvention.Cdecl, ExactSpelling = true)]
public static extern void worker_poll_workflow_activation([NativeTypeName("struct Worker *")] Worker* worker, void* user_data, [NativeTypeName("WorkerPollCallback")] IntPtr callback);

Expand Down
12 changes: 12 additions & 0 deletions src/Temporalio/Bridge/Worker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,18 @@ internal Worker(Worker other)
/// </summary>
internal unsafe Interop.Worker* Ptr { get; private set; }

/// <summary>
/// Replace the client.
/// </summary>
/// <param name="client">New client.</param>
public void ReplaceClient(Client client)
{
unsafe
{
Interop.Methods.worker_replace_client(Ptr, client.Ptr);
}
}

/// <summary>
/// Poll for the next workflow activation.
/// </summary>
Expand Down
2 changes: 2 additions & 0 deletions src/Temporalio/Bridge/include/temporal-sdk-bridge.h
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,8 @@ struct WorkerOrFail worker_new(struct Client *client, const struct WorkerOptions

void worker_free(struct Worker *worker);

void worker_replace_client(struct Worker *worker, struct Client *new_client);

void worker_poll_workflow_activation(struct Worker *worker,
void *user_data,
WorkerPollCallback callback);
Expand Down
2 changes: 1 addition & 1 deletion src/Temporalio/Bridge/sdk-core
Submodule sdk-core updated 83 files
+3 −0 Cargo.toml
+0 −2 README.md
+3 −0 client/Cargo.toml
+3 −1 client/src/lib.rs
+1 −1 client/src/metrics.rs
+4 −4 client/src/raw.rs
+5 −3 client/src/worker_registry/mod.rs
+3 −1 client/src/workflow_handle/mod.rs
+3 −0 core-api/Cargo.toml
+4 −0 core/Cargo.toml
+11 −11 core/src/abstractions.rs
+3 −3 core/src/abstractions/take_cell.rs
+2 −1 core/src/core_tests/determinism.rs
+1 −1 core/src/core_tests/local_activities.rs
+1 −1 core/src/core_tests/workflow_tasks.rs
+4 −120 core/src/ephemeral_server/mod.rs
+8 −8 core/src/internal_flags.rs
+12 −8 core/src/lib.rs
+2 −2 core/src/pollers/mod.rs
+6 −4 core/src/pollers/poll_buffer.rs
+31 −31 core/src/protosext/mod.rs
+14 −12 core/src/protosext/protocol_messages.rs
+32 −1 core/src/telemetry/metrics.rs
+3 −2 core/src/telemetry/otel.rs
+4 −4 core/src/telemetry/prometheus_server.rs
+57 −44 core/src/test_help/mod.rs
+14 −14 core/src/worker/activities.rs
+6 −6 core/src/worker/activities/activity_heartbeat_manager.rs
+17 −16 core/src/worker/activities/local_activities.rs
+40 −38 core/src/worker/client.rs
+5 −3 core/src/worker/client/mocks.rs
+21 −9 core/src/worker/mod.rs
+3 −3 core/src/worker/slot_provider.rs
+6 −6 core/src/worker/workflow/driven_workflow.rs
+21 −18 core/src/worker/workflow/history_update.rs
+2 −15 core/src/worker/workflow/machines/activity_state_machine.rs
+1 −10 core/src/worker/workflow/machines/cancel_external_state_machine.rs
+2 −9 core/src/worker/workflow/machines/cancel_workflow_state_machine.rs
+3 −17 core/src/worker/workflow/machines/child_workflow_state_machine.rs
+0 −8 core/src/worker/workflow/machines/complete_workflow_state_machine.rs
+1 −5 core/src/worker/workflow/machines/continue_as_new_workflow_state_machine.rs
+0 −5 core/src/worker/workflow/machines/fail_workflow_state_machine.rs
+0 −5 core/src/worker/workflow/machines/local_activity_state_machine.rs
+0 −14 core/src/worker/workflow/machines/mod.rs
+0 −5 core/src/worker/workflow/machines/modify_workflow_properties_state_machine.rs
+0 −5 core/src/worker/workflow/machines/patch_state_machine.rs
+1 −10 core/src/worker/workflow/machines/signal_external_state_machine.rs
+1 −8 core/src/worker/workflow/machines/timer_state_machine.rs
+6 −1 core/src/worker/workflow/machines/transition_coverage.rs
+0 −10 core/src/worker/workflow/machines/update_state_machine.rs
+6 −13 core/src/worker/workflow/machines/upsert_search_attributes_state_machine.rs
+21 −19 core/src/worker/workflow/machines/workflow_machines.rs
+1 −12 core/src/worker/workflow/machines/workflow_task_state_machine.rs
+4 −4 core/src/worker/workflow/managed_run.rs
+38 −38 core/src/worker/workflow/mod.rs
+22 −13 core/src/worker/workflow/run_cache.rs
+2 −2 core/src/worker/workflow/workflow_stream.rs
+3 −0 fsm/Cargo.toml
+3 −0 sdk-core-protos/Cargo.toml
+1 −1 sdk-core-protos/src/history_builder.rs
+16 −8 sdk-core-protos/src/lib.rs
+1 −1 sdk-core-protos/src/task_token.rs
+3 −0 sdk/Cargo.toml
+3 −3 sdk/src/app_data.rs
+0 −1 sdk/src/lib.rs
+0 −11 sdk/src/payload_converter.rs
+10 −9 sdk/src/workflow_context.rs
+1 −1 sdk/src/workflow_context/options.rs
+1 −1 sdk/src/workflow_future.rs
+3 −0 test-utils/Cargo.toml
+2 −2 test-utils/src/lib.rs
+1 −1 tests/integ_tests/activity_functions.rs
+8 −25 tests/integ_tests/ephemeral_server_tests.rs
+4 −3 tests/integ_tests/metrics_tests.rs
+120 −1 tests/integ_tests/polling_tests.rs
+2 −1 tests/integ_tests/workflow_tests.rs
+1 −1 tests/integ_tests/workflow_tests/activities.rs
+1 −1 tests/integ_tests/workflow_tests/appdata_propagation.rs
+2 −1 tests/integ_tests/workflow_tests/determinism.rs
+1 −1 tests/integ_tests/workflow_tests/eager.rs
+4 −4 tests/integ_tests/workflow_tests/local_activities.rs
+5 −3 tests/integ_tests/workflow_tests/patches.rs
+1 −1 tests/integ_tests/workflow_tests/timers.rs
8 changes: 8 additions & 0 deletions src/Temporalio/Bridge/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,14 @@ pub extern "C" fn worker_free(worker: *mut Worker) {
}
}

#[no_mangle]
pub extern "C" fn worker_replace_client(worker: *mut Worker, new_client: *mut Client) {
let worker = unsafe { &*worker };
let core_worker = worker.worker.as_ref().expect("missing worker").clone();
let client = unsafe { &*new_client };
core_worker.replace_client(client.core.get_client().clone());
}

/// If success or fail are present, they must be freed. They will both be null
/// if this is a result of a poll shutdown.
type WorkerPollCallback = unsafe extern "C" fn(
Expand Down
40 changes: 36 additions & 4 deletions src/Temporalio/Worker/TemporalWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ namespace Temporalio.Worker
/// </summary>
public class TemporalWorker : IDisposable
{
private readonly object clientLock = new();
private readonly ActivityWorker? activityWorker;
private readonly WorkflowWorker? workflowWorker;
private readonly bool workflowTracingEventListenerEnabled;
private IWorkerClient client;
private int started;

/// <summary>
Expand All @@ -31,7 +33,7 @@ public class TemporalWorker : IDisposable
/// <param name="options">Options for the worker.</param>
public TemporalWorker(IWorkerClient client, TemporalWorkerOptions options)
{
Client = client;
this.client = client;
// Clone the options to discourage mutation (but we aren't completely disabling mutation
// on the Options field herein).
Options = (TemporalWorkerOptions)options.Clone();
Expand All @@ -49,7 +51,7 @@ public TemporalWorker(IWorkerClient client, TemporalWorkerOptions options)

// Interceptors are the client interceptors that implement IWorkerInterceptor followed
// by the explicitly provided ones in options.
var interceptors = Client.Options.Interceptors?.OfType<IWorkerInterceptor>() ??
var interceptors = client.Options.Interceptors?.OfType<IWorkerInterceptor>() ??
Enumerable.Empty<IWorkerInterceptor>();
if (Options.Interceptors != null)
{
Expand Down Expand Up @@ -102,9 +104,39 @@ public TemporalWorker(IWorkerClient client, TemporalWorkerOptions options)
public TemporalWorkerOptions Options { get; private init; }

/// <summary>
/// Gets the client this worker was created with.
/// Gets or sets the client for this worker.
/// </summary>
internal IWorkerClient Client { get; private init; }
/// <remarks>
/// When this property is set, it actually replaces the underlying client that is being used
/// by the worker. This means the next calls by the worker to Temporal (e.g. responding
/// task completion, activity heartbeat, etc) will be on this new client, but outstanding
/// calls will not be immediately interrupted.
/// </remarks>
/// <remarks>
/// When setting this value, the previous client will no longer apply for eager workflow
/// start. This new client will now be registered with this worker for eager workflow start.
/// </remarks>
public IWorkerClient Client
{
get
{
lock (clientLock)
{
return client;
}
}

set
{
var bridgeClient = value.BridgeClientProvider.BridgeClient ??
throw new InvalidOperationException("Cannot use unconnected lazy client for worker");
lock (clientLock)
{
BridgeWorker.ReplaceClient((Bridge.Client)bridgeClient);
client = value;
}
}
}

/// <summary>
/// Gets or sets the underlying bridge worker.
Expand Down
63 changes: 63 additions & 0 deletions tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4310,6 +4310,69 @@ await ExecuteWorkerAsync<FailOnBadInputWorkflow>(async worker =>
});
}

[Workflow]
public class TickingWorkflow
{
[WorkflowRun]
public async Task RunAsync()
{
// Just tick every 100ms for 10s
for (var i = 0; i < 100; i++)
{
await Workflow.DelayAsync(100);
}
}
}

[Fact]
public async Task ExecuteWorkflowAsync_WorkerClientReplacement_UsesNewClient()
{
// We are going to start a second ephemeral server and then replace the client. So we will
// start a no-cache ticking workflow with the current client and confirm it has accomplished
// at least one task. Then we will start another on the other client, and confirm it gets
// started too. Then we will terminate both. We have to use a ticking workflow with only one
// poller to force a quick re-poll to recognize our client change quickly (as opposed to
// just waiting the minute for poll timeout).
await using var otherEnv = await Temporalio.Testing.WorkflowEnvironment.StartLocalAsync();

// Start both workflows on different servers
var taskQueue = $"tq-{Guid.NewGuid()}";
var handle1 = await Client.StartWorkflowAsync(
(TickingWorkflow wf) => wf.RunAsync(),
new(id: $"workflow-{Guid.NewGuid()}", taskQueue));
var handle2 = await otherEnv.Client.StartWorkflowAsync(
(TickingWorkflow wf) => wf.RunAsync(),
new(id: $"workflow-{Guid.NewGuid()}", taskQueue));

// Run the worker on the first env
await ExecuteWorkerAsync<TickingWorkflow>(
async worker =>
{
// Confirm the first ticking workflow has completed a task but not the second workflow
await AssertHasEventEventuallyAsync(handle1, e => e.WorkflowTaskCompletedEventAttributes != null);
await foreach (var evt in handle2.FetchHistoryEventsAsync())
{
Assert.Null(evt.WorkflowTaskCompletedEventAttributes);
}

// Now replace the client, which should be used fairly quickly because we should have
// timer-done poll completions every 100ms
worker.Client = otherEnv.Client;

// Now confirm the other workflow has started
await AssertHasEventEventuallyAsync(handle1, e => e.WorkflowTaskCompletedEventAttributes != null);

// Terminate both
await handle1.TerminateAsync();
await handle2.TerminateAsync();
},
new(taskQueue)
{
MaxCachedWorkflows = 0,
MaxConcurrentWorkflowTaskPolls = 1,
});
}

internal static Task AssertTaskFailureContainsEventuallyAsync(
WorkflowHandle handle, string messageContains)
{
Expand Down

0 comments on commit e771d5b

Please sign in to comment.