Skip to content

Commit

Permalink
chore(): updated structures and supporting structures
Browse files Browse the repository at this point in the history
  • Loading branch information
debuggerpk committed Oct 4, 2024
1 parent bd57b08 commit e8c892c
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 88 deletions.
1 change: 1 addition & 0 deletions dispatch/defaults.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// Package dispatch provides helper functions for configuring workflow.Context objects with default activity options.
package dispatch

import (
Expand Down
2 changes: 2 additions & 0 deletions queues/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ var (
// ErrClientNil is returned when the temporal client is nil.
ErrClientNil = errors.New("client is nil")

ErrWorkerNil = errors.New("worker is nil")

// ErrChildWorkflowExecutionAttempt is returned when attempting to execute a child workflow without the parent.
ErrChildWorkflowExecutionAttempt = errors.New("attempting to execute child workflow directly. use ExecuteWorkflow instead")

Expand Down
40 changes: 32 additions & 8 deletions queues/queues.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package queues
import (
"context"
"fmt"
"sync"

"go.temporal.io/sdk/client"
"go.temporal.io/sdk/temporal"
Expand Down Expand Up @@ -127,7 +128,7 @@ type (
SignalExternalWorkflow(ctx workflow.Context, options workflows.Options, signal WorkflowSignal, args any) (WorkflowFuture, error)

// CreateWorker creates a worker against the queue.
CreateWorker(opts ...WorkerOption) worker.Worker
CreateWorker(opts ...WorkerOption)
}

// QueueOption is the option for a queue.
Expand All @@ -143,6 +144,9 @@ type (
workflowMaxAttempts int32 // The maximum number of attempts for a workflow.

client client.Client // The temporal client.

once sync.Once // The sync.Once for the queue.
worker worker.Worker // The temporal worker.
}
)

Expand All @@ -163,14 +167,14 @@ func (q *queue) Prefix() string {
}

func (q *queue) WorkflowID(options workflows.Options) string {
pfix := ""
prefix := ""
if options.IsChild() {
pfix, _ = options.ParentWorkflowID()
prefix, _ = options.ParentWorkflowID()
} else {
pfix = q.Prefix()
prefix = q.Prefix()
}

return fmt.Sprintf("%s.%s", pfix, options.IDSuffix())
return fmt.Sprintf("%s.%s", prefix, options.IDSuffix())
}

func (q *queue) ExecuteWorkflow(ctx context.Context, opts workflows.Options, fn any, payload ...any) (WorkflowRun, error) {
Expand Down Expand Up @@ -269,10 +273,30 @@ func (q *queue) RetryPolicy(opts workflows.Options) *temporal.RetryPolicy {
return &temporal.RetryPolicy{MaximumAttempts: attempts, NonRetryableErrorTypes: opts.IgnoredErrors()}
}

func (q *queue) CreateWorker(opts ...WorkerOption) worker.Worker {
options := NewWorkerOptions(opts...)
func (q *queue) CreateWorker(opts ...WorkerOption) {
q.once.Do(func() {
options := NewWorkerOptions(opts...)

q.worker = worker.New(q.client, q.Name().String(), options)
})
}

func (q *queue) Start(ctx context.Context) error {
if q.worker == nil {
return ErrWorkerNil
}

return q.worker.Start()
}

func (q *queue) Stop(ctx context.Context) error {
if q.worker == nil {
return ErrWorkerNil
}

q.worker.Stop()

return worker.New(q.client, q.Name().String(), options)
return nil
}

// WithName sets the queue name and the prefix for the workflow ID.
Expand Down
141 changes: 66 additions & 75 deletions workflows/options.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,3 @@
// Copyright (c) 2023 Breu Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy of
// this software and associated documentation files (the "Software"), to deal in
// the Software without restriction, including without limitation the rights to
// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software is furnished to do so,
// subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

package workflows

import (
Expand Down Expand Up @@ -45,33 +26,43 @@ type (
// props defines the interface for creating id properties.
props map[string]string

// optionProvider defines the interface for creating workflow options.
options struct {
parent workflow.Context

block string
blockID string
elm string
elmID string
mod string
modID string
props props // props is a map of id properties.
propOrder []string // propOrder is the order in which the properties are added.

maxattempts int32
ignorederrors []string // ingorederrors is a list of errors that are ok to ignore.
parent_context workflow.Context // The parent workflow context.

ParentID string `json:"parent"` // The parent workflow ID.
Block string `json:"block"` // The block name.
BlockID string `json:"block_id"` // The block identifier.
Element string `json:"element"` // The element name.
ElementID string `json:"element_id"` // The element identifier.
Mod string `json:"mod"` // The modifier name.
ModID string `json:"mod_id"` // The modifier identifier.
Props props `json:"props"` // The properties associated with the workflow.
Order []string `json:"order"` // The order of execution for the workflow elements.
MaximumAttempt int32 `json:"maximum_attempt"` // The maximum number of attempts for the workflow.
IgnoreErrors []string `json:"ignore_errors"` // A list of errors to ignore during the workflow execution.
}
)

// IsChild returns true if the workflow id is a child workflow id.
func (w *options) IsChild() bool {
return w.parent != nil
return w.ParentID != ""
}

func (w *options) ID() string {
id := w.IDSuffix()

if w.IsChild() {
return w.ParentID + "." + id
}

return id
}

// IDSuffix sanitizes the suffix and returns it.
func (w *options) IDSuffix() string {
parts := []string{w.block, w.blockID, w.elm, w.elmID, w.mod, w.modID}
for _, key := range w.propOrder {
parts = append(parts, key, w.props[key])
parts := []string{w.Block, w.BlockID, w.Element, w.ElementID, w.Mod, w.ModID}
for _, key := range w.Order {
parts = append(parts, key, w.Props[key])
}

sanitized := make([]string, 0)
Expand All @@ -86,40 +77,41 @@ func (w *options) IDSuffix() string {
return strings.Join(sanitized, ".")
}

// ParentWorkflowID returns the parent workflow id.
func (w *options) ParentWorkflowID() (string, error) {
if w.parent == nil {
if w.parent_context == nil {
return "", ErrParentNil
}

return workflow.GetInfo(w.parent).WorkflowExecution.ID, nil
return workflow.GetInfo(w.parent_context).WorkflowExecution.ID, nil
}

// MaxAttempts returns the max attempts for the workflow.
func (w *options) MaxAttempts() int32 {
return w.maxattempts
return w.MaximumAttempt
}

// IgnoredErrors returns the list of errors that are ok to ignore.
func (w *options) IgnoredErrors() []string {
return w.ignorederrors
return w.IgnoreErrors
}

// WithParent sets the parent workflow context.
func WithParent(parent workflow.Context) Option {
return func(o Options) error {
o.(*options).parent = parent
o.(*options).ParentID = workflow.GetInfo(parent).WorkflowExecution.ID
return nil
}
}

// WithBlock sets the block name.
func WithBlock(val string) Option {
return func(o Options) error {
if o.(*options).block != "" {
return NewDuplicateIDPropError("block", o.(*options).block, val)
if o.(*options).Block != "" {
return NewDuplicateIDPropError("block", o.(*options).Block, val)
}

o.(*options).block = format(val)
o.(*options).Block = sanitize(val)

return nil
}
Expand All @@ -128,11 +120,11 @@ func WithBlock(val string) Option {
// WithBlockID sets the block value.
func WithBlockID(val string) Option {
return func(o Options) error {
if o.(*options).blockID != "" {
return NewDuplicateIDPropError("blockID", o.(*options).blockID, val)
if o.(*options).BlockID != "" {
return NewDuplicateIDPropError("blockID", o.(*options).BlockID, val)
}

o.(*options).blockID = format(val)
o.(*options).BlockID = sanitize(val)

return nil
}
Expand All @@ -141,11 +133,11 @@ func WithBlockID(val string) Option {
// WithElement sets the element name.
func WithElement(val string) Option {
return func(o Options) error {
if o.(*options).elm != "" {
return NewDuplicateIDPropError("element", o.(*options).elm, val)
if o.(*options).Element != "" {
return NewDuplicateIDPropError("element", o.(*options).Element, val)
}

o.(*options).elm = format(val)
o.(*options).Element = sanitize(val)

return nil
}
Expand All @@ -154,11 +146,11 @@ func WithElement(val string) Option {
// WithElementID sets the element value.
func WithElementID(val string) Option {
return func(o Options) error {
if o.(*options).elmID != "" {
return NewDuplicateIDPropError("element id", o.(*options).elmID, val)
if o.(*options).ElementID != "" {
return NewDuplicateIDPropError("element id", o.(*options).ElementID, val)
}

o.(*options).elmID = format(val)
o.(*options).ElementID = sanitize(val)

return nil
}
Expand All @@ -167,11 +159,11 @@ func WithElementID(val string) Option {
// WithMod sets the modifier name.
func WithMod(val string) Option {
return func(o Options) error {
if o.(*options).mod != "" {
return NewDuplicateIDPropError("modifier", o.(*options).mod, val)
if o.(*options).Mod != "" {
return NewDuplicateIDPropError("modifier", o.(*options).Mod, val)
}

o.(*options).mod = format(val)
o.(*options).Mod = sanitize(val)

return nil
}
Expand All @@ -180,11 +172,11 @@ func WithMod(val string) Option {
// WithModID sets the modifier value.
func WithModID(val string) Option {
return func(o Options) error {
if o.(*options).modID != "" {
return NewDuplicateIDPropError("modifier id", o.(*options).modID, val)
if o.(*options).ModID != "" {
return NewDuplicateIDPropError("modifier id", o.(*options).ModID, val)
}

o.(*options).modID = format(val)
o.(*options).ModID = sanitize(val)

return nil
}
Expand All @@ -193,8 +185,8 @@ func WithModID(val string) Option {
// WithProp sets the prop given a key & value.
func WithProp(key, val string) Option {
return func(o Options) error {
o.(*options).propOrder = append(o.(*options).propOrder, key)
o.(*options).props[format(key)] = format(val)
o.(*options).Order = append(o.(*options).Order, key)
o.(*options).Props[sanitize(key)] = sanitize(val)

return nil
}
Expand All @@ -203,29 +195,28 @@ func WithProp(key, val string) Option {
// WithMaxAttempts sets the max attempts for the workflow.
func WithMaxAttempts(attempts int32) Option {
return func(o Options) error {
o.(*options).maxattempts = attempts
o.(*options).MaximumAttempt = attempts
return nil
}
}

// WithIgnoredError adds an error to the list of errors that are ok to ignore for the workflow.
func WithIgnoredError(err string) Option {
return func(o Options) error {
o.(*options).ignorederrors = append(o.(*options).ignorederrors, err)
o.(*options).IgnoreErrors = append(o.(*options).IgnoreErrors, err)
return nil
}
}

// NewOptions sets workflow options required to run a workflow like workflow id, max attempts, etc.
//
// The idempotent workflow ID Sometimes we need to signal the workflow from a completely disconnected
// part of the application. For us, it is important to arrive at the same workflow ID regardless of the conditions.
// We try to follow the block, element, modifier pattern popularized by advocates of mantainable CSS. For more info,
// https://getbem.com.
// The idempotent workflow ID Sometimes we need to signal the workflow from a completely disconnected part of the
// application. For us, it is important to arrive at the same workflow ID regardless of the conditions. We try to follow
// the block, element, modifier pattern popularized by advocates of maintainable CSS. For more info,
// https://getbem.com/.
//
// Example:
// For the block github with installation id 123, the element being the repository with id 456, and the modifier being the
// pull request with id 789, we would call
// Example: For the block github with installation id 123, the element being the repository with id 456, and the
// modifier being the pull request with id 789, we would call
//
// opts := NewOptions(
// WithBlock("github"),
Expand All @@ -249,10 +240,10 @@ func NewOptions(opts ...Option) (Options, error) {
)

w := &options{
props: make(props),
propOrder: make([]string, 0),
maxattempts: RetryForever,
ignorederrors: make([]string, 0),
Props: make(props),
Order: make([]string, 0),
MaximumAttempt: RetryForever,
IgnoreErrors: make([]string, 0),
}

for _, opt := range opts {
Expand Down
10 changes: 5 additions & 5 deletions workflows/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,16 @@ import (
)

var (
uuidrxstr = "^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$" // taken from github.com/go-playground/validator
uuid = "^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$" // taken from github.com/go-playground/validator
)

var (
uuidrx = regexp.MustCompile(uuidrxstr)
re_uuid = regexp.MustCompile(uuid)
)

// format sanitizes a string and returns it.
func format(s string) string {
if uuidrx.MatchString(s) {
// sanitize sanitizes a string and returns it.
func sanitize(s string) string {
if re_uuid.MatchString(s) {
return s
}

Expand Down

0 comments on commit e8c892c

Please sign in to comment.