Skip to content

Commit

Permalink
fix(): fixed workflow id for child (#14)
Browse files Browse the repository at this point in the history
Deprecated: queues.WorkflowSignal, use queue.Signal
  • Loading branch information
debuggerpk authored Oct 5, 2024
1 parent b5ce101 commit 4e859cc
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 81 deletions.
64 changes: 62 additions & 2 deletions queues/defs.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,78 @@
package queues

import (
"encoding/json"
)

type (
// WorkflowSignal is a string alias intended for defining groups of workflow signals.
//
// Depraecated: Use Signal instead.
WorkflowSignal string

// Signal is a string alias intended for defining groups of workflow signals, "register" , "send_welcome_email" etc.
// It ensures consistency and code clarity. The Signal type provides methods for conversion and serialization,
// promoting good developer experience.
Signal string

// Query is a string alias intended for defining groups of workflow queries. We could have created an alias for
// for Signal type, but for some wierd reason, if was causing temporal to panic when marshalling the type to JSON.
Query string
)

func (s WorkflowSignal) String() string {
return string(s)
}

func (s WorkflowSignal) MarshalJSON() ([]byte, error) {
return []byte(s.String()), nil
return json.Marshal(string(s))
}

func (s *WorkflowSignal) UnmarshalJSON(data []byte) error {
*s = WorkflowSignal(data)
var str string
if err := json.Unmarshal(data, &str); err != nil {
return err
}

*s = WorkflowSignal(str)

return nil
}

func (s Signal) String() string {
return string(s)
}

func (s Signal) MarshalJSON() ([]byte, error) {
return json.Marshal(string(s))
}

func (s *Signal) UnmarshalJSON(data []byte) error {
var str string
if err := json.Unmarshal(data, &str); err != nil {
return err
}

*s = Signal(str)

return nil
}

func (q Query) String() string {
return string(q)
}

func (q Query) MarshalJSON() ([]byte, error) {
return json.Marshal(string(q))
}

func (q *Query) UnmarshalJSON(data []byte) error {
var str string
if err := json.Unmarshal(data, &str); err != nil {
return err
}

*q = Query(str)

return nil
}
103 changes: 63 additions & 40 deletions queues/queues.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
"go.temporal.io/sdk/worker"
"go.temporal.io/sdk/workflow"

"go.breu.io/durex/workflows"
wrk "go.breu.io/durex/workflows"
)

type (
Expand All @@ -48,11 +48,11 @@ type (
Prefix() string

// WorkflowID sanitzes the workflow ID given the workflows.Options.
WorkflowID(options workflows.Options) string
WorkflowID(opts wrk.Options) string

// ExecuteWorkflow executes a workflow given the context, workflows.Options, workflow function or function name, and
// optional payload.
// Lets say, we have a queue called "default", we can either pass in the workflow function or the function name.
// optional payload. Lets say, we have a queue called "default", we can either pass in the workflow function or the
// function name.
//
// q := queues.New(queues.WithName("default"), queues.WithClient(client))
// q.ExecuteWorkflow(
Expand All @@ -64,10 +64,11 @@ type (
// WorkflowFn, // or "WorkflowFunctionName"
// payload..., // optional.
// )
ExecuteWorkflow(ctx context.Context, options workflows.Options, fn any, payload ...any) (WorkflowRun, error)
ExecuteWorkflow(ctx context.Context, opts wrk.Options, fn any, payload ...any) (WorkflowRun, error)

// ExecuteChildWorkflow executes a child workflow given the parent workflow context, workflows.Options,
// workflow function or function name and optional payload. It must be executed from within a workflow.
// ExecuteChildWorkflow executes a child workflow given the parent workflow context, workflows.Options, workflow
// function or function name and optional payload. It must be executed from within a workflow.
//
// future, err := q.ExecuteChildWorkflow(
// ctx,
// workflows.NewOptions(
Expand All @@ -78,7 +79,7 @@ type (
// WorkflowFn, // or "WorkflowFunctionName"
// payload..., // optional.
// )
ExecuteChildWorkflow(ctx workflow.Context, options workflows.Options, fn any, payload ...any) (ChildWorkflowFuture, error)
ExecuteChildWorkflow(ctx workflow.Context, opts wrk.Options, fn any, payload ...any) (ChildWorkflowFuture, error)

// SignalWorkflow signals a workflow given the workflow ID, signal name and optional payload.
//
Expand All @@ -94,7 +95,7 @@ type (
// ); err != nil {
// // handle error
// }
SignalWorkflow(ctx context.Context, options workflows.Options, signal WorkflowSignal, payload any) error
SignalWorkflow(ctx context.Context, opts wrk.Options, signal Signal, payload any) error

// SignalWithStartWorkflow signals a workflow given the workflow ID, signal name and optional payload.
//
Expand All @@ -110,9 +111,7 @@ type (
// WorkflowFn, // or "WorkflowFunctionName"
// payload..., // optional.
// )
SignalWithStartWorkflow(
ctx context.Context, options workflows.Options, signal WorkflowSignal, args any, fn any, payload ...any,
) (WorkflowRun, error)
SignalWithStartWorkflow(ctx context.Context, opts wrk.Options, signal Signal, args any, fn any, payload ...any) (WorkflowRun, error)

// SignalExternalWorkflow signals a workflow given the workflow ID, signal name and optional payload.
//
Expand All @@ -126,15 +125,41 @@ type (
// "signal-name",
// payload, // or nil
// )
SignalExternalWorkflow(ctx workflow.Context, options workflows.Options, signal WorkflowSignal, args any) (WorkflowFuture, error)
SignalExternalWorkflow(ctx workflow.Context, opts wrk.Options, signal Signal, args any) (WorkflowFuture, error)

// QueryWorkflow queries a workflow given the workflow ID, query name and optional payload.
QueryWorkflow(ctx context.Context, options workflows.Options, query WorkflowSignal, args ...any) (converter.EncodedValue, error)
// QueryWorkflow queries a workflow given the workflow opts, query name and optional arguments.
//
// result, err := q.QueryWorkflow(
// ctx,
// workflows.NewOptions(
// workflows.WithWorkflowID("my-workflow-id"),
// ),
// Signal("query-name"),
// arg1, arg2, // Optional arguments passed to the query function.
// )
//
// if err != nil {
// // handle error
// }
// // Decode the result.
QueryWorkflow(ctx context.Context, opts wrk.Options, query Query, args ...any) (converter.EncodedValue, error)

// CreateWorker creates a worker against the queue.
// CreateWorker configures the worker for the queue.
//
// This function configures the worker responsible for executing registered workflows and activities. It uses a
// builder pattern, with helper functions prefixed by queues.WithWorkerOption{Option}, where {Option} corresponds to
// a field in Temporal's worker.Option. This allows configuring worker behavior at runtime, such as setting maximum
// concurrent tasks, enabling sessions etc. The worker is a singleton, meaning only one worker can be created per
// queue. Call this function *before* registering workflows and activities (queues.RegisterWorkflow and
// queues.RegisterActivity) to ensure correct association.
//
// q := queues.New(queues.WithName("my-queue"), queues.WithClient(client))
// q.CreateWorker(
// queues.WithWorkerOptionEnableSessionWorker(true), // Enable session worker.
// )
CreateWorker(opts ...WorkerOption)

// Start starts the worker against the queue.
// Start starts the worker against the queue. CreateWorker must be called before calling this function.
Start() error

// Shutdown shuts down the worker against the queue.
Expand Down Expand Up @@ -182,18 +207,18 @@ func (q *queue) Prefix() string {
return q.prefix
}

func (q *queue) WorkflowID(options workflows.Options) string {
func (q *queue) WorkflowID(opts wrk.Options) string {
prefix := ""
if options.IsChild() {
prefix, _ = options.ParentWorkflowID()
if opts.IsChild() {
prefix = opts.ParentWorkflowID()
} else {
prefix = q.Prefix()
}

return fmt.Sprintf("%s.%s", prefix, options.IDSuffix())
return fmt.Sprintf("%s.%s", prefix, opts.IDSuffix())
}

func (q *queue) ExecuteWorkflow(ctx context.Context, opts workflows.Options, fn any, payload ...any) (WorkflowRun, error) {
func (q *queue) ExecuteWorkflow(ctx context.Context, opts wrk.Options, fn any, payload ...any) (WorkflowRun, error) {
if opts.IsChild() {
return nil, ErrChildWorkflowExecutionAttempt
}
Expand All @@ -214,9 +239,9 @@ func (q *queue) ExecuteWorkflow(ctx context.Context, opts workflows.Options, fn
)
}

func (q *queue) ExecuteChildWorkflow(ctx workflow.Context, opts workflows.Options, fn any, payload ...any) (ChildWorkflowFuture, error) {
func (q *queue) ExecuteChildWorkflow(ctx workflow.Context, opts wrk.Options, fn any, payload ...any) (ChildWorkflowFuture, error) {
if !opts.IsChild() {
return nil, workflows.ErrParentNil
return nil, wrk.ErrParentNil
}

copts := workflow.ChildWorkflowOptions{
Expand All @@ -230,7 +255,7 @@ func (q *queue) ExecuteChildWorkflow(ctx workflow.Context, opts workflows.Option
}

// SignalWorkflow signals a workflow given the workflow ID, signal name and optional payload.
func (q *queue) SignalWorkflow(ctx context.Context, opts workflows.Options, signal WorkflowSignal, args any) error {
func (q *queue) SignalWorkflow(ctx context.Context, opts wrk.Options, signal Signal, args any) error {
if q.client == nil {
return ErrClientNil
}
Expand All @@ -243,14 +268,14 @@ func (q *queue) SignalWorkflow(ctx context.Context, opts workflows.Options, sign
}

func (q *queue) SignalWithStartWorkflow(
ctx context.Context, opts workflows.Options, signal WorkflowSignal, args any, fn any, payload ...any,
ctx context.Context, opts wrk.Options, signal Signal, args any, fn any, payload ...any,
) (WorkflowRun, error) {
if q.client == nil {
return nil, ErrClientNil
}

if opts.IsChild() {
return nil, workflows.ErrParentNil
return nil, wrk.ErrParentNil
}

return q.client.SignalWithStartWorkflow(
Expand All @@ -268,30 +293,28 @@ func (q *queue) SignalWithStartWorkflow(
)
}

func (q *queue) SignalExternalWorkflow(
ctx workflow.Context, opts workflows.Options, signal WorkflowSignal, args any,
) (WorkflowFuture, error) {
func (q *queue) SignalExternalWorkflow(ctx workflow.Context, opts wrk.Options, signal Signal, args any) (WorkflowFuture, error) {
if !opts.IsChild() {
return nil, workflows.ErrParentNil
return nil, wrk.ErrParentNil
}

return workflow.SignalExternalWorkflow(ctx, q.WorkflowID(opts), "", signal.String(), args), nil
}

func (q *queue) QueryWorkflow(
ctx context.Context, options workflows.Options, query WorkflowSignal, args ...any,
ctx context.Context, opts wrk.Options, query Query, args ...any,
) (converter.EncodedValue, error) {
if q.client == nil {
return nil, ErrClientNil
}

return q.client.QueryWorkflow(ctx, q.WorkflowID(options), "", query.String(), args...)
return q.client.QueryWorkflow(ctx, q.WorkflowID(opts), "", query.String(), args...)
}

func (q *queue) RetryPolicy(opts workflows.Options) *temporal.RetryPolicy {
func (q *queue) RetryPolicy(opts wrk.Options) *temporal.RetryPolicy {
attempts := opts.MaxAttempts()
if attempts < workflows.RetryForever &&
q.workflowMaxAttempts < workflows.RetryForever &&
if attempts < wrk.RetryForever &&
q.workflowMaxAttempts < wrk.RetryForever &&
q.workflowMaxAttempts > attempts {
attempts = q.workflowMaxAttempts
}
Expand All @@ -301,9 +324,9 @@ func (q *queue) RetryPolicy(opts workflows.Options) *temporal.RetryPolicy {

func (q *queue) CreateWorker(opts ...WorkerOption) {
q.once.Do(func() {
options := NewWorkerOptions(opts...)
opts := NewWorkerOptions(opts...)

q.worker = worker.New(q.client, q.Name().String(), options)
q.worker = worker.New(q.client, q.Name().String(), opts)
})
}

Expand Down Expand Up @@ -356,7 +379,7 @@ func WithClient(c client.Client) QueueOption {
}
}

// New creates a new queue with the given options.
// New creates a new queue with the given opts.
// For a queue named "default", we will defined it as follows:
//
// var DefaultQueue = queue.New(
Expand All @@ -365,7 +388,7 @@ func WithClient(c client.Client) QueueOption {
// queue.WithMaxWorkflowAttempts(1),
// )
func New(opts ...QueueOption) Queue {
q := &queue{workflowMaxAttempts: workflows.RetryForever}
q := &queue{workflowMaxAttempts: wrk.RetryForever}
for _, opt := range opts {
opt(q)
}
Expand Down
2 changes: 1 addition & 1 deletion queues/queues_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (s *QueueTestSuite) TestExecuteChildWorkflow() {
func (s *QueueTestSuite) TestSignalWorkflow() {
ctx := context.Background()
id := uuid.New()
name := queues.WorkflowSignal("signal")
name := queues.Signal("signal")
opts, _ := workflows.NewOptions(
workflows.WithBlock("signal"),
workflows.WithBlockID(id.String()),
Expand Down
32 changes: 8 additions & 24 deletions workflows/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ const (
type (
// Options defines the interface for creating workflow options.
Options interface {
IsChild() bool // IsChild returns true if the workflow id is a child workflow id.
ParentWorkflowID() (string, error) // ParentWorkflowID returns the parent workflow id.
IDSuffix() string // IDSuffix santizes the suffix of the workflow id and then formats it as a string.
MaxAttempts() int32 // MaxAttempts returns the max attempts for the workflow.
IgnoredErrors() []string // IgnoredErrors returns the list of errors that are ok to ignore.
IsChild() bool // IsChild returns true if the workflow id is a child workflow id.
ParentWorkflowID() string // ParentWorkflowID returns the parent workflow id.
IDSuffix() string // IDSuffix santizes the suffix of the workflow id and then formats it as a string.
MaxAttempts() int32 // MaxAttempts returns the max attempts for the workflow.
IgnoredErrors() []string // IgnoredErrors returns the list of errors that are ok to ignore.
}

// Option sets the specified options.
Expand All @@ -27,8 +27,6 @@ type (
props map[string]string

options struct {
parent_context workflow.Context // The parent workflow context.

ParentID string `json:"parent"` // The parent workflow ID.
Block string `json:"block"` // The block name.
BlockID string `json:"block_id"` // The block identifier.
Expand All @@ -48,14 +46,9 @@ func (w *options) IsChild() bool {
return w.ParentID != ""
}

func (w *options) ID() string {
id := w.IDSuffix()

if w.IsChild() {
return w.ParentID + "." + id
}

return id
// ParentWorkflowID returns the parent workflow id.
func (w *options) ParentWorkflowID() string {
return w.ParentID
}

// IDSuffix sanitizes the suffix and returns it.
Expand All @@ -77,15 +70,6 @@ func (w *options) IDSuffix() string {
return strings.Join(sanitized, ".")
}

// ParentWorkflowID returns the parent workflow id.
func (w *options) ParentWorkflowID() (string, error) {
if w.parent_context == nil {
return "", ErrParentNil
}

return workflow.GetInfo(w.parent_context).WorkflowExecution.ID, nil
}

// MaxAttempts returns the max attempts for the workflow.
func (w *options) MaxAttempts() int32 {
return w.MaximumAttempt
Expand Down
Loading

0 comments on commit 4e859cc

Please sign in to comment.