Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Nexus SignalWorkflowOperation #1770

Merged
merged 13 commits into from
Jan 17, 2025
15 changes: 14 additions & 1 deletion temporalnexus/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ import (
"github.com/nexus-rpc/sdk-go/nexus"
"go.temporal.io/api/common/v1"
"go.temporal.io/api/enums/v1"

"go.temporal.io/sdk/client"
"go.temporal.io/sdk/internal"
"go.temporal.io/sdk/internal/common/metrics"
Expand Down Expand Up @@ -94,6 +93,20 @@ func NewSyncOperation[I any, O any](
}
}

// NewSignalWorkflowOperation is a helper for creating a synchronous nexus.Operation to deliver a signal.
//
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also call out that this ensures idempotency via the request ID mechanism (to some degree - violated if the workflow spans multiple runs) and the bidi linking is also important to call out (for now it's only uni-directional but we'll fix that).

// NOTE: Experimental
func NewSignalWorkflowOperation[I any](
name string,
workflowID string,
runID string,
signalName string,
) nexus.Operation[I, nexus.NoValue] {
return NewSyncOperation(name, func(ctx context.Context, c client.Client, i I, options nexus.StartOperationOptions) (nexus.NoValue, error) {
return nil, c.SignalWorkflow(ctx, workflowID, runID, signalName, i)
pdoerner marked this conversation as resolved.
Show resolved Hide resolved
})
}

func (o *syncOperation[I, O]) Name() string {
return o.name
}
Expand Down
102 changes: 102 additions & 0 deletions test/nexus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,13 @@ func waitForCancelWorkflow(ctx workflow.Context, ownID string) (string, error) {
return "", workflow.Await(ctx, func() bool { return false })
}

func waitForSignalWorkflow(ctx workflow.Context, _ string) (string, error) {
ch := workflow.GetSignalChannel(ctx, "nexus-signal")
var val string
ch.Receive(ctx, &val)
return val, ctx.Err()
}

var workflowOp = temporalnexus.NewWorkflowRunOperation(
"workflow-op",
waitForCancelWorkflow,
Expand Down Expand Up @@ -371,6 +378,52 @@ func TestNexusSyncOperation(t *testing.T) {
})
}

func TestNexusSignalWorkflowOperation(t *testing.T) {
pdoerner marked this conversation as resolved.
Show resolved Hide resolved
receiverID := "nexus-signal-receiver-" + uuid.NewString()
op := temporalnexus.NewSignalWorkflowOperation[string]("signal-op", receiverID, "", "nexus-signal")

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

tc := newTestContext(t, ctx)

w := worker.New(tc.client, tc.taskQueue, worker.Options{})
service := nexus.NewService("test")
require.NoError(t, service.Register(op))
w.RegisterNexusService(service)
w.RegisterWorkflow(waitForSignalWorkflow)
require.NoError(t, w.Start())
t.Cleanup(w.Stop)

run, err := tc.client.ExecuteWorkflow(ctx, client.StartWorkflowOptions{
ID: receiverID,
TaskQueue: tc.taskQueue,
}, waitForSignalWorkflow, "successful")
require.NoError(t, err)

nc := tc.newNexusClient(t, service.Name)

result, err := nexus.ExecuteOperation(ctx, nc, op, "nexus",
nexus.ExecuteOperationOptions{
RequestID: "test-request-id",
Header: nexus.Header{"test": "ok"},
CallbackURL: "http://localhost/test",
CallbackHeader: nexus.Header{"test": "ok"},
})
require.NoError(t, err)
require.Nil(t, result)

var out string
require.NoError(t, run.Get(ctx, &out))
require.Equal(t, "nexus", out)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test the request ID and links are properly attached.

require.EventuallyWithT(t, func(t *assert.CollectT) {
tc.requireTimer(t, metrics.NexusTaskEndToEndLatency, service.Name, op.Name())
tc.requireTimer(t, metrics.NexusTaskScheduleToStartLatency, service.Name, op.Name())
tc.requireTimer(t, metrics.NexusTaskExecutionLatency, service.Name, op.Name())
}, time.Second*3, time.Millisecond*100)
}

func TestNexusWorkflowRunOperation(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
Expand Down Expand Up @@ -550,6 +603,55 @@ func TestSyncOperationFromWorkflow(t *testing.T) {
})
}

func TestSignalOperationFromWorkflow(t *testing.T) {
receiverID := "nexus-signal-receiver-" + uuid.NewString()
op := temporalnexus.NewSignalWorkflowOperation[string]("signal-op", receiverID, "", "nexus-signal")

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
tc := newTestContext(t, ctx)

senderWF := func(ctx workflow.Context) error {
c := workflow.NewNexusClient(tc.endpoint, "test")
fut := c.ExecuteOperation(ctx, op, "nexus", workflow.NexusOperationOptions{})

var exec workflow.NexusOperationExecution
if err := fut.GetNexusOperationExecution().Get(ctx, &exec); err != nil {
return fmt.Errorf("expected start to succeed: %w", err)
}
if exec.OperationID != "" {
return fmt.Errorf("expected empty operation ID")
}

return fut.Get(ctx, nil)
}

w := worker.New(tc.client, tc.taskQueue, worker.Options{})
service := nexus.NewService("test")
require.NoError(t, service.Register(op))
w.RegisterNexusService(service)
w.RegisterWorkflow(waitForSignalWorkflow)
w.RegisterWorkflow(senderWF)
require.NoError(t, w.Start())
t.Cleanup(w.Stop)

receiver, err := tc.client.ExecuteWorkflow(ctx, client.StartWorkflowOptions{
ID: receiverID,
TaskQueue: tc.taskQueue,
}, waitForSignalWorkflow, "successful")
require.NoError(t, err)

sender, err := tc.client.ExecuteWorkflow(ctx, client.StartWorkflowOptions{
TaskQueue: tc.taskQueue,
}, senderWF)
require.NoError(t, err)
require.NoError(t, sender.Get(ctx, nil))

var out string
require.NoError(t, receiver.Get(ctx, &out))
require.Equal(t, "nexus", out)
}

func TestAsyncOperationFromWorkflow(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
Expand Down
Loading