From bd57b08bb38914e2b5fa6c06f0b9570a1d1b7919 Mon Sep 17 00:00:00 2001 From: Yousuf Jawwad Date: Fri, 13 Sep 2024 03:25:17 +0500 Subject: [PATCH] chore(dispatch): activity context abstraction --- dispatch/defaults.go | 67 ++++++++++++++++++++++++++++++++++++++++++++ queues/queues.go | 6 ++++ 2 files changed, 73 insertions(+) create mode 100644 dispatch/defaults.go diff --git a/dispatch/defaults.go b/dispatch/defaults.go new file mode 100644 index 0000000..466508f --- /dev/null +++ b/dispatch/defaults.go @@ -0,0 +1,67 @@ +package dispatch + +import ( + "time" + + "go.temporal.io/sdk/temporal" + "go.temporal.io/sdk/workflow" + + "go.breu.io/durex/queues" +) + +// WithDefaultActivityContext returns a workflow.Context with the default activity options applied. +// The default options include a StartToCloseTimeout of 60 seconds. +// +// Example: +// +// ctx = shared.WithDefaultActivityContext(ctx) +func WithDefaultActivityContext(ctx workflow.Context) workflow.Context { + return workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 60 * time.Second, + }) +} + +// WithIgnoredErrorsContext returns a workflow.Context with activity options configured with a +// StartToCloseTimeout of 60 seconds and a RetryPolicy that allows a single attempt and ignores +// specified error types. +// +// Example: +// +// ignored := []string{"CustomErrorType"} +// ctx = shared.WithIgnoredErrorsContext(ctx, ignored...) +func WithIgnoredErrorsContext(ctx workflow.Context, args ...string) workflow.Context { + return workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 60 * time.Second, + RetryPolicy: &temporal.RetryPolicy{ + MaximumAttempts: 1, + NonRetryableErrorTypes: args, + }, + }) +} + +// WithMarathonContext returns a workflow.Context with activity options configured for long-running activities. +// It sets the StartToCloseTimeout to 60 minutes and the HeartbeatTimeout to 30 seconds. +// +// Example: +// +// ctx = shared.WithMarathonContext(ctx) +func WithMarathonContext(ctx workflow.Context) workflow.Context { + return workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 60 * time.Minute, + HeartbeatTimeout: 30 * time.Second, + }) +} + +// WithCustomQueueContext returns a workflow.Context with activity options configured with a +// StartToCloseTimeout of 60 seconds and a dedicated task queue. This allows scheduling activities +// on a different queue than the one the workflow is running on. +// +// Example: +// +// ctx = shared.WithCustomQueueContext(ctx, queues.MyTaskQueue) +func WithCustomQueueContext(ctx workflow.Context, q queues.Queue) workflow.Context { + return workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 60 * time.Second, + TaskQueue: q.String(), + }) +} diff --git a/queues/queues.go b/queues/queues.go index 4f66385..0f7180e 100644 --- a/queues/queues.go +++ b/queues/queues.go @@ -40,6 +40,8 @@ type ( // Name gets the name of the queue as string. Name() Name + String() string + // Prefix gets the prefix of the queue as string. Prefix() string @@ -148,6 +150,10 @@ func (q Name) String() string { return string(q) } +func (q *queue) String() string { + return q.name.String() +} + func (q *queue) Name() Name { return q.name }