From 931c0f580464a8afced2c994401427aa44dc93a4 Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Sat, 25 Jan 2025 02:58:30 -0800 Subject: [PATCH 1/2] sleep-for-days sample --- sleep-for-days/README.md | 16 +++++++++++ sleep-for-days/sleepfordays_workflow.go | 37 +++++++++++++++++++++++++ sleep-for-days/starter/main.go | 37 +++++++++++++++++++++++++ sleep-for-days/worker/main.go | 29 +++++++++++++++++++ 4 files changed, 119 insertions(+) create mode 100644 sleep-for-days/README.md create mode 100644 sleep-for-days/sleepfordays_workflow.go create mode 100644 sleep-for-days/starter/main.go create mode 100644 sleep-for-days/worker/main.go diff --git a/sleep-for-days/README.md b/sleep-for-days/README.md new file mode 100644 index 00000000..e3f88df8 --- /dev/null +++ b/sleep-for-days/README.md @@ -0,0 +1,16 @@ +### Sleep for days + +This sample demonstrates how to use Temporal to run a workflow that periodically sleeps for a number of days. + +### Steps to run this sample: +1) Run a [Temporal service](https://github.com/temporalio/samples-go/tree/main/#how-to-use). +2) Run the following command to start the worker +``` +go run worker/main.go +``` +3) Run the following command to start the example +``` +go run starter/main.go +``` + +This sample will run indefinitely until you send a `complete` signal to the workflow. See how to send a signal via Temporal CLI [here](https://docs.temporal.io/cli/workflow#signal). \ No newline at end of file diff --git a/sleep-for-days/sleepfordays_workflow.go b/sleep-for-days/sleepfordays_workflow.go new file mode 100644 index 00000000..0ebececc --- /dev/null +++ b/sleep-for-days/sleepfordays_workflow.go @@ -0,0 +1,37 @@ +package helloworld + +import ( + "context" + "fmt" + "time" + + "go.temporal.io/sdk/activity" + "go.temporal.io/sdk/workflow" +) + +const CompleteSignal = "complete" + +func SleepForDaysWorkflow(ctx workflow.Context, days int) (string, error) { + ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 10 * time.Second, + }) + + isComplete := false + sigChan := workflow.GetSignalChannel(ctx, CompleteSignal) + workflow.Go(ctx, func(ctx workflow.Context) { + sigChan.Receive(ctx, &isComplete) + }) + + for !isComplete { + workflow.ExecuteActivity(ctx, SendEmailActivity, fmt.Sprintf("Sleeping for %d days", days)).IsReady() + workflow.Sleep(ctx, time.Hour*24*time.Duration(days)) + } + + return "done", nil +} + +// A stub Activity for sending an email. +func SendEmailActivity(ctx context.Context, msg string) error { + activity.GetLogger(ctx).Info(fmt.Sprintf(`Sending email: "%v"\n`, msg)) + return nil +} diff --git a/sleep-for-days/starter/main.go b/sleep-for-days/starter/main.go new file mode 100644 index 00000000..8071b065 --- /dev/null +++ b/sleep-for-days/starter/main.go @@ -0,0 +1,37 @@ +package main + +import ( + "context" + "log" + + sleepfordays "github.com/temporalio/samples-go/sleep-for-days" + "go.temporal.io/sdk/client" +) + +func main() { + c, err := client.Dial(client.Options{}) + if err != nil { + log.Fatalln("Unable to create client", err) + } + defer c.Close() + + workflowOptions := client.StartWorkflowOptions{ + TaskQueue: "sleep-for-days", + } + + const numDaysSleep = 3 + we, err := c.ExecuteWorkflow(context.Background(), workflowOptions, sleepfordays.SleepForDaysWorkflow, numDaysSleep) + if err != nil { + log.Fatalln("Unable to execute workflow", err) + } + + log.Println("Started sleep-for-days workflow", "WorkflowID", we.GetID(), "RunID", we.GetRunID()) + + // Synchronously wait for the workflow completion (will run indefinitely until it receives a signal) + var result string + err = we.Get(context.Background(), &result) + if err != nil { + log.Fatalln("Unable get workflow result", err) + } + log.Println("Workflow result:", result) +} diff --git a/sleep-for-days/worker/main.go b/sleep-for-days/worker/main.go new file mode 100644 index 00000000..cef3ab5a --- /dev/null +++ b/sleep-for-days/worker/main.go @@ -0,0 +1,29 @@ +package main + +import ( + "log" + + "go.temporal.io/sdk/client" + "go.temporal.io/sdk/worker" + + sleepfordays "github.com/temporalio/samples-go/sleep-for-days" +) + +func main() { + // The client and worker are heavyweight objects that should be created once per process. + c, err := client.Dial(client.Options{}) + if err != nil { + log.Fatalln("Unable to create client", err) + } + defer c.Close() + + w := worker.New(c, "sleep-for-days", worker.Options{}) + + w.RegisterWorkflow(sleepfordays.SleepForDaysWorkflow) + w.RegisterActivity(sleepfordays.SendEmailActivity) + + err = w.Run(worker.InterruptCh()) + if err != nil { + log.Fatalln("Unable to start worker", err) + } +} From 492f5a54a0d1307f7a0dcffce2e0b9f35d67c992 Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Fri, 31 Jan 2025 16:17:51 -0800 Subject: [PATCH 2/2] add test, fixed bugs --- sleep-for-days/README.md | 2 +- sleep-for-days/sleepfordays_workflow.go | 20 +++++----- sleep-for-days/sleepfordays_workflow_test.go | 39 ++++++++++++++++++++ sleep-for-days/starter/main.go | 3 +- 4 files changed, 51 insertions(+), 13 deletions(-) create mode 100644 sleep-for-days/sleepfordays_workflow_test.go diff --git a/sleep-for-days/README.md b/sleep-for-days/README.md index e3f88df8..0d6969aa 100644 --- a/sleep-for-days/README.md +++ b/sleep-for-days/README.md @@ -1,6 +1,6 @@ ### Sleep for days -This sample demonstrates how to use Temporal to run a workflow that periodically sleeps for a number of days. +This sample demonstrates how to create a Temporal workflow that runs forever, sending an email every 30 days. ### Steps to run this sample: 1) Run a [Temporal service](https://github.com/temporalio/samples-go/tree/main/#how-to-use). diff --git a/sleep-for-days/sleepfordays_workflow.go b/sleep-for-days/sleepfordays_workflow.go index 0ebececc..1e44256f 100644 --- a/sleep-for-days/sleepfordays_workflow.go +++ b/sleep-for-days/sleepfordays_workflow.go @@ -1,4 +1,4 @@ -package helloworld +package sleepfordays import ( "context" @@ -9,22 +9,22 @@ import ( "go.temporal.io/sdk/workflow" ) -const CompleteSignal = "complete" - -func SleepForDaysWorkflow(ctx workflow.Context, days int) (string, error) { +func SleepForDaysWorkflow(ctx workflow.Context) (string, error) { ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 10 * time.Second, }) isComplete := false - sigChan := workflow.GetSignalChannel(ctx, CompleteSignal) - workflow.Go(ctx, func(ctx workflow.Context) { - sigChan.Receive(ctx, &isComplete) - }) + sigChan := workflow.GetSignalChannel(ctx, "complete") for !isComplete { - workflow.ExecuteActivity(ctx, SendEmailActivity, fmt.Sprintf("Sleeping for %d days", days)).IsReady() - workflow.Sleep(ctx, time.Hour*24*time.Duration(days)) + workflow.ExecuteActivity(ctx, SendEmailActivity, "Sleeping for 30 days") + selector := workflow.NewSelector(ctx) + selector.AddFuture(workflow.NewTimer(ctx, time.Hour*24*30), func(f workflow.Future) {}) + selector.AddReceive(sigChan, func(c workflow.ReceiveChannel, more bool) { + isComplete = true + }) + selector.Select(ctx) } return "done", nil diff --git a/sleep-for-days/sleepfordays_workflow_test.go b/sleep-for-days/sleepfordays_workflow_test.go new file mode 100644 index 00000000..ee7d28e5 --- /dev/null +++ b/sleep-for-days/sleepfordays_workflow_test.go @@ -0,0 +1,39 @@ +package sleepfordays + +import ( + "testing" + "time" + + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "go.temporal.io/sdk/testsuite" +) + +func TestSleepForDaysWorkflow(t *testing.T) { + testSuite := &testsuite.WorkflowTestSuite{} + env := testSuite.NewTestWorkflowEnvironment() + + numActivityCalls := 0 + env.RegisterActivity(SendEmailActivity) + env.OnActivity(SendEmailActivity, mock.Anything, mock.Anything).Run( + func(args mock.Arguments) { numActivityCalls++ }, + ).Return(nil) + + startTime := env.Now() + + // Time-skip 90 days. + env.RegisterDelayedCallback(func() { + // Check that the activity has been called 3 times. + require.Equal(t, 3, numActivityCalls) + // Send the signal to complete the workflow. + env.SignalWorkflow("complete", nil) + // Expect no more activity calls to have been made - workflow is complete. + require.Equal(t, 3, numActivityCalls) + // Expect more than 90 days to have passed. + require.Equal(t, env.Now().Sub(startTime), time.Hour*24*90) + }, time.Hour*24*90) + + // Execute workflow. + env.ExecuteWorkflow(SleepForDaysWorkflow) +} diff --git a/sleep-for-days/starter/main.go b/sleep-for-days/starter/main.go index 8071b065..bfbead7c 100644 --- a/sleep-for-days/starter/main.go +++ b/sleep-for-days/starter/main.go @@ -19,8 +19,7 @@ func main() { TaskQueue: "sleep-for-days", } - const numDaysSleep = 3 - we, err := c.ExecuteWorkflow(context.Background(), workflowOptions, sleepfordays.SleepForDaysWorkflow, numDaysSleep) + we, err := c.ExecuteWorkflow(context.Background(), workflowOptions, sleepfordays.SleepForDaysWorkflow) if err != nil { log.Fatalln("Unable to execute workflow", err) }