Skip to content

Commit

Permalink
Revert WorkflowSignalOperation (#1801)
Browse files Browse the repository at this point in the history
  • Loading branch information
bergundy authored Feb 4, 2025
1 parent 8f05d01 commit 2030f9b
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 154 deletions.
40 changes: 0 additions & 40 deletions temporalnexus/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,46 +93,6 @@ func NewSyncOperation[I any, O any](
}
}

// SignalWorkflowInput encapsulates the values required to send a signal to a workflow.
//
// NOTE: Experimental
type SignalWorkflowInput struct {
// WorkflowID is the ID of the workflow which will receive the signal. Required.
WorkflowID string
// RunID is the run ID of the workflow which will receive the signal. Optional. If empty, the signal will be
// delivered to the running execution of the indicated workflow ID.
RunID string
// SignalName is the name of the signal. Required.
SignalName string
// Arg is the payload attached to the signal. Optional.
Arg any
}

// NewWorkflowSignalOperation is a helper for creating a synchronous nexus.Operation to deliver a signal, linking the
// signal to a Nexus operation. Request ID from the Nexus options is propagated to the workflow to ensure idempotency.
//
// NOTE: Experimental
func NewWorkflowSignalOperation[T any](
name string,
getSignalInput func(context.Context, T, nexus.StartOperationOptions) SignalWorkflowInput,
) nexus.Operation[T, nexus.NoValue] {
return NewSyncOperation(name, func(ctx context.Context, c client.Client, in T, options nexus.StartOperationOptions) (nexus.NoValue, error) {
signalInput := getSignalInput(ctx, in, options)

if options.RequestID != "" {
ctx = context.WithValue(ctx, internal.NexusOperationRequestIDKey, options.RequestID)
}

links, err := convertNexusLinks(options.Links, GetLogger(ctx))
if err != nil {
return nil, err
}
ctx = context.WithValue(ctx, internal.NexusOperationLinksKey, links)

return nil, c.SignalWorkflow(ctx, signalInput.WorkflowID, signalInput.RunID, signalInput.SignalName, signalInput.Arg)
})
}

func (o *syncOperation[I, O]) Name() string {
return o.name
}
Expand Down
114 changes: 0 additions & 114 deletions test/nexus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -576,120 +576,6 @@ func TestInvalidOperationInput(t *testing.T) {
require.ErrorContains(t, run.Get(ctx, nil), `cannot assign argument of type int to type string for operation workflow-op`)
}

func TestSignalOperationFromWorkflow(t *testing.T) {
receiverID := "nexus-signal-receiver-" + uuid.NewString()

op := temporalnexus.NewWorkflowSignalOperation("signal-operation", func(_ context.Context, input string, _ nexus.StartOperationOptions) temporalnexus.SignalWorkflowInput {
return temporalnexus.SignalWorkflowInput{
WorkflowID: receiverID,
SignalName: "nexus-signal",
Arg: input,
}
})

ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
tc := newTestContext(t, ctx)

senderWF := func(ctx workflow.Context) error {
c := workflow.NewNexusClient(tc.endpoint, "test")
fut := c.ExecuteOperation(ctx, op, "nexus", workflow.NexusOperationOptions{})

var exec workflow.NexusOperationExecution
if err := fut.GetNexusOperationExecution().Get(ctx, &exec); err != nil {
return fmt.Errorf("expected start to succeed: %w", err)
}
if exec.OperationID != "" {
return fmt.Errorf("expected empty operation ID")
}

return fut.Get(ctx, nil)
}

w := worker.New(tc.client, tc.taskQueue, worker.Options{})
service := nexus.NewService("test")
require.NoError(t, service.Register(op))
w.RegisterNexusService(service)
w.RegisterWorkflow(waitForSignalWorkflow)
w.RegisterWorkflow(senderWF)
require.NoError(t, w.Start())
t.Cleanup(w.Stop)

receiver, err := tc.client.ExecuteWorkflow(ctx, client.StartWorkflowOptions{
ID: receiverID,
TaskQueue: tc.taskQueue,
// The endpoint registry may take a bit to propagate to the history service, use a shorter workflow task
// timeout to speed up the attempts.
WorkflowTaskTimeout: time.Second,
}, waitForSignalWorkflow, "successful")
require.NoError(t, err)

sender, err := tc.client.ExecuteWorkflow(ctx, client.StartWorkflowOptions{
TaskQueue: tc.taskQueue,
// The endpoint registry may take a bit to propagate to the history service, use a shorter workflow task
// timeout to speed up the attempts.
WorkflowTaskTimeout: time.Second,
}, senderWF)
require.NoError(t, err)
require.NoError(t, sender.Get(ctx, nil))

iter := tc.client.GetWorkflowHistory(
ctx,
sender.GetID(),
sender.GetRunID(),
false,
enums.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT,
)
var nexusOperationScheduleEventID int64
var targetEvent *historypb.HistoryEvent
for iter.HasNext() {
event, err := iter.Next()
require.NoError(t, err)
if event.GetEventType() == enums.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED {
nexusOperationScheduleEventID = event.GetEventId()
require.NotEmpty(t, event.GetNexusOperationScheduledEventAttributes().GetRequestId())
break
}
}

var out string
require.NoError(t, receiver.Get(ctx, &out))
require.Equal(t, "nexus", out)

iter = tc.client.GetWorkflowHistory(
ctx,
receiver.GetID(),
receiver.GetRunID(),
false,
enums.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT,
)
for iter.HasNext() {
event, err := iter.Next()
require.NoError(t, err)
if event.GetEventType() == enums.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED {
targetEvent = event
break
}
}
require.NotNil(t, targetEvent)
require.NotNil(t, targetEvent.GetWorkflowExecutionSignaledEventAttributes())
require.Len(t, targetEvent.GetLinks(), 1)
require.True(t, proto.Equal(
&common.Link_WorkflowEvent{
Namespace: tc.testConfig.Namespace,
WorkflowId: sender.GetID(),
RunId: sender.GetRunID(),
Reference: &common.Link_WorkflowEvent_EventRef{
EventRef: &common.Link_WorkflowEvent_EventReference{
EventId: nexusOperationScheduleEventID,
EventType: enums.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED,
},
},
},
targetEvent.GetLinks()[0].GetWorkflowEvent(),
))
}

func TestAsyncOperationFromWorkflow(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
Expand Down

0 comments on commit 2030f9b

Please sign in to comment.