diff --git a/.gitignore b/.gitignore index 6cf26001..c9c9ddd0 100644 --- a/.gitignore +++ b/.gitignore @@ -7,4 +7,6 @@ obj/ /tests/golangworker/golangworker /.vs /.vscode -/.idea \ No newline at end of file +/.idea +/.zed +Temporalio.sln.DotSettings.user diff --git a/src/Temporalio/Bridge/Cargo.lock b/src/Temporalio/Bridge/Cargo.lock index f78dd222..708cc45b 100644 --- a/src/Temporalio/Bridge/Cargo.lock +++ b/src/Temporalio/Bridge/Cargo.lock @@ -1277,9 +1277,9 @@ dependencies = [ [[package]] name = "itoa" -version = "1.0.13" +version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "540654e97a3f4470a492cd30ff187bc95d89557a903a2bbf112e2fae98104ef2" +checksum = "d75a2a4b1b190afb6f5425f10f6a8f959d2ea0b9c2b1d79553551850539e4674" [[package]] name = "jobserver" @@ -1307,9 +1307,9 @@ checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" [[package]] name = "libc" -version = "0.2.164" +version = "0.2.166" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "433bfe06b8c75da9b2e3fbea6e5329ff87748f0b144ef75306e674c3f6f7c13f" +checksum = "c2ccc108bbc0b1331bd061864e7cd823c0cab660bbe6970e66e2c0614decde36" [[package]] name = "libredox" @@ -2189,9 +2189,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.18" +version = "0.23.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c9cc1d47e243d655ace55ed38201c19ae02c148ae56412ab8750e8f0166ab7f" +checksum = "934b404430bb06b3fae2cba809eb45a1ab1aecd64491213d7c3301b88393f8d1" dependencies = [ "log", "once_cell", @@ -2507,9 +2507,9 @@ dependencies = [ [[package]] name = "sysinfo" -version = "0.32.0" +version = "0.32.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3b5ae3f4f7d64646c46c4cae4e3f01d1c5d255c7406fdd7c7f999a94e488791" +checksum = "4c33cd241af0f2e9e3b5c32163b873b29956890b5342e6745b917ce9d490f4af" dependencies = [ "core-foundation-sys", "libc", @@ -2577,6 +2577,7 @@ name = "temporal-sdk-bridge" version = "0.1.0" dependencies = [ "anyhow", + "async-trait", "cbindgen", "futures", "libc", @@ -2981,9 +2982,9 @@ checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" [[package]] name = "tracing" -version = "0.1.40" +version = "0.1.41" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" +checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" dependencies = [ "pin-project-lite", "tracing-attributes", @@ -2992,9 +2993,9 @@ dependencies = [ [[package]] name = "tracing-attributes" -version = "0.1.27" +version = "0.1.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" +checksum = "395ae124c09f9e6918a2310af6038fba074bcf474ac352496d5910dd59a2226d" dependencies = [ "proc-macro2", "quote", @@ -3003,9 +3004,9 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.32" +version = "0.1.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" +checksum = "e672c95779cf947c5311f83787af4fa8fffd12fb27e4993211a84bdfd9610f9c" dependencies = [ "once_cell", "valuable", diff --git a/src/Temporalio/Bridge/Cargo.toml b/src/Temporalio/Bridge/Cargo.toml index 53d715c3..7087ea7a 100644 --- a/src/Temporalio/Bridge/Cargo.toml +++ b/src/Temporalio/Bridge/Cargo.toml @@ -9,6 +9,7 @@ crate-type = ["cdylib", "rlib"] [dependencies] anyhow = "1.0" +async-trait = "0.1" futures = "0.3" libc = "0.2" log = "0.4" @@ -21,7 +22,9 @@ rand = "0.8.5" rand_pcg = "0.3.1" serde_json = "1.0" temporal-client = { version = "0.1.0", path = "./sdk-core/client" } -temporal-sdk-core = { version = "0.1.0", path = "./sdk-core/core", features = ["ephemeral-server"] } +temporal-sdk-core = { version = "0.1.0", path = "./sdk-core/core", features = [ + "ephemeral-server", +] } temporal-sdk-core-api = { version = "0.1.0", path = "./sdk-core/core-api" } temporal-sdk-core-protos = { version = "0.1.0", path = "./sdk-core/sdk-core-protos" } tokio = "1.26" @@ -39,3 +42,4 @@ incremental = false [build-dependencies] cbindgen = "0.24" +anyhow = "1.0" diff --git a/src/Temporalio/Bridge/CustomMetricMeter.cs b/src/Temporalio/Bridge/CustomMetricMeter.cs index 3d5a586e..36618728 100644 --- a/src/Temporalio/Bridge/CustomMetricMeter.cs +++ b/src/Temporalio/Bridge/CustomMetricMeter.cs @@ -7,11 +7,10 @@ namespace Temporalio.Bridge /// /// Core wrapper for a custom metric meter implementation. /// - internal class CustomMetricMeter + internal class CustomMetricMeter : NativeInvokeableClass { private readonly Temporalio.Runtime.ICustomMetricMeter meter; private readonly Temporalio.Runtime.CustomMetricMeterOptions options; - private readonly List handles = new(); /// /// Initializes a new instance of the class. @@ -38,20 +37,9 @@ public unsafe CustomMetricMeter( meter_free = FunctionPointer(Free), }; - // Pin the metric meter pointer and set it as the first handle - var interopMeterHandle = GCHandle.Alloc(interopMeter, GCHandleType.Pinned); - handles.Insert(0, interopMeterHandle); - Ptr = (Interop.CustomMetricMeter*)interopMeterHandle.AddrOfPinnedObject(); - - // Add handle for ourself - handles.Add(GCHandle.Alloc(this)); + PinCallbackHolder(interopMeter); } - /// - /// Gets the pointer to the native metric meter. - /// - internal unsafe Interop.CustomMetricMeter* Ptr { get; private init; } - private static unsafe string? GetStringOrNull(Interop.ByteArrayRef bytes) => (int)bytes.size == 0 ? null : GetString(bytes); @@ -220,23 +208,5 @@ private unsafe void RecordMetricDuration(void* metric, ulong valueMs, void* attr } private unsafe void FreeAttributes(void* attributes) => GCHandle.FromIntPtr(new(attributes)).Free(); - - private unsafe void Free(Interop.CustomMetricMeter* meter) - { - // Free in order which frees function pointers first then meter handles - foreach (var handle in handles) - { - handle.Free(); - } - } - - // Similar to Scope.FunctionPointer - private IntPtr FunctionPointer(T func) - where T : Delegate - { - var handle = GCHandle.Alloc(func); - handles.Add(handle); - return Marshal.GetFunctionPointerForDelegate(handle.Target!); - } } -} \ No newline at end of file +} diff --git a/src/Temporalio/Bridge/CustomSlotSupplier.cs b/src/Temporalio/Bridge/CustomSlotSupplier.cs new file mode 100644 index 00000000..c786bfef --- /dev/null +++ b/src/Temporalio/Bridge/CustomSlotSupplier.cs @@ -0,0 +1,245 @@ +using System; +using System.Collections.Generic; +using System.Runtime.InteropServices; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; + +namespace Temporalio.Bridge +{ + /// + /// Core wrapper for a user-defined custom slot supplier. + /// + internal class CustomSlotSupplier : NativeInvokeableClass + { + private readonly ILogger logger; + private readonly Temporalio.Worker.Tuning.CustomSlotSupplier userSupplier; + private readonly Dictionary permits = new(); + private uint permitId = 1; + + /// + /// Initializes a new instance of the class. + /// + /// User's slot supplier implementation'. + /// Logger factory. + internal unsafe CustomSlotSupplier( + Temporalio.Worker.Tuning.CustomSlotSupplier userSupplier, + ILoggerFactory loggerFactory) + { + this.logger = loggerFactory.CreateLogger(); + this.userSupplier = userSupplier; + + var interopCallbacks = new Interop.CustomSlotSupplierCallbacks + { + reserve = FunctionPointer(Reserve), + cancel_reserve = FunctionPointer(CancelReserve), + try_reserve = FunctionPointer(TryReserve), + mark_used = FunctionPointer(MarkUsed), + release = FunctionPointer(Release), + free = FunctionPointer(Free), + }; + + PinCallbackHolder(interopCallbacks); + } + + private static Temporalio.Worker.Tuning.SlotInfo SlotInfoFromBridge(Interop.SlotInfo slotInfo) + { + return slotInfo.tag switch + { + Interop.SlotInfo_Tag.WorkflowSlotInfo => + new Temporalio.Worker.Tuning.SlotInfo.WorkflowSlotInfo( + ByteArrayRef.ToUtf8(slotInfo.workflow_slot_info.workflow_type), slotInfo.workflow_slot_info.is_sticky != 0), + Interop.SlotInfo_Tag.ActivitySlotInfo => + new Temporalio.Worker.Tuning.SlotInfo.ActivitySlotInfo( + ByteArrayRef.ToUtf8(slotInfo.activity_slot_info.activity_type)), + Interop.SlotInfo_Tag.LocalActivitySlotInfo => + new Temporalio.Worker.Tuning.SlotInfo.LocalActivitySlotInfo( + ByteArrayRef.ToUtf8(slotInfo.local_activity_slot_info.activity_type)), + _ => throw new System.ArgumentOutOfRangeException(nameof(slotInfo)), + }; + } + + private unsafe void Reserve(Interop.SlotReserveCtx* ctx, void* sender) + { + SafeReserve(new IntPtr(ctx), new IntPtr(sender)); + } + + // Note that this is always called by Rust, either because the call is cancelled or because + // it completed. Therefore the GCHandle is always freed. + private unsafe void CancelReserve(void* tokenSrc) + { + var handle = GCHandle.FromIntPtr(new IntPtr(tokenSrc)); + var cancelTokenSrc = (CancellationTokenSource)handle.Target!; + cancelTokenSrc.Cancel(); + handle.Free(); + } + + private void SafeReserve(IntPtr ctx, IntPtr sender) + { + _ = Task.Run(async () => + { + using (var cancelTokenSrc = new System.Threading.CancellationTokenSource()) + { + unsafe + { + var srcHandle = GCHandle.Alloc(cancelTokenSrc); + Interop.Methods.set_reserve_cancel_target( + (Interop.SlotReserveCtx*)ctx.ToPointer(), + GCHandle.ToIntPtr(srcHandle).ToPointer()); + } + while (true) + { + try + { + Task reserveTask; + unsafe + { + reserveTask = userSupplier.ReserveSlotAsync( + ReserveCtxFromBridge((Interop.SlotReserveCtx*)ctx.ToPointer()), + cancelTokenSrc.Token); + } + var permit = await reserveTask.ConfigureAwait(false); + unsafe + { + var usedPermitId = AddPermitToMap(permit); + Interop.Methods.complete_async_reserve(sender.ToPointer(), new(usedPermitId)); + } + return; + } + catch (OperationCanceledException) when (cancelTokenSrc.Token.IsCancellationRequested) + { + unsafe + { + // Always call this to ensure the sender is freed + Interop.Methods.complete_async_reserve(sender.ToPointer(), new(0)); + } + return; + } +#pragma warning disable CA1031 // We are ok catching all exceptions here + catch (Exception e) + { +#pragma warning restore CA1031 + logger.LogError(e, "Error reserving slot"); + } + // Wait for a bit to avoid spamming errors + await Task.Delay(1000, cancelTokenSrc.Token).ConfigureAwait(false); + } + } + }); + } + + private unsafe UIntPtr TryReserve(Interop.SlotReserveCtx* ctx) + { + Temporalio.Worker.Tuning.SlotPermit? maybePermit; + try + { + maybePermit = userSupplier.TryReserveSlot(ReserveCtxFromBridge(ctx)); + } +#pragma warning disable CA1031 // We are ok catching all exceptions here + catch (Exception e) + { +#pragma warning restore CA1031 + logger.LogError(e, "Error trying to reserve slot"); + return UIntPtr.Zero; + } + + if (maybePermit == null) + { + return UIntPtr.Zero; + } + var usedPermitId = AddPermitToMap(maybePermit); + return new(usedPermitId); + } + + private unsafe void MarkUsed(Interop.SlotMarkUsedCtx* ctx) + { + try + { + Temporalio.Worker.Tuning.SlotPermit permit; + lock (permits) + { + permit = permits[(*ctx).slot_permit.ToUInt32()]; + } + userSupplier.MarkSlotUsed(MarkUsedCtxFromBridge(ctx, permit)); + } +#pragma warning disable CA1031 // We are ok catching all exceptions here + catch (Exception e) + { +#pragma warning restore CA1031 + logger.LogError(e, "Error marking slot used"); + } + } + + private unsafe void Release(Interop.SlotReleaseCtx* ctx) + { + var permitId = (*ctx).slot_permit.ToUInt32(); + Temporalio.Worker.Tuning.SlotPermit permit; + lock (permits) + { + permit = permits[permitId]; + } + try + { + userSupplier.ReleaseSlot(ReleaseCtxFromBridge(ctx, permit)); + } +#pragma warning disable CA1031 // We are ok catching all exceptions here + catch (Exception e) + { +#pragma warning restore CA1031 + logger.LogError(e, "Error releasing slot"); + } + finally + { + lock (permits) + { + permits.Remove(permitId); + } + } + } + + private uint AddPermitToMap(Temporalio.Worker.Tuning.SlotPermit permit) + { + lock (permits) + { + var usedPermitId = permitId; + permits.Add(permitId, permit); + permitId += 1; + return usedPermitId; + } + } + + private unsafe Temporalio.Worker.Tuning.SlotReserveContext ReserveCtxFromBridge(Interop.SlotReserveCtx* ctx) + { + return new( + SlotType: (*ctx).slot_type switch + { + Interop.SlotKindType.WorkflowSlotKindType => Temporalio.Worker.Tuning.SlotType.Workflow, + Interop.SlotKindType.ActivitySlotKindType => Temporalio.Worker.Tuning.SlotType.Activity, + Interop.SlotKindType.LocalActivitySlotKindType => Temporalio.Worker.Tuning.SlotType.LocalActivity, + _ => throw new System.ArgumentOutOfRangeException(nameof(ctx)), + }, + TaskQueue: ByteArrayRef.ToUtf8((*ctx).task_queue), + WorkerIdentity: ByteArrayRef.ToUtf8((*ctx).worker_identity), + WorkerBuildId: ByteArrayRef.ToUtf8((*ctx).worker_build_id), + IsSticky: (*ctx).is_sticky != 0); + } + + private unsafe Temporalio.Worker.Tuning.SlotReleaseContext ReleaseCtxFromBridge( + Interop.SlotReleaseCtx* ctx, + Temporalio.Worker.Tuning.SlotPermit permit) + { + return new( + SlotInfo: (*ctx).slot_info is null ? null : SlotInfoFromBridge(*(*ctx).slot_info), + Permit: permit); + } + + private unsafe Temporalio.Worker.Tuning.SlotMarkUsedContext MarkUsedCtxFromBridge( + Interop.SlotMarkUsedCtx* ctx, + Temporalio.Worker.Tuning.SlotPermit permit) + { + return new( + SlotInfo: SlotInfoFromBridge((*ctx).slot_info), + Permit: permit); + } + } +} diff --git a/src/Temporalio/Bridge/Interop/Interop.cs b/src/Temporalio/Bridge/Interop/Interop.cs index 7e0ba6d9..a9b313aa 100644 --- a/src/Temporalio/Bridge/Interop/Interop.cs +++ b/src/Temporalio/Bridge/Interop/Interop.cs @@ -45,6 +45,13 @@ internal enum RpcService Health, } + internal enum SlotKindType + { + WorkflowSlotKindType, + ActivitySlotKindType, + LocalActivitySlotKindType, + } + internal partial struct CancellationToken { } @@ -551,17 +558,184 @@ internal partial struct ResourceBasedSlotSupplier public ResourceBasedTunerOptions tuner_options; } + internal unsafe partial struct SlotReserveCtx + { + [NativeTypeName("enum SlotKindType")] + public SlotKindType slot_type; + + [NativeTypeName("struct ByteArrayRef")] + public ByteArrayRef task_queue; + + [NativeTypeName("struct ByteArrayRef")] + public ByteArrayRef worker_identity; + + [NativeTypeName("struct ByteArrayRef")] + public ByteArrayRef worker_build_id; + + [NativeTypeName("bool")] + public byte is_sticky; + + public void* token_src; + } + + [UnmanagedFunctionPointer(CallingConvention.Cdecl)] + internal unsafe delegate void CustomReserveSlotCallback([NativeTypeName("const struct SlotReserveCtx *")] SlotReserveCtx* ctx, void* sender); + + [UnmanagedFunctionPointer(CallingConvention.Cdecl)] + internal unsafe delegate void CustomCancelReserveCallback(void* token_source); + + [UnmanagedFunctionPointer(CallingConvention.Cdecl)] + [return: NativeTypeName("uintptr_t")] + internal unsafe delegate UIntPtr CustomTryReserveSlotCallback([NativeTypeName("const struct SlotReserveCtx *")] SlotReserveCtx* ctx); + + internal enum SlotInfo_Tag + { + WorkflowSlotInfo, + ActivitySlotInfo, + LocalActivitySlotInfo, + } + + internal partial struct WorkflowSlotInfo_Body + { + [NativeTypeName("struct ByteArrayRef")] + public ByteArrayRef workflow_type; + + [NativeTypeName("bool")] + public byte is_sticky; + } + + internal partial struct ActivitySlotInfo_Body + { + [NativeTypeName("struct ByteArrayRef")] + public ByteArrayRef activity_type; + } + + internal partial struct LocalActivitySlotInfo_Body + { + [NativeTypeName("struct ByteArrayRef")] + public ByteArrayRef activity_type; + } + + internal unsafe partial struct SlotInfo + { + public SlotInfo_Tag tag; + + [NativeTypeName("__AnonymousRecord_temporal-sdk-bridge_L425_C3")] + public _Anonymous_e__Union Anonymous; + + internal ref WorkflowSlotInfo_Body workflow_slot_info + { + get + { + fixed (_Anonymous_e__Union* pField = &Anonymous) + { + return ref pField->workflow_slot_info; + } + } + } + + internal ref ActivitySlotInfo_Body activity_slot_info + { + get + { + fixed (_Anonymous_e__Union* pField = &Anonymous) + { + return ref pField->activity_slot_info; + } + } + } + + internal ref LocalActivitySlotInfo_Body local_activity_slot_info + { + get + { + fixed (_Anonymous_e__Union* pField = &Anonymous) + { + return ref pField->local_activity_slot_info; + } + } + } + + [StructLayout(LayoutKind.Explicit)] + internal partial struct _Anonymous_e__Union + { + [FieldOffset(0)] + public WorkflowSlotInfo_Body workflow_slot_info; + + [FieldOffset(0)] + public ActivitySlotInfo_Body activity_slot_info; + + [FieldOffset(0)] + public LocalActivitySlotInfo_Body local_activity_slot_info; + } + } + + internal partial struct SlotMarkUsedCtx + { + [NativeTypeName("struct SlotInfo")] + public SlotInfo slot_info; + + [NativeTypeName("uintptr_t")] + public UIntPtr slot_permit; + } + + [UnmanagedFunctionPointer(CallingConvention.Cdecl)] + internal unsafe delegate void CustomMarkSlotUsedCallback([NativeTypeName("const struct SlotMarkUsedCtx *")] SlotMarkUsedCtx* ctx); + + internal unsafe partial struct SlotReleaseCtx + { + [NativeTypeName("const struct SlotInfo *")] + public SlotInfo* slot_info; + + [NativeTypeName("uintptr_t")] + public UIntPtr slot_permit; + } + + [UnmanagedFunctionPointer(CallingConvention.Cdecl)] + internal unsafe delegate void CustomReleaseSlotCallback([NativeTypeName("const struct SlotReleaseCtx *")] SlotReleaseCtx* ctx); + + [UnmanagedFunctionPointer(CallingConvention.Cdecl)] + internal unsafe delegate void CustomSlotImplFreeCallback([NativeTypeName("const struct CustomSlotSupplierCallbacks *")] CustomSlotSupplierCallbacks* userimpl); + + internal partial struct CustomSlotSupplierCallbacks + { + [NativeTypeName("CustomReserveSlotCallback")] + public IntPtr reserve; + + [NativeTypeName("CustomCancelReserveCallback")] + public IntPtr cancel_reserve; + + [NativeTypeName("CustomTryReserveSlotCallback")] + public IntPtr try_reserve; + + [NativeTypeName("CustomMarkSlotUsedCallback")] + public IntPtr mark_used; + + [NativeTypeName("CustomReleaseSlotCallback")] + public IntPtr release; + + [NativeTypeName("CustomSlotImplFreeCallback")] + public IntPtr free; + } + + internal unsafe partial struct CustomSlotSupplierCallbacksImpl + { + [NativeTypeName("const struct CustomSlotSupplierCallbacks *")] + public CustomSlotSupplierCallbacks* _0; + } + internal enum SlotSupplier_Tag { FixedSize, ResourceBased, + Custom, } internal unsafe partial struct SlotSupplier { public SlotSupplier_Tag tag; - [NativeTypeName("__AnonymousRecord_temporal-sdk-bridge_L387_C3")] + [NativeTypeName("__AnonymousRecord_temporal-sdk-bridge_L475_C3")] public _Anonymous_e__Union Anonymous; internal ref FixedSizeSlotSupplier fixed_size @@ -586,17 +760,32 @@ internal ref ResourceBasedSlotSupplier resource_based } } + internal ref CustomSlotSupplierCallbacksImpl custom + { + get + { + fixed (_Anonymous_e__Union._Anonymous3_e__Struct* pField = &Anonymous.Anonymous3) + { + return ref pField->custom; + } + } + } + [StructLayout(LayoutKind.Explicit)] internal unsafe partial struct _Anonymous_e__Union { [FieldOffset(0)] - [NativeTypeName("__AnonymousRecord_temporal-sdk-bridge_L388_C5")] + [NativeTypeName("__AnonymousRecord_temporal-sdk-bridge_L476_C5")] public _Anonymous1_e__Struct Anonymous1; [FieldOffset(0)] - [NativeTypeName("__AnonymousRecord_temporal-sdk-bridge_L391_C5")] + [NativeTypeName("__AnonymousRecord_temporal-sdk-bridge_L479_C5")] public _Anonymous2_e__Struct Anonymous2; + [FieldOffset(0)] + [NativeTypeName("__AnonymousRecord_temporal-sdk-bridge_L482_C5")] + public _Anonymous3_e__Struct Anonymous3; + internal partial struct _Anonymous1_e__Struct { [NativeTypeName("struct FixedSizeSlotSupplier")] @@ -608,6 +797,12 @@ internal partial struct _Anonymous2_e__Struct [NativeTypeName("struct ResourceBasedSlotSupplier")] public ResourceBasedSlotSupplier resource_based; } + + internal partial struct _Anonymous3_e__Struct + { + [NativeTypeName("struct CustomSlotSupplierCallbacksImpl")] + public CustomSlotSupplierCallbacksImpl custom; + } } } @@ -877,5 +1072,11 @@ internal static unsafe partial class Methods [DllImport("temporal_sdk_bridge", CallingConvention = CallingConvention.Cdecl, ExactSpelling = true)] [return: NativeTypeName("struct WorkerReplayPushResult")] public static extern WorkerReplayPushResult worker_replay_push([NativeTypeName("struct Worker *")] Worker* worker, [NativeTypeName("struct WorkerReplayPusher *")] WorkerReplayPusher* worker_replay_pusher, [NativeTypeName("struct ByteArrayRef")] ByteArrayRef workflow_id, [NativeTypeName("struct ByteArrayRef")] ByteArrayRef history); + + [DllImport("temporal_sdk_bridge", CallingConvention = CallingConvention.Cdecl, ExactSpelling = true)] + public static extern void complete_async_reserve(void* sender, [NativeTypeName("uintptr_t")] UIntPtr permit_id); + + [DllImport("temporal_sdk_bridge", CallingConvention = CallingConvention.Cdecl, ExactSpelling = true)] + public static extern void set_reserve_cancel_target([NativeTypeName("struct SlotReserveCtx *")] SlotReserveCtx* ctx, void* token_ptr); } } diff --git a/src/Temporalio/Bridge/NativeInvokeableClass.cs b/src/Temporalio/Bridge/NativeInvokeableClass.cs new file mode 100644 index 00000000..58012e0a --- /dev/null +++ b/src/Temporalio/Bridge/NativeInvokeableClass.cs @@ -0,0 +1,66 @@ +using System; +using System.Collections.Generic; +using System.Runtime.InteropServices; + +namespace Temporalio.Bridge +{ + /// + /// Extend this class to help with making a class that has callbacks which are invoked by Rust. + /// + /// The native type that holds the function ptrs for callbacks to C#. + internal abstract class NativeInvokeableClass + where T : unmanaged + { + private readonly List handles = new(); + + /// + /// Gets the pointer to the native callback holder. + /// + internal unsafe T* Ptr { get; private set; } + + /// + /// Pin the native type in memory and add it to the handle list. Call this after adding + /// the callbacks via . Also adds `this` to the handle list. + /// + /// The native type to pin. + private protected void PinCallbackHolder(T value) + { + // Pin the callback holder & set it as the first handle + var holderHandle = GCHandle.Alloc(value, GCHandleType.Pinned); + handles.Insert(0, holderHandle); + unsafe + { + Ptr = (T*)holderHandle.AddrOfPinnedObject(); + } + // Add handle for ourself + handles.Add(GCHandle.Alloc(this)); + } + + /// + /// Make a handle for and return a C# method as a callback for Rust to invoke. + /// + /// The native type of the function pointer. + /// The C# method to use for the callback. + /// The function pointer to the C# method. + private protected IntPtr FunctionPointer(TF func) + where TF : Delegate + { + var handle = GCHandle.Alloc(func); + handles.Add(handle); + return Marshal.GetFunctionPointerForDelegate(handle.Target!); + } + + /// + /// Free the memory of the native type and all the function pointers. + /// + /// The native type to free. + private protected unsafe void Free(T* ptr) + { + // Free in order which frees function pointers first then object handles + foreach (var handle in handles) + { + handle.Free(); + } + } + } +} diff --git a/src/Temporalio/Bridge/OptionsExtensions.cs b/src/Temporalio/Bridge/OptionsExtensions.cs index ca1ce02e..80c09ef8 100644 --- a/src/Temporalio/Bridge/OptionsExtensions.cs +++ b/src/Temporalio/Bridge/OptionsExtensions.cs @@ -2,6 +2,7 @@ using System.Collections.Generic; using System.Linq; using System.Reflection; +using Microsoft.Extensions.Logging; using Temporalio.Bridge.Interop; using Temporalio.Exceptions; @@ -408,11 +409,13 @@ public static Interop.TestServerOptions ToInteropOptions( /// Options to convert. /// Scope to use. /// Namespace for the worker. + /// Logger factory. /// Converted options. public static Interop.WorkerOptions ToInteropOptions( this Temporalio.Worker.TemporalWorkerOptions options, Scope scope, - string namespace_) + string namespace_, + ILoggerFactory loggerFactory) { if (options.TaskQueue == null) { @@ -458,7 +461,7 @@ public static Interop.WorkerOptions ToInteropOptions( build_id = scope.ByteArray(buildId), identity_override = scope.ByteArray(options.Identity), max_cached_workflows = (uint)options.MaxCachedWorkflows, - tuner = tuner.ToInteropTuner(scope), + tuner = tuner.ToInteropTuner(scope, loggerFactory), no_remote_activities = (byte)(noRemoteActivities ? 1 : 0), sticky_queue_schedule_to_start_timeout_millis = (ulong)options.StickyQueueScheduleToStartTimeout.TotalMilliseconds, @@ -504,7 +507,7 @@ public static Interop.WorkerOptions ToInteropOptions( build_id = scope.ByteArray(buildId), identity_override = scope.ByteArray(options.Identity), max_cached_workflows = 2, - tuner = Temporalio.Worker.Tuning.WorkerTuner.CreateFixedSize(2, 1, 1).ToInteropTuner(scope), + tuner = Temporalio.Worker.Tuning.WorkerTuner.CreateFixedSize(2, 1, 1).ToInteropTuner(scope, options.LoggerFactory), no_remote_activities = 1, sticky_queue_schedule_to_start_timeout_millis = 1000, max_heartbeat_throttle_interval_millis = 1000, @@ -524,10 +527,11 @@ public static Interop.WorkerOptions ToInteropOptions( private static Interop.TunerHolder ToInteropTuner( this Temporalio.Worker.Tuning.WorkerTuner tuner, - Scope scope) + Scope scope, + ILoggerFactory loggerFactory) { Temporalio.Worker.Tuning.ResourceBasedTunerOptions? lastTunerOptions = null; - Temporalio.Worker.Tuning.ISlotSupplier[] suppliers = + Temporalio.Worker.Tuning.SlotSupplier[] suppliers = { tuner.WorkflowTaskSlotSupplier, tuner.ActivityTaskSlotSupplier, tuner.LocalActivitySlotSupplier, @@ -549,17 +553,18 @@ private static Interop.TunerHolder ToInteropTuner( return new() { workflow_slot_supplier = - tuner.WorkflowTaskSlotSupplier.ToInteropSlotSupplier(true), + tuner.WorkflowTaskSlotSupplier.ToInteropSlotSupplier(true, loggerFactory), activity_slot_supplier = - tuner.ActivityTaskSlotSupplier.ToInteropSlotSupplier(false), + tuner.ActivityTaskSlotSupplier.ToInteropSlotSupplier(false, loggerFactory), local_activity_slot_supplier = - tuner.LocalActivitySlotSupplier.ToInteropSlotSupplier(false), + tuner.LocalActivitySlotSupplier.ToInteropSlotSupplier(false, loggerFactory), }; } private static Interop.SlotSupplier ToInteropSlotSupplier( - this Temporalio.Worker.Tuning.ISlotSupplier supplier, - bool isWorkflow) + this Temporalio.Worker.Tuning.SlotSupplier supplier, + bool isWorkflow, + ILoggerFactory loggerFactory) { if (supplier is Temporalio.Worker.Tuning.FixedSizeSlotSupplier fixedSize) { @@ -602,10 +607,21 @@ private static Interop.SlotSupplier ToInteropSlotSupplier( }, }; } + else if (supplier is Temporalio.Worker.Tuning.CustomSlotSupplier custom) + { + var wrapped = new CustomSlotSupplier(custom, loggerFactory); + unsafe + { + return new() + { + tag = Interop.SlotSupplier_Tag.Custom, + custom = new Interop.CustomSlotSupplierCallbacksImpl() { _0 = wrapped.Ptr }, + }; + } + } else { - throw new ArgumentException( - "ISlotSupplier must be one of the types provided by the library"); + throw new ArgumentException("Unknown slot supplier type"); } } diff --git a/src/Temporalio/Bridge/Worker.cs b/src/Temporalio/Bridge/Worker.cs index 85e4625b..8733a59f 100644 --- a/src/Temporalio/Bridge/Worker.cs +++ b/src/Temporalio/Bridge/Worker.cs @@ -2,6 +2,7 @@ using System.Runtime.InteropServices; using System.Threading.Tasks; using Google.Protobuf; +using Microsoft.Extensions.Logging; namespace Temporalio.Bridge { @@ -16,11 +17,16 @@ internal class Worker : SafeHandle /// Client for the worker. /// Namespace for the worker. /// Options for the worker. + /// Logger factory, used instead of the one in options by + /// anything in the bridge that needs it, since it's guaranteed to be set. /// /// If any of the options are invalid including improperly defined workflows/activities. /// public Worker( - Client client, string namespace_, Temporalio.Worker.TemporalWorkerOptions options) + Client client, + string namespace_, + Temporalio.Worker.TemporalWorkerOptions options, + ILoggerFactory loggerFactory) : base(IntPtr.Zero, true) { Runtime = client.Runtime; @@ -29,7 +35,8 @@ public Worker( unsafe { var workerOrFail = Interop.Methods.worker_new( - client.Ptr, scope.Pointer(options.ToInteropOptions(scope, namespace_))); + client.Ptr, + scope.Pointer(options.ToInteropOptions(scope, namespace_, loggerFactory))); if (workerOrFail.fail != null) { string failStr; @@ -375,4 +382,4 @@ protected override unsafe bool ReleaseHandle() return true; } } -} \ No newline at end of file +} diff --git a/src/Temporalio/Bridge/include/temporal-sdk-bridge.h b/src/Temporalio/Bridge/include/temporal-sdk-bridge.h index d5c8c575..6b00a7dd 100644 --- a/src/Temporalio/Bridge/include/temporal-sdk-bridge.h +++ b/src/Temporalio/Bridge/include/temporal-sdk-bridge.h @@ -42,6 +42,12 @@ typedef enum RpcService { Health, } RpcService; +typedef enum SlotKindType { + WorkflowSlotKindType, + ActivitySlotKindType, + LocalActivitySlotKindType, +} SlotKindType; + typedef struct CancellationToken CancellationToken; typedef struct Client Client; @@ -377,9 +383,91 @@ typedef struct ResourceBasedSlotSupplier { struct ResourceBasedTunerOptions tuner_options; } ResourceBasedSlotSupplier; +typedef struct SlotReserveCtx { + enum SlotKindType slot_type; + struct ByteArrayRef task_queue; + struct ByteArrayRef worker_identity; + struct ByteArrayRef worker_build_id; + bool is_sticky; + void *token_src; +} SlotReserveCtx; + +typedef void (*CustomReserveSlotCallback)(const struct SlotReserveCtx *ctx, void *sender); + +typedef void (*CustomCancelReserveCallback)(void *token_source); + +/** + * Must return C#-tracked id for the permit. A zero value means no permit was reserved. + */ +typedef uintptr_t (*CustomTryReserveSlotCallback)(const struct SlotReserveCtx *ctx); + +typedef enum SlotInfo_Tag { + WorkflowSlotInfo, + ActivitySlotInfo, + LocalActivitySlotInfo, +} SlotInfo_Tag; + +typedef struct WorkflowSlotInfo_Body { + struct ByteArrayRef workflow_type; + bool is_sticky; +} WorkflowSlotInfo_Body; + +typedef struct ActivitySlotInfo_Body { + struct ByteArrayRef activity_type; +} ActivitySlotInfo_Body; + +typedef struct LocalActivitySlotInfo_Body { + struct ByteArrayRef activity_type; +} LocalActivitySlotInfo_Body; + +typedef struct SlotInfo { + SlotInfo_Tag tag; + union { + WorkflowSlotInfo_Body workflow_slot_info; + ActivitySlotInfo_Body activity_slot_info; + LocalActivitySlotInfo_Body local_activity_slot_info; + }; +} SlotInfo; + +typedef struct SlotMarkUsedCtx { + struct SlotInfo slot_info; + /** + * C# id for the slot permit. + */ + uintptr_t slot_permit; +} SlotMarkUsedCtx; + +typedef void (*CustomMarkSlotUsedCallback)(const struct SlotMarkUsedCtx *ctx); + +typedef struct SlotReleaseCtx { + const struct SlotInfo *slot_info; + /** + * C# id for the slot permit. + */ + uintptr_t slot_permit; +} SlotReleaseCtx; + +typedef void (*CustomReleaseSlotCallback)(const struct SlotReleaseCtx *ctx); + +typedef void (*CustomSlotImplFreeCallback)(const struct CustomSlotSupplierCallbacks *userimpl); + +typedef struct CustomSlotSupplierCallbacks { + CustomReserveSlotCallback reserve; + CustomCancelReserveCallback cancel_reserve; + CustomTryReserveSlotCallback try_reserve; + CustomMarkSlotUsedCallback mark_used; + CustomReleaseSlotCallback release; + CustomSlotImplFreeCallback free; +} CustomSlotSupplierCallbacks; + +typedef struct CustomSlotSupplierCallbacksImpl { + const struct CustomSlotSupplierCallbacks *_0; +} CustomSlotSupplierCallbacksImpl; + typedef enum SlotSupplier_Tag { FixedSize, ResourceBased, + Custom, } SlotSupplier_Tag; typedef struct SlotSupplier { @@ -391,6 +479,9 @@ typedef struct SlotSupplier { struct { struct ResourceBasedSlotSupplier resource_based; }; + struct { + struct CustomSlotSupplierCallbacksImpl custom; + }; }; } SlotSupplier; @@ -608,6 +699,10 @@ struct WorkerReplayPushResult worker_replay_push(struct Worker *worker, struct ByteArrayRef workflow_id, struct ByteArrayRef history); +void complete_async_reserve(void *sender, uintptr_t permit_id); + +void set_reserve_cancel_target(struct SlotReserveCtx *ctx, void *token_ptr); + #ifdef __cplusplus } // extern "C" #endif // __cplusplus diff --git a/src/Temporalio/Bridge/src/lib.rs b/src/Temporalio/Bridge/src/lib.rs index d3dc7519..69d813fc 100644 --- a/src/Temporalio/Bridge/src/lib.rs +++ b/src/Temporalio/Bridge/src/lib.rs @@ -31,13 +31,6 @@ impl ByteArrayRef { } } - fn from_string(s: &String) -> ByteArrayRef { - ByteArrayRef { - data: s.as_ptr(), - size: s.len(), - } - } - fn to_slice(&self) -> &[u8] { unsafe { std::slice::from_raw_parts(self.data, self.size) } } @@ -105,6 +98,18 @@ impl ByteArrayRef { } } +impl From<&str> for ByteArrayRef { + fn from(s: &str) -> ByteArrayRef { + ByteArrayRef::from_str(s) + } +} + +impl From for ByteArrayRef { + fn from(s: String) -> ByteArrayRef { + ByteArrayRef::from_str(&s) + } +} + #[repr(C)] pub struct ByteArrayRefArray { data: *const ByteArrayRef, diff --git a/src/Temporalio/Bridge/src/metric.rs b/src/Temporalio/Bridge/src/metric.rs index c211cb09..7ceb7510 100644 --- a/src/Temporalio/Bridge/src/metric.rs +++ b/src/Temporalio/Bridge/src/metric.rs @@ -417,7 +417,7 @@ impl CustomMetricMeterRef { ), }; CustomMetricAttribute { - key: ByteArrayRef::from_string(&kv.key), + key: ByteArrayRef::from_str(&kv.key), value, value_type, } diff --git a/src/Temporalio/Bridge/src/runtime.rs b/src/Temporalio/Bridge/src/runtime.rs index 40b80cfb..1ab91f72 100644 --- a/src/Temporalio/Bridge/src/runtime.rs +++ b/src/Temporalio/Bridge/src/runtime.rs @@ -316,13 +316,13 @@ impl fmt::Debug for LogForwarder { #[no_mangle] pub extern "C" fn forwarded_log_target(log: *const ForwardedLog) -> ByteArrayRef { let log = unsafe { &*log }; - ByteArrayRef::from_string(&log.core.target) + ByteArrayRef::from_str(&log.core.target) } #[no_mangle] pub extern "C" fn forwarded_log_message(log: *const ForwardedLog) -> ByteArrayRef { let log = unsafe { &*log }; - ByteArrayRef::from_string(&log.core.message) + ByteArrayRef::from_str(&log.core.message) } #[no_mangle] diff --git a/src/Temporalio/Bridge/src/worker.rs b/src/Temporalio/Bridge/src/worker.rs index 99278002..470e3011 100644 --- a/src/Temporalio/Bridge/src/worker.rs +++ b/src/Temporalio/Bridge/src/worker.rs @@ -12,13 +12,19 @@ use temporal_sdk_core::WorkerConfigBuilder; use temporal_sdk_core_api::errors::PollActivityError; use temporal_sdk_core_api::errors::PollWfError; use temporal_sdk_core_api::errors::WorkflowErrorType; +use temporal_sdk_core_api::worker::SlotInfoTrait; use temporal_sdk_core_api::worker::SlotKind; +use temporal_sdk_core_api::worker::SlotMarkUsedContext; +use temporal_sdk_core_api::worker::SlotReleaseContext; +use temporal_sdk_core_api::worker::SlotReservationContext; +use temporal_sdk_core_api::worker::SlotSupplierPermit; use temporal_sdk_core_api::Worker as CoreWorker; use temporal_sdk_core_protos::coresdk::workflow_completion::WorkflowActivationCompletion; use temporal_sdk_core_protos::coresdk::ActivityHeartbeat; use temporal_sdk_core_protos::coresdk::ActivityTaskCompletion; use temporal_sdk_core_protos::temporal::api::history::v1::History; use tokio::sync::mpsc::{channel, Sender}; +use tokio::sync::oneshot; use tokio_stream::wrappers::ReceiverStream; use std::collections::HashMap; @@ -57,20 +63,21 @@ pub struct TunerHolder { } #[repr(C)] -#[derive(Clone, Copy, PartialEq)] +#[derive(Clone, Copy)] pub enum SlotSupplier { FixedSize(FixedSizeSlotSupplier), ResourceBased(ResourceBasedSlotSupplier), + Custom(CustomSlotSupplierCallbacksImpl), } #[repr(C)] -#[derive(Clone, Copy, PartialEq)] +#[derive(Clone, Copy)] pub struct FixedSizeSlotSupplier { num_slots: usize, } #[repr(C)] -#[derive(Clone, Copy, PartialEq)] +#[derive(Clone, Copy)] pub struct ResourceBasedSlotSupplier { minimum_slots: usize, maximum_slots: usize, @@ -78,6 +85,220 @@ pub struct ResourceBasedSlotSupplier { tuner_options: ResourceBasedTunerOptions, } +#[repr(C)] +pub struct CustomSlotSupplier { + inner: CustomSlotSupplierCallbacksImpl, + _pd: std::marker::PhantomData, +} + +unsafe impl Send for CustomSlotSupplier {} +unsafe impl Sync for CustomSlotSupplier {} + +type CustomReserveSlotCallback = + unsafe extern "C" fn(ctx: *const SlotReserveCtx, sender: *mut libc::c_void); +type CustomCancelReserveCallback = unsafe extern "C" fn(token_source: *mut libc::c_void); +/// Must return C#-tracked id for the permit. A zero value means no permit was reserved. +type CustomTryReserveSlotCallback = unsafe extern "C" fn(ctx: *const SlotReserveCtx) -> usize; +type CustomMarkSlotUsedCallback = unsafe extern "C" fn(ctx: *const SlotMarkUsedCtx); +type CustomReleaseSlotCallback = unsafe extern "C" fn(ctx: *const SlotReleaseCtx); +type CustomSlotImplFreeCallback = + unsafe extern "C" fn(userimpl: *const CustomSlotSupplierCallbacks); + +#[repr(C)] +#[derive(Clone, Copy)] +pub struct CustomSlotSupplierCallbacksImpl(*const CustomSlotSupplierCallbacks); + +#[repr(C)] +pub struct CustomSlotSupplierCallbacks { + reserve: CustomReserveSlotCallback, + cancel_reserve: CustomCancelReserveCallback, + try_reserve: CustomTryReserveSlotCallback, + mark_used: CustomMarkSlotUsedCallback, + release: CustomReleaseSlotCallback, + free: CustomSlotImplFreeCallback, +} + +impl CustomSlotSupplierCallbacksImpl { + fn into_ss( + self, + ) -> Arc + Send + Sync + 'static> + { + Arc::new(CustomSlotSupplier { + inner: self, + _pd: Default::default(), + }) + } +} +impl Drop for CustomSlotSupplierCallbacks { + fn drop(&mut self) { + unsafe { + (self.free)(&*self); + } + } +} + +#[repr(C)] +pub enum SlotKindType { + WorkflowSlotKindType, + ActivitySlotKindType, + LocalActivitySlotKindType, +} + +#[repr(C)] +pub struct SlotReserveCtx { + slot_type: SlotKindType, + task_queue: ByteArrayRef, + worker_identity: ByteArrayRef, + worker_build_id: ByteArrayRef, + is_sticky: bool, + // The C# side will store a pointer here to the cancellation token source + token_src: *mut libc::c_void, +} +unsafe impl Send for SlotReserveCtx {} + +#[repr(C)] +pub enum SlotInfo { + WorkflowSlotInfo { + workflow_type: ByteArrayRef, + is_sticky: bool, + }, + ActivitySlotInfo { + activity_type: ByteArrayRef, + }, + LocalActivitySlotInfo { + activity_type: ByteArrayRef, + }, +} + +#[repr(C)] +pub struct SlotMarkUsedCtx { + slot_info: SlotInfo, + /// C# id for the slot permit. + slot_permit: usize, +} + +#[repr(C)] +pub struct SlotReleaseCtx { + slot_info: *const SlotInfo, + /// C# id for the slot permit. + slot_permit: usize, +} + +struct CancelReserveGuard { + token_src: *mut libc::c_void, + callback: CustomCancelReserveCallback, +} +impl Drop for CancelReserveGuard { + fn drop(&mut self) { + if !self.token_src.is_null() { + unsafe { + (self.callback)(self.token_src); + } + } + } +} +unsafe impl Send for CancelReserveGuard {} + +#[async_trait::async_trait] +impl temporal_sdk_core_api::worker::SlotSupplier + for CustomSlotSupplier +{ + type SlotKind = SK; + + async fn reserve_slot(&self, ctx: &dyn SlotReservationContext) -> SlotSupplierPermit { + let (tx, rx) = oneshot::channel(); + let ctx = Self::convert_reserve_ctx(ctx); + let tx = Box::into_raw(Box::new(tx)) as *mut libc::c_void; + unsafe { + let _drop_guard = CancelReserveGuard { + token_src: ctx.token_src, + callback: (*self.inner.0).cancel_reserve, + }; + ((*self.inner.0).reserve)(&ctx, tx); + rx.await.expect("reserve channel is not closed") + } + } + + fn try_reserve_slot(&self, ctx: &dyn SlotReservationContext) -> Option { + let ctx = Self::convert_reserve_ctx(ctx); + let permit_id = unsafe { ((*self.inner.0).try_reserve)(&ctx) }; + if permit_id == 0 { + None + } else { + Some(SlotSupplierPermit::with_user_data(permit_id)) + } + } + + fn mark_slot_used(&self, ctx: &dyn SlotMarkUsedContext) { + let ctx = SlotMarkUsedCtx { + slot_info: Self::convert_slot_info(ctx.info().downcast()), + slot_permit: ctx.permit().user_data::().map(|u| *u).unwrap_or(0), + }; + unsafe { + ((*self.inner.0).mark_used)(&ctx); + } + } + + fn release_slot(&self, ctx: &dyn SlotReleaseContext) { + let mut info_ptr = std::ptr::null(); + let converted_slot_info = ctx.info().map(|i| Self::convert_slot_info(i.downcast())); + if let Some(ref converted) = converted_slot_info { + info_ptr = converted; + } + let ctx = SlotReleaseCtx { + slot_info: info_ptr, + slot_permit: ctx.permit().user_data::().map(|u| *u).unwrap_or(0), + }; + unsafe { + ((*self.inner.0).release)(&ctx); + } + } + + fn available_slots(&self) -> Option { + None + } +} + +impl CustomSlotSupplier { + fn convert_reserve_ctx(ctx: &dyn SlotReservationContext) -> SlotReserveCtx { + SlotReserveCtx { + slot_type: match SK::kind() { + temporal_sdk_core_api::worker::SlotKindType::Workflow => { + SlotKindType::WorkflowSlotKindType + } + temporal_sdk_core_api::worker::SlotKindType::Activity => { + SlotKindType::ActivitySlotKindType + } + temporal_sdk_core_api::worker::SlotKindType::LocalActivity => { + SlotKindType::LocalActivitySlotKindType + } + }, + task_queue: ctx.task_queue().into(), + worker_identity: ctx.worker_identity().into(), + worker_build_id: ctx.worker_build_id().into(), + is_sticky: ctx.is_sticky(), + token_src: std::ptr::null_mut(), + } + } + + fn convert_slot_info(info: temporal_sdk_core_api::worker::SlotInfo) -> SlotInfo { + match info { + temporal_sdk_core_api::worker::SlotInfo::Workflow(w) => SlotInfo::WorkflowSlotInfo { + workflow_type: w.workflow_type.as_str().into(), + is_sticky: w.is_sticky, + }, + temporal_sdk_core_api::worker::SlotInfo::Activity(a) => SlotInfo::ActivitySlotInfo { + activity_type: a.activity_type.as_str().into(), + }, + temporal_sdk_core_api::worker::SlotInfo::LocalActivity(a) => { + SlotInfo::LocalActivitySlotInfo { + activity_type: a.activity_type.as_str().into(), + } + } + } + } +} + #[repr(C)] #[derive(Clone, Copy, PartialEq, Debug)] pub struct ResourceBasedTunerOptions { @@ -538,6 +759,29 @@ pub extern "C" fn worker_replay_push( } } +#[no_mangle] +pub extern "C" fn complete_async_reserve(sender: *mut libc::c_void, permit_id: usize) { + if !sender.is_null() { + unsafe { + let sender = Box::from_raw(sender as *mut oneshot::Sender); + let permit = SlotSupplierPermit::with_user_data(permit_id); + let _ = sender.send(permit); + } + } else { + panic!("ReserveSlot sender must not be null!"); + } +} + +#[no_mangle] +pub unsafe extern "C" fn set_reserve_cancel_target( + ctx: *mut SlotReserveCtx, + token_ptr: *mut libc::c_void, +) { + if let Some(ctx) = ctx.as_mut() { + ctx.token_src = token_ptr; + } +} + impl TryFrom<&WorkerOptions> for temporal_sdk_core::WorkerConfig { type Error = anyhow::Error; @@ -663,7 +907,9 @@ impl TryFrom<&TunerHolder> for temporal_sdk_core::TunerHolder { } } -impl TryFrom for temporal_sdk_core::SlotSupplierOptions { +impl TryFrom + for temporal_sdk_core::SlotSupplierOptions +{ type Error = anyhow::Error; fn try_from( @@ -682,6 +928,9 @@ impl TryFrom for temporal_sdk_core::SlotSupplierOpti ), ) } + SlotSupplier::Custom(cs) => { + temporal_sdk_core::SlotSupplierOptions::Custom(cs.into_ss()) + } }) } } diff --git a/src/Temporalio/Worker/TemporalWorker.cs b/src/Temporalio/Worker/TemporalWorker.cs index 715a1de4..ac03c890 100644 --- a/src/Temporalio/Worker/TemporalWorker.cs +++ b/src/Temporalio/Worker/TemporalWorker.cs @@ -34,6 +34,7 @@ public class TemporalWorker : IDisposable public TemporalWorker(IWorkerClient client, TemporalWorkerOptions options) { this.client = client; + var loggerFactory = options.LoggerFactory ?? client.Options.LoggerFactory; // Clone the options to discourage mutation (but we aren't completely disabling mutation // on the Options field herein). Options = (TemporalWorkerOptions)options.Clone(); @@ -42,7 +43,8 @@ public TemporalWorker(IWorkerClient client, TemporalWorkerOptions options) BridgeWorker = new( (Bridge.Client)bridgeClient, client.Options.Namespace, - options); + options, + loggerFactory); if (options.Activities.Count == 0 && options.Workflows.Count == 0) { throw new ArgumentException("Must have at least one workflow and/or activity"); @@ -81,7 +83,7 @@ public TemporalWorker(IWorkerClient client, TemporalWorkerOptions options) Workflows: options.Workflows, DataConverter: client.Options.DataConverter, Interceptors: Interceptors, - LoggerFactory: options.LoggerFactory ?? client.Options.LoggerFactory, + LoggerFactory: loggerFactory, WorkflowInstanceFactory: options.WorkflowInstanceFactory, DebugMode: options.DebugMode, DisableWorkflowTracingEventListener: options.DisableWorkflowTracingEventListener, @@ -400,4 +402,4 @@ private async Task ExecuteInternalAsync( } } } -} \ No newline at end of file +} diff --git a/src/Temporalio/Worker/Tuning/CustomSlotSupplier.cs b/src/Temporalio/Worker/Tuning/CustomSlotSupplier.cs new file mode 100644 index 00000000..8ff8cf48 --- /dev/null +++ b/src/Temporalio/Worker/Tuning/CustomSlotSupplier.cs @@ -0,0 +1,78 @@ +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace Temporalio.Worker.Tuning +{ + /// + /// This class can be implemented to provide custom slot supplier behavior. + /// + /// + /// WARNING: Custom slot suppliers are currently experimental. + /// + public abstract class CustomSlotSupplier : SlotSupplier + { + /// + /// This function is called before polling for new tasks. Your implementation must block + /// until a slot is available then return a permit to use that slot. + /// The only acceptable exception to throw is , as + /// invocations of this method may be cancelled. Any other exceptions thrown will be logged + /// and ignored. If an exception is thrown, this method will be called again after a one + /// second wait, since it must block until a permit is returned. + /// + /// + /// This method will be called concurrently from multiple threads, so it must be thread-safe. + /// + /// The context for slot reservation. + /// A cancellation token that the SDK may use + /// to cancel the operation. + /// A permit to use the slot which may be populated with your own data. + /// Cancellation requested. + public abstract Task ReserveSlotAsync(SlotReserveContext ctx, CancellationToken cancellationToken); + + /// + /// This function is called when trying to reserve slots for "eager" workflow and activity tasks. + /// Eager tasks are those which are returned as a result of completing a workflow task, rather than + /// from polling. Your implementation must not block, and if a slot is available, return a permit + /// to use that slot. + /// + /// + /// This method will be called concurrently from multiple threads, so it must be thread-safe. + /// + /// + /// Any exceptions thrown will be logged and ignored. + /// + /// The context for slot reservation. + /// Maybe a permit to use the slot which may be populated with your own data. + public abstract SlotPermit? TryReserveSlot(SlotReserveContext ctx); + + /// + /// This function is called once a slot is actually being used to process some task, which may be + /// some time after the slot was reserved originally. For example, if there is no work for a + /// worker, a number of slots equal to the number of active pollers may already be reserved, but + /// none of them are being used yet. This call should be non-blocking. + /// + /// + /// This method will be called concurrently from multiple threads, so it must be thread-safe. + /// + /// + /// Any exceptions thrown will be logged and ignored. + /// + /// The context for marking a slot as used. + public abstract void MarkSlotUsed(SlotMarkUsedContext ctx); + + /// + /// This function is called once a permit is no longer needed. This could be because the task has + /// finished, whether successfully or not, or because the slot was no longer needed (ex: the number + /// of active pollers decreased). This call should be non-blocking. + /// + /// + /// This method will be called concurrently from multiple threads, so it must be thread-safe. + /// + /// + /// Any exceptions thrown will be logged and ignored. + /// + /// The context for releasing a slot. + public abstract void ReleaseSlot(SlotReleaseContext ctx); + } +} diff --git a/src/Temporalio/Worker/Tuning/FixedSizeSlotSupplier.cs b/src/Temporalio/Worker/Tuning/FixedSizeSlotSupplier.cs index 008681fb..bc33051d 100644 --- a/src/Temporalio/Worker/Tuning/FixedSizeSlotSupplier.cs +++ b/src/Temporalio/Worker/Tuning/FixedSizeSlotSupplier.cs @@ -3,6 +3,20 @@ namespace Temporalio.Worker.Tuning /// /// A slot supplier that will only ever issue at most a fixed number of slots. /// - /// The maximum number of slots that will ever be issued. - public sealed record FixedSizeSlotSupplier(int SlotCount) : ISlotSupplier; -} \ No newline at end of file + public sealed class FixedSizeSlotSupplier : SlotSupplier + { + /// + /// Initializes a new instance of the class. + /// + /// The maximum number of slots that will ever be issued. + public FixedSizeSlotSupplier(int slotCount) + { + SlotCount = slotCount; + } + + /// + /// Gets the maximum number of slots that will ever be issued. + /// + public int SlotCount { get; } + } +} diff --git a/src/Temporalio/Worker/Tuning/ISlotSupplier.cs b/src/Temporalio/Worker/Tuning/ISlotSupplier.cs deleted file mode 100644 index d0ab1f47..00000000 --- a/src/Temporalio/Worker/Tuning/ISlotSupplier.cs +++ /dev/null @@ -1,15 +0,0 @@ -#pragma warning disable CA1040 // We are ok with an empty interface here - -namespace Temporalio.Worker.Tuning -{ - /// - /// Slot suppliers control how slots are handed out for workflow and activity tasks as well as - /// local activities when used in conjunction with a . - /// - /// Currently you cannot implement your own slot supplier, but you can use the provided and slot suppliers. - /// - public interface ISlotSupplier - { - } -} \ No newline at end of file diff --git a/src/Temporalio/Worker/Tuning/IWorkerTuner.cs b/src/Temporalio/Worker/Tuning/IWorkerTuner.cs index 16f81bd0..673715a9 100644 --- a/src/Temporalio/Worker/Tuning/IWorkerTuner.cs +++ b/src/Temporalio/Worker/Tuning/IWorkerTuner.cs @@ -9,18 +9,18 @@ public interface IWorkerTuner /// Gets a slot supplier for workflow tasks. /// /// A slot supplier for workflow tasks. - ISlotSupplier WorkflowTaskSlotSupplier { get; } + SlotSupplier WorkflowTaskSlotSupplier { get; } /// /// Gets a slot supplier for activity tasks. /// /// A slot supplier for activity tasks. - ISlotSupplier ActivityTaskSlotSupplier { get; } + SlotSupplier ActivityTaskSlotSupplier { get; } /// /// Gets a slot supplier for local activities. /// /// A slot supplier for local activities. - ISlotSupplier LocalActivitySlotSupplier { get; } + SlotSupplier LocalActivitySlotSupplier { get; } } -} \ No newline at end of file +} diff --git a/src/Temporalio/Worker/Tuning/ResourceBasedSlotSupplier.cs b/src/Temporalio/Worker/Tuning/ResourceBasedSlotSupplier.cs index 39e10f1e..9cb71a15 100644 --- a/src/Temporalio/Worker/Tuning/ResourceBasedSlotSupplier.cs +++ b/src/Temporalio/Worker/Tuning/ResourceBasedSlotSupplier.cs @@ -3,13 +3,31 @@ namespace Temporalio.Worker.Tuning /// /// A slot supplier that will dynamically adjust the number of slots based on resource usage. /// - /// Options specific to the slot type this supplier is used for. - /// Options for the tuner that will be used to adjust the number of - /// slots. All resource-based slot suppliers must use the same tuner options. /// /// WARNING: Resource based tuning is currently experimental. /// - public sealed record ResourceBasedSlotSupplier( - ResourceBasedSlotSupplierOptions Options, - ResourceBasedTunerOptions TunerOptions) : ISlotSupplier; -} \ No newline at end of file + public sealed class ResourceBasedSlotSupplier : SlotSupplier + { + /// + /// Initializes a new instance of the class. + /// + /// Options specific to the slot type this supplier is used for. + /// Options for the tuner that will be used to adjust the number of slots. + public ResourceBasedSlotSupplier(ResourceBasedSlotSupplierOptions options, ResourceBasedTunerOptions tunerOptions) + { + Options = options; + TunerOptions = tunerOptions; + } + + /// + /// Gets Options specific to the slot type this supplier is used for. + /// + public ResourceBasedSlotSupplierOptions Options { get; } + + /// + /// Gets Options for the tuner that will be used to adjust the number of slots. + /// All resource-based slot suppliers must use the same tuner options. + /// + public ResourceBasedTunerOptions TunerOptions { get; } + } +} diff --git a/src/Temporalio/Worker/Tuning/SlotInfo.cs b/src/Temporalio/Worker/Tuning/SlotInfo.cs new file mode 100644 index 00000000..7a0848b5 --- /dev/null +++ b/src/Temporalio/Worker/Tuning/SlotInfo.cs @@ -0,0 +1,28 @@ +namespace Temporalio.Worker.Tuning +{ + /// + /// Info about a task slot usage. + /// + /// + /// WARNING: Custom slot suppliers are currently experimental. + /// + public record SlotInfo + { + private SlotInfo() + { + } + + /// + /// Info about a workflow task slot usage. + /// + public record WorkflowSlotInfo(string WorkflowType, bool IsSticky) : SlotInfo(); + /// + /// Info about an activity task slot usage. + /// + public record ActivitySlotInfo(string ActivityType) : SlotInfo(); + /// + /// Info about a local activity task slot usage. + /// + public record LocalActivitySlotInfo(string ActivityType) : SlotInfo(); + } +} diff --git a/src/Temporalio/Worker/Tuning/SlotMarkUsedContext.cs b/src/Temporalio/Worker/Tuning/SlotMarkUsedContext.cs new file mode 100644 index 00000000..59f134fc --- /dev/null +++ b/src/Temporalio/Worker/Tuning/SlotMarkUsedContext.cs @@ -0,0 +1,16 @@ +namespace Temporalio.Worker.Tuning +{ + /// + /// Context for marking a slot used from a . + /// + /// Info about the task that will be using the slot. + /// The permit that was issued when the slot was reserved. + /// + /// WARNING: Custom slot suppliers are currently experimental. + /// + /// + /// WARNING: This constructor may have required properties added. Do not rely on the exact + /// constructor, only use "with" clauses. + /// + public record SlotMarkUsedContext(SlotInfo SlotInfo, SlotPermit Permit); +} diff --git a/src/Temporalio/Worker/Tuning/SlotPermit.cs b/src/Temporalio/Worker/Tuning/SlotPermit.cs new file mode 100644 index 00000000..28462f4a --- /dev/null +++ b/src/Temporalio/Worker/Tuning/SlotPermit.cs @@ -0,0 +1,34 @@ +namespace Temporalio.Worker.Tuning +{ + /// + /// A permit to use a slot for a workflow/activity/local activity task. This class can be + /// extended if desired. + /// + /// + /// WARNING: Custom slot suppliers are currently experimental. + /// + public class SlotPermit + { + /// + /// Initializes a new instance of the class with no associated data. + /// + public SlotPermit() + { + UserData = null; + } + + /// + /// Initializes a new instance of the class. + /// + /// Data associated with the permit. + public SlotPermit(object data) + { + UserData = data; + } + + /// + /// Gets data associated with the permit. + /// + public object? UserData { get; init; } + } +} diff --git a/src/Temporalio/Worker/Tuning/SlotReleaseContext.cs b/src/Temporalio/Worker/Tuning/SlotReleaseContext.cs new file mode 100644 index 00000000..029f1b31 --- /dev/null +++ b/src/Temporalio/Worker/Tuning/SlotReleaseContext.cs @@ -0,0 +1,16 @@ +namespace Temporalio.Worker.Tuning +{ + /// + /// Context for releasing a slot used from a . + /// + /// Info about the task that will be using the slot. May be null if the slot was never used. + /// The permit that was issued when the slot was reserved. + /// + /// WARNING: Custom slot suppliers are currently experimental. + /// + /// + /// WARNING: This constructor may have required properties added. Do not rely on the exact + /// constructor, only use "with" clauses. + /// + public record SlotReleaseContext(SlotInfo? SlotInfo, SlotPermit Permit); +} diff --git a/src/Temporalio/Worker/Tuning/SlotReserveContext.cs b/src/Temporalio/Worker/Tuning/SlotReserveContext.cs new file mode 100644 index 00000000..f476e76c --- /dev/null +++ b/src/Temporalio/Worker/Tuning/SlotReserveContext.cs @@ -0,0 +1,24 @@ +namespace Temporalio.Worker.Tuning +{ + /// + /// Context for reserving a slot from a . + /// + /// The type of slot trying to be reserved. + /// The name of the task queue for which this reservation request is associated. + /// The identity of the worker that is requesting the reservation. + /// The build id of the worker that is requesting the reservation. + /// True iff this is a reservation for a sticky poll for a workflow task. + /// + /// WARNING: Custom slot suppliers are currently experimental. + /// + /// + /// WARNING: This constructor may have required properties added. Do not rely on the exact + /// constructor, only use "with" clauses. + /// + public record SlotReserveContext( + SlotType SlotType, + string TaskQueue, + string WorkerIdentity, + string WorkerBuildId, + bool IsSticky); +} diff --git a/src/Temporalio/Worker/Tuning/SlotSupplier.cs b/src/Temporalio/Worker/Tuning/SlotSupplier.cs new file mode 100644 index 00000000..b8517e7a --- /dev/null +++ b/src/Temporalio/Worker/Tuning/SlotSupplier.cs @@ -0,0 +1,24 @@ +#pragma warning disable CA1040 // We are ok with an empty interface here + +namespace Temporalio.Worker.Tuning +{ + /// + /// Slot suppliers control how slots are handed out for workflow and activity tasks as well as + /// local activities when used in conjunction with a . + /// + /// Pre-built slot suppliers are available as + /// and . + /// + /// In order to implement your own slot supplier, you can extend the + /// class. + /// + public abstract class SlotSupplier + { + /// + /// Initializes a new instance of the class. + /// + internal SlotSupplier() + { + } + } +} diff --git a/src/Temporalio/Worker/Tuning/SlotType.cs b/src/Temporalio/Worker/Tuning/SlotType.cs new file mode 100644 index 00000000..dd5083bd --- /dev/null +++ b/src/Temporalio/Worker/Tuning/SlotType.cs @@ -0,0 +1,23 @@ +namespace Temporalio.Worker.Tuning +{ + /// + /// Defines types of Slots that workers use. + /// + public enum SlotType + { + /// + /// Workflow slot type. + /// + Workflow, + + /// + /// Activity slot type. + /// + Activity, + + /// + /// Local activity slot type. + /// + LocalActivity, + } +} diff --git a/src/Temporalio/Worker/Tuning/WorkerTuner.cs b/src/Temporalio/Worker/Tuning/WorkerTuner.cs index 57e6355e..1cebd4f2 100644 --- a/src/Temporalio/Worker/Tuning/WorkerTuner.cs +++ b/src/Temporalio/Worker/Tuning/WorkerTuner.cs @@ -1,7 +1,7 @@ namespace Temporalio.Worker.Tuning { /// - /// Implements by holding the different s. + /// Implements by holding the different s. /// public class WorkerTuner : IWorkerTuner { @@ -14,9 +14,9 @@ public class WorkerTuner : IWorkerTuner /// The supplier of local activity slots. /// The tuner. public WorkerTuner( - ISlotSupplier workflowTaskSlotSupplier, - ISlotSupplier activityTaskSlotSupplier, - ISlotSupplier localActivitySlotSupplier) + SlotSupplier workflowTaskSlotSupplier, + SlotSupplier activityTaskSlotSupplier, + SlotSupplier localActivitySlotSupplier) { WorkflowTaskSlotSupplier = workflowTaskSlotSupplier; ActivityTaskSlotSupplier = activityTaskSlotSupplier; @@ -27,19 +27,19 @@ public WorkerTuner( /// Gets a slot supplier for workflow tasks. /// /// A slot supplier for workflow tasks. - public ISlotSupplier WorkflowTaskSlotSupplier { get; init; } + public SlotSupplier WorkflowTaskSlotSupplier { get; init; } /// /// Gets a slot supplier for activity tasks. /// /// A slot supplier for activity tasks. - public ISlotSupplier ActivityTaskSlotSupplier { get; init; } + public SlotSupplier ActivityTaskSlotSupplier { get; init; } /// /// Gets a slot supplier for local activities. /// /// A slot supplier for local activities. - public ISlotSupplier LocalActivitySlotSupplier { get; init; } + public SlotSupplier LocalActivitySlotSupplier { get; init; } /// /// Create a resource based tuner with the provided options. @@ -103,4 +103,4 @@ public static WorkerTuner CreateFixedSize( new FixedSizeSlotSupplier(localActivitySlots)); } } -} \ No newline at end of file +} diff --git a/src/Temporalio/Worker/WorkflowReplayerOptions.cs b/src/Temporalio/Worker/WorkflowReplayerOptions.cs index e43c0519..cea271a2 100644 --- a/src/Temporalio/Worker/WorkflowReplayerOptions.cs +++ b/src/Temporalio/Worker/WorkflowReplayerOptions.cs @@ -99,8 +99,8 @@ public class WorkflowReplayerOptions : ICloneable public bool DisableWorkflowTracingEventListener { get; set; } /// - /// Gets or sets the logging factory used by loggers in workers. If unset, defaults to the - /// client logger factory. + /// Gets or sets the logging factory used by loggers in workers. If unset, defaults to a + /// null logger. /// public ILoggerFactory LoggerFactory { get; set; } = NullLoggerFactory.Instance; @@ -192,4 +192,4 @@ internal void OnTaskCompleted(WorkflowInstance instance, Exception? failureExcep } } } -} \ No newline at end of file +} diff --git a/tests/Temporalio.Tests/Extensions/Hosting/TemporalWorkerServiceTests.cs b/tests/Temporalio.Tests/Extensions/Hosting/TemporalWorkerServiceTests.cs index bd247a67..c10c6c06 100644 --- a/tests/Temporalio.Tests/Extensions/Hosting/TemporalWorkerServiceTests.cs +++ b/tests/Temporalio.Tests/Extensions/Hosting/TemporalWorkerServiceTests.cs @@ -79,11 +79,11 @@ public async Task TemporalWorkerService_ExecuteAsync_SimpleWorker() // Add the rest of the services services. AddSingleton(loggerFactory). - AddScoped(). - // We are also adding the DB client as a keyed service to demonstrate keyed service - // support for our DI logic. This used to break because newer DI library versions - // disallowed accessing certain properties on keyed services which we access - // internally for dupe checks. + AddScoped(). + // We are also adding the DB client as a keyed service to demonstrate keyed service + // support for our DI logic. This used to break because newer DI library versions + // disallowed accessing certain properties on keyed services which we access + // internally for dupe checks. AddKeyedScoped("client-keyed"). AddHostedTemporalWorker(taskQueue). AddScopedActivities(). diff --git a/tests/Temporalio.Tests/Worker/WorkerTuningTests.cs b/tests/Temporalio.Tests/Worker/WorkerTuningTests.cs index 241973f5..306cd735 100644 --- a/tests/Temporalio.Tests/Worker/WorkerTuningTests.cs +++ b/tests/Temporalio.Tests/Worker/WorkerTuningTests.cs @@ -35,6 +35,16 @@ public async Task RunAsync(string name) } } + [Workflow] + public class OneTaskWf + { + [WorkflowRun] + public async Task RunAsync() + { + return "Hi!"; + } + } + [Fact] public async Task CanRunWith_ResourceBasedTuner() { @@ -111,4 +121,197 @@ public async Task Cannot_Mix_MaxConcurrent_And_Tuner() }); Assert.Contains("Cannot set both Tuner and any of", argumentException.Message); } -} \ No newline at end of file + + private class MySlotSupplier : CustomSlotSupplier + { + private object lockObj = new(); + + public uint ReserveCount { get; private set; } + + public uint ReleaseCount { get; private set; } + + public uint BiggestReleasedPermit { get; private set; } + + public bool SawWFSlotInfo { get; private set; } + + public bool SawActSlotInfo { get; private set; } + + public HashSet SeenReserveTypes { get; } = new(); + + public HashSet SeenActivityTypes { get; } = new(); + + public HashSet SeenWorkflowTypes { get; } = new(); + + public HashSet SeenStickyTypes { get; } = new(); + + public HashSet SeenReleaseInfoPresence { get; } = new(); + + public override async Task ReserveSlotAsync(SlotReserveContext ctx, CancellationToken cancellationToken) + { + // Do something async to make sure that works + await Task.Delay(10, cancellationToken); + ReserveTracking(ctx); + return new SlotPermit(ReserveCount); + } + + public override SlotPermit? TryReserveSlot(SlotReserveContext ctx) + { + ReserveTracking(ctx); + return new SlotPermit(ReserveCount); + } + + public override void MarkSlotUsed(SlotMarkUsedContext ctx) + { + lock (lockObj) + { + switch (ctx.SlotInfo) + { + case Temporalio.Worker.Tuning.SlotInfo.WorkflowSlotInfo wsi: + SawWFSlotInfo = true; + SeenWorkflowTypes.Add(wsi.WorkflowType); + break; + case Temporalio.Worker.Tuning.SlotInfo.ActivitySlotInfo asi: + SawActSlotInfo = true; + SeenActivityTypes.Add(asi.ActivityType); + break; + case Temporalio.Worker.Tuning.SlotInfo.LocalActivitySlotInfo lasi: + break; + } + } + } + + public override void ReleaseSlot(SlotReleaseContext ctx) + { + var dat = (uint)ctx.Permit.UserData!; + lock (lockObj) + { + ReleaseCount++; + SeenReleaseInfoPresence.Add(ctx.SlotInfo == null); + if (dat > BiggestReleasedPermit) + { + BiggestReleasedPermit = dat; + } + } + } + + private void ReserveTracking(SlotReserveContext ctx) + { + lock (lockObj) + { + ReserveCount++; + SeenStickyTypes.Add(ctx.IsSticky); + SeenReserveTypes.Add(ctx.SlotType); + } + } + } + + [Fact] + public async Task CanRunWith_CustomSlotSupplier() + { + var mySlotSupplier = new MySlotSupplier(); + using var worker = new TemporalWorker( + Client, + new TemporalWorkerOptions($"tq-{Guid.NewGuid()}") + { + Tuner = new WorkerTuner(mySlotSupplier, mySlotSupplier, mySlotSupplier), + }.AddWorkflow().AddActivity(SimpleWorkflow.SomeActivity)); + await worker.ExecuteAsync(async () => + { + await Env.Client.ExecuteWorkflowAsync( + (SimpleWorkflow wf) => wf.RunAsync("Temporal"), + new(id: $"workflow-{Guid.NewGuid()}", taskQueue: worker.Options.TaskQueue!)); + }); + Assert.Equal(mySlotSupplier.ReleaseCount, mySlotSupplier.BiggestReleasedPermit); + Assert.True(mySlotSupplier.SawWFSlotInfo); + Assert.True(mySlotSupplier.SawActSlotInfo); + Assert.Contains("SimpleWorkflow", mySlotSupplier.SeenWorkflowTypes); + Assert.Contains("SomeActivity", mySlotSupplier.SeenActivityTypes); + Assert.Equal(3, mySlotSupplier.SeenReserveTypes.Count); + Assert.Equal(2, mySlotSupplier.SeenReleaseInfoPresence.Count); + } + + private class ThrowingSlotSupplier : CustomSlotSupplier + { + public override Task ReserveSlotAsync(SlotReserveContext ctx, CancellationToken cancellationToken) + { + // Let the workflow complete, but other reservations fail + if (ctx.SlotType == SlotType.Workflow) + { + return Task.FromResult(new SlotPermit(1)); + } + throw new InvalidOperationException("ReserveSlot"); + } + + public override SlotPermit? TryReserveSlot(SlotReserveContext ctx) + { + throw new InvalidOperationException("TryReserveSlot"); + } + + public override void MarkSlotUsed(SlotMarkUsedContext ctx) + { + throw new InvalidOperationException("MarkSlotUsed"); + } + + public override void ReleaseSlot(SlotReleaseContext ctx) + { + throw new InvalidOperationException("ReleaseSlot"); + } + } + + [Fact] + public async Task CanRunWith_ThrowingSlotSupplier() + { + var mySlotSupplier = new ThrowingSlotSupplier(); + using var worker = new TemporalWorker( + Client, + new TemporalWorkerOptions($"tq-{Guid.NewGuid()}") + { + Tuner = new WorkerTuner(mySlotSupplier, mySlotSupplier, mySlotSupplier), + }.AddWorkflow()); + await worker.ExecuteAsync(async () => + { + await Env.Client.ExecuteWorkflowAsync( + (OneTaskWf wf) => wf.RunAsync(), + new(id: $"workflow-{Guid.NewGuid()}", taskQueue: worker.Options.TaskQueue!)); + }); + } + + private class BlockingSlotSupplier : CustomSlotSupplier + { + public override async Task ReserveSlotAsync(SlotReserveContext ctx, CancellationToken cancellationToken) + { + await Task.Delay(100_000, cancellationToken); + cancellationToken.ThrowIfCancellationRequested(); + throw new InvalidOperationException("Should not be reachable"); + } + + public override SlotPermit? TryReserveSlot(SlotReserveContext ctx) + { + return null; + } + + public override void MarkSlotUsed(SlotMarkUsedContext ctx) + { + } + + public override void ReleaseSlot(SlotReleaseContext ctx) + { + } + } + + [Fact] + public async Task CanRunWith_BlockingSlotSupplier() + { + var mySlotSupplier = new BlockingSlotSupplier(); + using var worker = new TemporalWorker( + Client, + new TemporalWorkerOptions($"tq-{Guid.NewGuid()}") + { + Tuner = new WorkerTuner(mySlotSupplier, mySlotSupplier, mySlotSupplier), + }.AddWorkflow()); + await worker.ExecuteAsync(async () => + { + await Task.Delay(1000); + }); + } +}