From 454a8a8a227db030f9ae80458eaf844c82699c07 Mon Sep 17 00:00:00 2001 From: pdoerner <122412190+pdoerner@users.noreply.github.com> Date: Fri, 17 Jan 2025 09:57:22 -0800 Subject: [PATCH] Add Nexus SignalWorkflowOperation (#1770) * Add Nexus SignalWorkflowOperation * request ID and links handling * docs * test * test timeout * test * test timeout * cleanup * feedback * comments * experimental --- internal/internal_workflow_client.go | 10 ++- internal/nexus_operations.go | 8 ++ temporalnexus/operation.go | 75 +++++++++++++---- test/nexus_test.go | 121 +++++++++++++++++++++++++++ 4 files changed, 199 insertions(+), 15 deletions(-) diff --git a/internal/internal_workflow_client.go b/internal/internal_workflow_client.go index 7060161f8..1101bff59 100644 --- a/internal/internal_workflow_client.go +++ b/internal/internal_workflow_client.go @@ -1972,9 +1972,10 @@ func (w *workflowClientInterceptor) SignalWorkflow(ctx context.Context, in *Clie return err } + links, _ := ctx.Value(NexusOperationLinksKey).([]*commonpb.Link) + request := &workflowservice.SignalWorkflowExecutionRequest{ Namespace: w.client.namespace, - RequestId: uuid.New(), WorkflowExecution: &commonpb.WorkflowExecution{ WorkflowId: in.WorkflowID, RunId: in.RunID, @@ -1983,6 +1984,13 @@ func (w *workflowClientInterceptor) SignalWorkflow(ctx context.Context, in *Clie Input: input, Identity: w.client.identity, Header: header, + Links: links, + } + + if requestID, ok := ctx.Value(NexusOperationRequestIDKey).(string); ok && requestID != "" { + request.RequestId = requestID + } else { + request.RequestId = uuid.New() } grpcCtx, cancel := newGRPCContext(ctx, defaultGrpcRetryParameters(ctx)) diff --git a/internal/nexus_operations.go b/internal/nexus_operations.go index 4bbf0bbf8..b0f559533 100644 --- a/internal/nexus_operations.go +++ b/internal/nexus_operations.go @@ -66,6 +66,14 @@ type isWorkflowRunOpContextKeyType struct{} // panic as we don't want to expose a partial client to sync operations. var IsWorkflowRunOpContextKey = isWorkflowRunOpContextKeyType{} +type nexusOperationRequestIDKeyType struct{} + +var NexusOperationRequestIDKey = nexusOperationRequestIDKeyType{} + +type nexusOperationLinksKeyType struct{} + +var NexusOperationLinksKey = nexusOperationLinksKeyType{} + // NexusOperationContextFromGoContext gets the [NexusOperationContext] associated with the given [context.Context]. func NexusOperationContextFromGoContext(ctx context.Context) (nctx *NexusOperationContext, ok bool) { nctx, ok = ctx.Value(nexusOperationContextKey).(*NexusOperationContext) diff --git a/temporalnexus/operation.go b/temporalnexus/operation.go index 58b4b8b4e..84ee119a2 100644 --- a/temporalnexus/operation.go +++ b/temporalnexus/operation.go @@ -45,7 +45,6 @@ import ( "github.com/nexus-rpc/sdk-go/nexus" "go.temporal.io/api/common/v1" "go.temporal.io/api/enums/v1" - "go.temporal.io/sdk/client" "go.temporal.io/sdk/internal" "go.temporal.io/sdk/internal/common/metrics" @@ -94,6 +93,46 @@ func NewSyncOperation[I any, O any]( } } +// SignalWorkflowInput encapsulates the values required to send a signal to a workflow. +// +// NOTE: Experimental +type SignalWorkflowInput struct { + // WorkflowID is the ID of the workflow which will receive the signal. Required. + WorkflowID string + // RunID is the run ID of the workflow which will receive the signal. Optional. If empty, the signal will be + // delivered to the running execution of the indicated workflow ID. + RunID string + // SignalName is the name of the signal. Required. + SignalName string + // Arg is the payload attached to the signal. Optional. + Arg any +} + +// NewWorkflowSignalOperation is a helper for creating a synchronous nexus.Operation to deliver a signal, linking the +// signal to a Nexus operation. Request ID from the Nexus options is propagated to the workflow to ensure idempotency. +// +// NOTE: Experimental +func NewWorkflowSignalOperation[T any]( + name string, + getSignalInput func(context.Context, T, nexus.StartOperationOptions) SignalWorkflowInput, +) nexus.Operation[T, nexus.NoValue] { + return NewSyncOperation(name, func(ctx context.Context, c client.Client, in T, options nexus.StartOperationOptions) (nexus.NoValue, error) { + signalInput := getSignalInput(ctx, in, options) + + if options.RequestID != "" { + ctx = context.WithValue(ctx, internal.NexusOperationRequestIDKey, options.RequestID) + } + + links, err := convertNexusLinks(options.Links, GetLogger(ctx)) + if err != nil { + return nil, err + } + ctx = context.WithValue(ctx, internal.NexusOperationLinksKey, links) + + return nil, c.SignalWorkflow(ctx, signalInput.WorkflowID, signalInput.RunID, signalInput.SignalName, signalInput.Arg) + }) +} + func (o *syncOperation[I, O]) Name() string { return o.name } @@ -360,8 +399,26 @@ func ExecuteUntypedWorkflow[R any]( }) } + links, err := convertNexusLinks(nexusOptions.Links, nctx.Log) + if err != nil { + return nil, err + } + internal.SetLinksOnStartWorkflowOptions(&startWorkflowOptions, links) + + run, err := nctx.Client.ExecuteWorkflow(ctx, startWorkflowOptions, workflowType, args...) + if err != nil { + return nil, err + } + return workflowHandle[R]{ + namespace: nctx.Namespace, + id: run.GetID(), + runID: run.GetRunID(), + }, nil +} + +func convertNexusLinks(nexusLinks []nexus.Link, log log.Logger) ([]*common.Link, error) { var links []*common.Link - for _, nexusLink := range nexusOptions.Links { + for _, nexusLink := range nexusLinks { switch nexusLink.Type { case string((&common.Link_WorkflowEvent{}).ProtoReflect().Descriptor().FullName()): link, err := ConvertNexusLinkToLinkWorkflowEvent(nexusLink) @@ -374,18 +431,8 @@ func ExecuteUntypedWorkflow[R any]( }, }) default: - nctx.Log.Warn("ignoring unsupported link data type: %q", nexusLink.Type) + log.Warn("ignoring unsupported link data type: %q", nexusLink.Type) } } - internal.SetLinksOnStartWorkflowOptions(&startWorkflowOptions, links) - - run, err := nctx.Client.ExecuteWorkflow(ctx, startWorkflowOptions, workflowType, args...) - if err != nil { - return nil, err - } - return workflowHandle[R]{ - namespace: nctx.Namespace, - id: run.GetID(), - runID: run.GetRunID(), - }, nil + return links, nil } diff --git a/test/nexus_test.go b/test/nexus_test.go index 12dcb0f1e..1944bc2eb 100644 --- a/test/nexus_test.go +++ b/test/nexus_test.go @@ -205,6 +205,13 @@ func waitForCancelWorkflow(ctx workflow.Context, ownID string) (string, error) { return "", workflow.Await(ctx, func() bool { return false }) } +func waitForSignalWorkflow(ctx workflow.Context, _ string) (string, error) { + ch := workflow.GetSignalChannel(ctx, "nexus-signal") + var val string + ch.Receive(ctx, &val) + return val, ctx.Err() +} + var workflowOp = temporalnexus.NewWorkflowRunOperation( "workflow-op", waitForCancelWorkflow, @@ -550,6 +557,120 @@ func TestSyncOperationFromWorkflow(t *testing.T) { }) } +func TestSignalOperationFromWorkflow(t *testing.T) { + receiverID := "nexus-signal-receiver-" + uuid.NewString() + + op := temporalnexus.NewWorkflowSignalOperation("signal-operation", func(_ context.Context, input string, _ nexus.StartOperationOptions) temporalnexus.SignalWorkflowInput { + return temporalnexus.SignalWorkflowInput{ + WorkflowID: receiverID, + SignalName: "nexus-signal", + Arg: input, + } + }) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + tc := newTestContext(t, ctx) + + senderWF := func(ctx workflow.Context) error { + c := workflow.NewNexusClient(tc.endpoint, "test") + fut := c.ExecuteOperation(ctx, op, "nexus", workflow.NexusOperationOptions{}) + + var exec workflow.NexusOperationExecution + if err := fut.GetNexusOperationExecution().Get(ctx, &exec); err != nil { + return fmt.Errorf("expected start to succeed: %w", err) + } + if exec.OperationID != "" { + return fmt.Errorf("expected empty operation ID") + } + + return fut.Get(ctx, nil) + } + + w := worker.New(tc.client, tc.taskQueue, worker.Options{}) + service := nexus.NewService("test") + require.NoError(t, service.Register(op)) + w.RegisterNexusService(service) + w.RegisterWorkflow(waitForSignalWorkflow) + w.RegisterWorkflow(senderWF) + require.NoError(t, w.Start()) + t.Cleanup(w.Stop) + + receiver, err := tc.client.ExecuteWorkflow(ctx, client.StartWorkflowOptions{ + ID: receiverID, + TaskQueue: tc.taskQueue, + // The endpoint registry may take a bit to propagate to the history service, use a shorter workflow task + // timeout to speed up the attempts. + WorkflowTaskTimeout: time.Second, + }, waitForSignalWorkflow, "successful") + require.NoError(t, err) + + sender, err := tc.client.ExecuteWorkflow(ctx, client.StartWorkflowOptions{ + TaskQueue: tc.taskQueue, + // The endpoint registry may take a bit to propagate to the history service, use a shorter workflow task + // timeout to speed up the attempts. + WorkflowTaskTimeout: time.Second, + }, senderWF) + require.NoError(t, err) + require.NoError(t, sender.Get(ctx, nil)) + + iter := tc.client.GetWorkflowHistory( + ctx, + sender.GetID(), + sender.GetRunID(), + false, + enums.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT, + ) + var nexusOperationScheduleEventID int64 + var targetEvent *historypb.HistoryEvent + for iter.HasNext() { + event, err := iter.Next() + require.NoError(t, err) + if event.GetEventType() == enums.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED { + nexusOperationScheduleEventID = event.GetEventId() + require.NotEmpty(t, event.GetNexusOperationScheduledEventAttributes().GetRequestId()) + break + } + } + + var out string + require.NoError(t, receiver.Get(ctx, &out)) + require.Equal(t, "nexus", out) + + iter = tc.client.GetWorkflowHistory( + ctx, + receiver.GetID(), + receiver.GetRunID(), + false, + enums.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT, + ) + for iter.HasNext() { + event, err := iter.Next() + require.NoError(t, err) + if event.GetEventType() == enums.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED { + targetEvent = event + break + } + } + require.NotNil(t, targetEvent) + require.NotNil(t, targetEvent.GetWorkflowExecutionSignaledEventAttributes()) + require.Len(t, targetEvent.GetLinks(), 1) + require.True(t, proto.Equal( + &common.Link_WorkflowEvent{ + Namespace: tc.testConfig.Namespace, + WorkflowId: sender.GetID(), + RunId: sender.GetRunID(), + Reference: &common.Link_WorkflowEvent_EventRef{ + EventRef: &common.Link_WorkflowEvent_EventReference{ + EventId: nexusOperationScheduleEventID, + EventType: enums.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED, + }, + }, + }, + targetEvent.GetLinks()[0].GetWorkflowEvent(), + )) +} + func TestAsyncOperationFromWorkflow(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel()