From ef4c174695c9d786e7d95c29bdab094c091712cc Mon Sep 17 00:00:00 2001 From: Yousuf Jawwad Date: Sun, 6 Oct 2024 03:08:14 +0500 Subject: [PATCH] fix(): fixed workflow id for child Deprecated: queues.WorkflowSignal, use queue.Signal --- queues/defs.go | 64 ++++++++++++++++++++++- queues/queues.go | 103 +++++++++++++++++++++++--------------- queues/queues_test.go | 2 +- workflows/options.go | 32 +++--------- workflows/options_test.go | 21 +++----- 5 files changed, 141 insertions(+), 81 deletions(-) diff --git a/queues/defs.go b/queues/defs.go index e23da3c..51de20e 100644 --- a/queues/defs.go +++ b/queues/defs.go @@ -1,7 +1,23 @@ package queues +import ( + "encoding/json" +) + type ( + // WorkflowSignal is a string alias intended for defining groups of workflow signals. + // + // Depraecated: Use Signal instead. WorkflowSignal string + + // Signal is a string alias intended for defining groups of workflow signals, "register" , "send_welcome_email" etc. + // It ensures consistency and code clarity. The Signal type provides methods for conversion and serialization, + // promoting good developer experience. + Signal string + + // Query is a string alias intended for defining groups of workflow queries. We could have created an alias for + // for Signal type, but for some wierd reason, if was causing temporal to panic when marshalling the type to JSON. + Query string ) func (s WorkflowSignal) String() string { @@ -9,10 +25,54 @@ func (s WorkflowSignal) String() string { } func (s WorkflowSignal) MarshalJSON() ([]byte, error) { - return []byte(s.String()), nil + return json.Marshal(string(s)) } func (s *WorkflowSignal) UnmarshalJSON(data []byte) error { - *s = WorkflowSignal(data) + var str string + if err := json.Unmarshal(data, &str); err != nil { + return err + } + + *s = WorkflowSignal(str) + + return nil +} + +func (s Signal) String() string { + return string(s) +} + +func (s Signal) MarshalJSON() ([]byte, error) { + return json.Marshal(string(s)) +} + +func (s *Signal) UnmarshalJSON(data []byte) error { + var str string + if err := json.Unmarshal(data, &str); err != nil { + return err + } + + *s = Signal(str) + + return nil +} + +func (q Query) String() string { + return string(q) +} + +func (q Query) MarshalJSON() ([]byte, error) { + return json.Marshal(string(q)) +} + +func (q *Query) UnmarshalJSON(data []byte) error { + var str string + if err := json.Unmarshal(data, &str); err != nil { + return err + } + + *q = Query(str) + return nil } diff --git a/queues/queues.go b/queues/queues.go index a96d238..5f735b2 100644 --- a/queues/queues.go +++ b/queues/queues.go @@ -30,7 +30,7 @@ import ( "go.temporal.io/sdk/worker" "go.temporal.io/sdk/workflow" - "go.breu.io/durex/workflows" + wrk "go.breu.io/durex/workflows" ) type ( @@ -48,11 +48,11 @@ type ( Prefix() string // WorkflowID sanitzes the workflow ID given the workflows.Options. - WorkflowID(options workflows.Options) string + WorkflowID(opts wrk.Options) string // ExecuteWorkflow executes a workflow given the context, workflows.Options, workflow function or function name, and - // optional payload. - // Lets say, we have a queue called "default", we can either pass in the workflow function or the function name. + // optional payload. Lets say, we have a queue called "default", we can either pass in the workflow function or the + // function name. // // q := queues.New(queues.WithName("default"), queues.WithClient(client)) // q.ExecuteWorkflow( @@ -64,10 +64,11 @@ type ( // WorkflowFn, // or "WorkflowFunctionName" // payload..., // optional. // ) - ExecuteWorkflow(ctx context.Context, options workflows.Options, fn any, payload ...any) (WorkflowRun, error) + ExecuteWorkflow(ctx context.Context, opts wrk.Options, fn any, payload ...any) (WorkflowRun, error) - // ExecuteChildWorkflow executes a child workflow given the parent workflow context, workflows.Options, - // workflow function or function name and optional payload. It must be executed from within a workflow. + // ExecuteChildWorkflow executes a child workflow given the parent workflow context, workflows.Options, workflow + // function or function name and optional payload. It must be executed from within a workflow. + // // future, err := q.ExecuteChildWorkflow( // ctx, // workflows.NewOptions( @@ -78,7 +79,7 @@ type ( // WorkflowFn, // or "WorkflowFunctionName" // payload..., // optional. // ) - ExecuteChildWorkflow(ctx workflow.Context, options workflows.Options, fn any, payload ...any) (ChildWorkflowFuture, error) + ExecuteChildWorkflow(ctx workflow.Context, opts wrk.Options, fn any, payload ...any) (ChildWorkflowFuture, error) // SignalWorkflow signals a workflow given the workflow ID, signal name and optional payload. // @@ -94,7 +95,7 @@ type ( // ); err != nil { // // handle error // } - SignalWorkflow(ctx context.Context, options workflows.Options, signal WorkflowSignal, payload any) error + SignalWorkflow(ctx context.Context, opts wrk.Options, signal Signal, payload any) error // SignalWithStartWorkflow signals a workflow given the workflow ID, signal name and optional payload. // @@ -110,9 +111,7 @@ type ( // WorkflowFn, // or "WorkflowFunctionName" // payload..., // optional. // ) - SignalWithStartWorkflow( - ctx context.Context, options workflows.Options, signal WorkflowSignal, args any, fn any, payload ...any, - ) (WorkflowRun, error) + SignalWithStartWorkflow(ctx context.Context, opts wrk.Options, signal Signal, args any, fn any, payload ...any) (WorkflowRun, error) // SignalExternalWorkflow signals a workflow given the workflow ID, signal name and optional payload. // @@ -126,15 +125,41 @@ type ( // "signal-name", // payload, // or nil // ) - SignalExternalWorkflow(ctx workflow.Context, options workflows.Options, signal WorkflowSignal, args any) (WorkflowFuture, error) + SignalExternalWorkflow(ctx workflow.Context, opts wrk.Options, signal Signal, args any) (WorkflowFuture, error) - // QueryWorkflow queries a workflow given the workflow ID, query name and optional payload. - QueryWorkflow(ctx context.Context, options workflows.Options, query WorkflowSignal, args ...any) (converter.EncodedValue, error) + // QueryWorkflow queries a workflow given the workflow opts, query name and optional arguments. + // + // result, err := q.QueryWorkflow( + // ctx, + // workflows.NewOptions( + // workflows.WithWorkflowID("my-workflow-id"), + // ), + // Signal("query-name"), + // arg1, arg2, // Optional arguments passed to the query function. + // ) + // + // if err != nil { + // // handle error + // } + // // Decode the result. + QueryWorkflow(ctx context.Context, opts wrk.Options, query Query, args ...any) (converter.EncodedValue, error) - // CreateWorker creates a worker against the queue. + // CreateWorker configures the worker for the queue. + // + // This function configures the worker responsible for executing registered workflows and activities. It uses a + // builder pattern, with helper functions prefixed by queues.WithWorkerOption{Option}, where {Option} corresponds to + // a field in Temporal's worker.Option. This allows configuring worker behavior at runtime, such as setting maximum + // concurrent tasks, enabling sessions etc. The worker is a singleton, meaning only one worker can be created per + // queue. Call this function *before* registering workflows and activities (queues.RegisterWorkflow and + // queues.RegisterActivity) to ensure correct association. + // + // q := queues.New(queues.WithName("my-queue"), queues.WithClient(client)) + // q.CreateWorker( + // queues.WithWorkerOptionEnableSessionWorker(true), // Enable session worker. + // ) CreateWorker(opts ...WorkerOption) - // Start starts the worker against the queue. + // Start starts the worker against the queue. CreateWorker must be called before calling this function. Start() error // Shutdown shuts down the worker against the queue. @@ -182,18 +207,18 @@ func (q *queue) Prefix() string { return q.prefix } -func (q *queue) WorkflowID(options workflows.Options) string { +func (q *queue) WorkflowID(opts wrk.Options) string { prefix := "" - if options.IsChild() { - prefix, _ = options.ParentWorkflowID() + if opts.IsChild() { + prefix = opts.ParentWorkflowID() } else { prefix = q.Prefix() } - return fmt.Sprintf("%s.%s", prefix, options.IDSuffix()) + return fmt.Sprintf("%s.%s", prefix, opts.IDSuffix()) } -func (q *queue) ExecuteWorkflow(ctx context.Context, opts workflows.Options, fn any, payload ...any) (WorkflowRun, error) { +func (q *queue) ExecuteWorkflow(ctx context.Context, opts wrk.Options, fn any, payload ...any) (WorkflowRun, error) { if opts.IsChild() { return nil, ErrChildWorkflowExecutionAttempt } @@ -214,9 +239,9 @@ func (q *queue) ExecuteWorkflow(ctx context.Context, opts workflows.Options, fn ) } -func (q *queue) ExecuteChildWorkflow(ctx workflow.Context, opts workflows.Options, fn any, payload ...any) (ChildWorkflowFuture, error) { +func (q *queue) ExecuteChildWorkflow(ctx workflow.Context, opts wrk.Options, fn any, payload ...any) (ChildWorkflowFuture, error) { if !opts.IsChild() { - return nil, workflows.ErrParentNil + return nil, wrk.ErrParentNil } copts := workflow.ChildWorkflowOptions{ @@ -230,7 +255,7 @@ func (q *queue) ExecuteChildWorkflow(ctx workflow.Context, opts workflows.Option } // SignalWorkflow signals a workflow given the workflow ID, signal name and optional payload. -func (q *queue) SignalWorkflow(ctx context.Context, opts workflows.Options, signal WorkflowSignal, args any) error { +func (q *queue) SignalWorkflow(ctx context.Context, opts wrk.Options, signal Signal, args any) error { if q.client == nil { return ErrClientNil } @@ -243,14 +268,14 @@ func (q *queue) SignalWorkflow(ctx context.Context, opts workflows.Options, sign } func (q *queue) SignalWithStartWorkflow( - ctx context.Context, opts workflows.Options, signal WorkflowSignal, args any, fn any, payload ...any, + ctx context.Context, opts wrk.Options, signal Signal, args any, fn any, payload ...any, ) (WorkflowRun, error) { if q.client == nil { return nil, ErrClientNil } if opts.IsChild() { - return nil, workflows.ErrParentNil + return nil, wrk.ErrParentNil } return q.client.SignalWithStartWorkflow( @@ -268,30 +293,28 @@ func (q *queue) SignalWithStartWorkflow( ) } -func (q *queue) SignalExternalWorkflow( - ctx workflow.Context, opts workflows.Options, signal WorkflowSignal, args any, -) (WorkflowFuture, error) { +func (q *queue) SignalExternalWorkflow(ctx workflow.Context, opts wrk.Options, signal Signal, args any) (WorkflowFuture, error) { if !opts.IsChild() { - return nil, workflows.ErrParentNil + return nil, wrk.ErrParentNil } return workflow.SignalExternalWorkflow(ctx, q.WorkflowID(opts), "", signal.String(), args), nil } func (q *queue) QueryWorkflow( - ctx context.Context, options workflows.Options, query WorkflowSignal, args ...any, + ctx context.Context, opts wrk.Options, query Query, args ...any, ) (converter.EncodedValue, error) { if q.client == nil { return nil, ErrClientNil } - return q.client.QueryWorkflow(ctx, q.WorkflowID(options), "", query.String(), args...) + return q.client.QueryWorkflow(ctx, q.WorkflowID(opts), "", query.String(), args...) } -func (q *queue) RetryPolicy(opts workflows.Options) *temporal.RetryPolicy { +func (q *queue) RetryPolicy(opts wrk.Options) *temporal.RetryPolicy { attempts := opts.MaxAttempts() - if attempts < workflows.RetryForever && - q.workflowMaxAttempts < workflows.RetryForever && + if attempts < wrk.RetryForever && + q.workflowMaxAttempts < wrk.RetryForever && q.workflowMaxAttempts > attempts { attempts = q.workflowMaxAttempts } @@ -301,9 +324,9 @@ func (q *queue) RetryPolicy(opts workflows.Options) *temporal.RetryPolicy { func (q *queue) CreateWorker(opts ...WorkerOption) { q.once.Do(func() { - options := NewWorkerOptions(opts...) + opts := NewWorkerOptions(opts...) - q.worker = worker.New(q.client, q.Name().String(), options) + q.worker = worker.New(q.client, q.Name().String(), opts) }) } @@ -356,7 +379,7 @@ func WithClient(c client.Client) QueueOption { } } -// New creates a new queue with the given options. +// New creates a new queue with the given opts. // For a queue named "default", we will defined it as follows: // // var DefaultQueue = queue.New( @@ -365,7 +388,7 @@ func WithClient(c client.Client) QueueOption { // queue.WithMaxWorkflowAttempts(1), // ) func New(opts ...QueueOption) Queue { - q := &queue{workflowMaxAttempts: workflows.RetryForever} + q := &queue{workflowMaxAttempts: wrk.RetryForever} for _, opt := range opts { opt(q) } diff --git a/queues/queues_test.go b/queues/queues_test.go index 4428af2..1779e84 100644 --- a/queues/queues_test.go +++ b/queues/queues_test.go @@ -128,7 +128,7 @@ func (s *QueueTestSuite) TestExecuteChildWorkflow() { func (s *QueueTestSuite) TestSignalWorkflow() { ctx := context.Background() id := uuid.New() - name := queues.WorkflowSignal("signal") + name := queues.Signal("signal") opts, _ := workflows.NewOptions( workflows.WithBlock("signal"), workflows.WithBlockID(id.String()), diff --git a/workflows/options.go b/workflows/options.go index 7a7f70f..b8917f9 100644 --- a/workflows/options.go +++ b/workflows/options.go @@ -13,11 +13,11 @@ const ( type ( // Options defines the interface for creating workflow options. Options interface { - IsChild() bool // IsChild returns true if the workflow id is a child workflow id. - ParentWorkflowID() (string, error) // ParentWorkflowID returns the parent workflow id. - IDSuffix() string // IDSuffix santizes the suffix of the workflow id and then formats it as a string. - MaxAttempts() int32 // MaxAttempts returns the max attempts for the workflow. - IgnoredErrors() []string // IgnoredErrors returns the list of errors that are ok to ignore. + IsChild() bool // IsChild returns true if the workflow id is a child workflow id. + ParentWorkflowID() string // ParentWorkflowID returns the parent workflow id. + IDSuffix() string // IDSuffix santizes the suffix of the workflow id and then formats it as a string. + MaxAttempts() int32 // MaxAttempts returns the max attempts for the workflow. + IgnoredErrors() []string // IgnoredErrors returns the list of errors that are ok to ignore. } // Option sets the specified options. @@ -27,8 +27,6 @@ type ( props map[string]string options struct { - parent_context workflow.Context // The parent workflow context. - ParentID string `json:"parent"` // The parent workflow ID. Block string `json:"block"` // The block name. BlockID string `json:"block_id"` // The block identifier. @@ -48,14 +46,9 @@ func (w *options) IsChild() bool { return w.ParentID != "" } -func (w *options) ID() string { - id := w.IDSuffix() - - if w.IsChild() { - return w.ParentID + "." + id - } - - return id +// ParentWorkflowID returns the parent workflow id. +func (w *options) ParentWorkflowID() string { + return w.ParentID } // IDSuffix sanitizes the suffix and returns it. @@ -77,15 +70,6 @@ func (w *options) IDSuffix() string { return strings.Join(sanitized, ".") } -// ParentWorkflowID returns the parent workflow id. -func (w *options) ParentWorkflowID() (string, error) { - if w.parent_context == nil { - return "", ErrParentNil - } - - return workflow.GetInfo(w.parent_context).WorkflowExecution.ID, nil -} - // MaxAttempts returns the max attempts for the workflow. func (w *options) MaxAttempts() int32 { return w.MaximumAttempt diff --git a/workflows/options_test.go b/workflows/options_test.go index f09b65c..494a429 100644 --- a/workflows/options_test.go +++ b/workflows/options_test.go @@ -13,12 +13,11 @@ func TestWorkflowMod(t *testing.T) { workflows.WithMod("mod"), ) - parent, err := workflow.ParentWorkflowID() + parent := workflow.ParentWorkflowID() suffix := workflow.IDSuffix() assert.Equal(t, "mod", suffix) assert.Equal(t, "", parent) - assert.ErrorIs(t, err, workflows.ErrParentNil) } func TestWorkflowModWithID(t *testing.T) { @@ -27,12 +26,11 @@ func TestWorkflowModWithID(t *testing.T) { workflows.WithModID("modid"), ) - parent, err := workflow.ParentWorkflowID() + parent := workflow.ParentWorkflowID() suffix := workflow.IDSuffix() assert.Equal(t, "mod.modid", suffix) assert.Equal(t, "", parent) - assert.ErrorIs(t, err, workflows.ErrParentNil) } func TestWorkflowElement(t *testing.T) { @@ -40,12 +38,11 @@ func TestWorkflowElement(t *testing.T) { workflows.WithElement("elm"), ) - parent, err := workflow.ParentWorkflowID() + parent := workflow.ParentWorkflowID() suffix := workflow.IDSuffix() assert.Equal(t, "elm", suffix) assert.Equal(t, "", parent) - assert.ErrorIs(t, err, workflows.ErrParentNil) } func TestWorkflowElementWithID(t *testing.T) { @@ -54,12 +51,11 @@ func TestWorkflowElementWithID(t *testing.T) { workflows.WithElementID("elmid"), ) - parent, err := workflow.ParentWorkflowID() + parent := workflow.ParentWorkflowID() suffix := workflow.IDSuffix() assert.Equal(t, "elm.elmid", suffix) assert.Equal(t, "", parent) - assert.ErrorIs(t, err, workflows.ErrParentNil) } func TestWorkflowBlock(t *testing.T) { @@ -67,12 +63,11 @@ func TestWorkflowBlock(t *testing.T) { workflows.WithBlock("block"), ) - parent, err := workflow.ParentWorkflowID() + parent := workflow.ParentWorkflowID() suffix := workflow.IDSuffix() assert.Equal(t, "block", suffix) assert.Equal(t, "", parent) - assert.ErrorIs(t, err, workflows.ErrParentNil) } func TestWorkflowBlockWithID(t *testing.T) { @@ -81,12 +76,11 @@ func TestWorkflowBlockWithID(t *testing.T) { workflows.WithBlockID("blockid"), ) - parent, err := workflow.ParentWorkflowID() + parent := workflow.ParentWorkflowID() suffix := workflow.IDSuffix() assert.Equal(t, "block.blockid", suffix) assert.Equal(t, "", parent) - assert.ErrorIs(t, err, workflows.ErrParentNil) } func TestWithProp(t *testing.T) { @@ -100,12 +94,11 @@ func TestWithProp(t *testing.T) { workflows.WithProp("prop", "value"), ) - parent, err := workflow.ParentWorkflowID() + parent := workflow.ParentWorkflowID() suffix := workflow.IDSuffix() assert.Equal(t, "block.blockid.elm.elmid.mod.modid.prop.value", suffix) assert.Equal(t, "", parent) - assert.ErrorIs(t, err, workflows.ErrParentNil) } func TestWithPropMultiple(t *testing.T) { workflow, _ := workflows.NewOptions(