Skip to content

Commit

Permalink
Add priority annotation
Browse files Browse the repository at this point in the history
  • Loading branch information
stephanos committed Jan 28, 2025
1 parent e7a505f commit bd0897e
Show file tree
Hide file tree
Showing 9 changed files with 29 additions and 7 deletions.
3 changes: 3 additions & 0 deletions internal/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,9 @@ type (
//
// NOTE: Experimental
Summary string

// TODO
Priority *commonpb.Priority
}

// LocalActivityOptions stores local activity specific parameters that will be stored inside of a context.
Expand Down
3 changes: 3 additions & 0 deletions internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -774,6 +774,9 @@ type (
// NOTE: Experimental
VersioningOverride VersioningOverride

// Priority - Optional priority and fairness settings for workflow.
Priority *commonpb.Priority

// request ID. Only settable by the SDK - e.g. [temporalnexus.workflowRunOperation].
requestID string
// workflow completion callback. Only settable by the SDK - e.g. [temporalnexus.workflowRunOperation].
Expand Down
1 change: 1 addition & 0 deletions internal/internal_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ type (
DisableEagerExecution bool
VersioningIntent VersioningIntent
Summary string
Priority *commonpb.Priority
}

// ExecuteLocalActivityOptions options for executing a local activity
Expand Down
2 changes: 2 additions & 0 deletions internal/internal_event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,7 @@ func (wc *workflowEnvironmentImpl) ExecuteChildWorkflow(
attributes.WorkflowIdReusePolicy = params.WorkflowIDReusePolicy
attributes.ParentClosePolicy = params.ParentClosePolicy
attributes.RetryPolicy = params.RetryPolicy
attributes.Priority = params.Priority
attributes.Header = params.Header
attributes.Memo = memo
attributes.SearchAttributes = searchAttr
Expand Down Expand Up @@ -758,6 +759,7 @@ func (wc *workflowEnvironmentImpl) ExecuteActivity(parameters ExecuteActivityPar
scheduleTaskAttr.RequestEagerExecution = !parameters.DisableEagerExecution
scheduleTaskAttr.UseWorkflowBuildId = determineInheritBuildIdFlagForCommand(
parameters.VersioningIntent, wc.workflowInfo.TaskQueueName, parameters.TaskQueueName)
scheduleTaskAttr.Priority = parameters.Priority

startMetadata, err := buildUserMetadata(parameters.Summary, "", wc.dataConverter)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions internal/internal_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ type (
WorkflowIDConflictPolicy enumspb.WorkflowIdConflictPolicy
DataConverter converter.DataConverter
RetryPolicy *commonpb.RetryPolicy
Priority *commonpb.Priority
CronSchedule string
ContextPropagators []ContextPropagator
Memo map[string]interface{}
Expand Down
18 changes: 11 additions & 7 deletions internal/internal_workflow_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -740,11 +740,12 @@ func (wc *WorkflowClient) DescribeWorkflowExecution(ctx context.Context, workflo

// QueryWorkflow queries a given workflow execution
// workflowID and queryType are required, other parameters are optional.
// - workflow ID of the workflow.
// - runID can be default(empty string). if empty string then it will pick the running execution of that workflow ID.
// - taskQueue can be default(empty string). If empty string then it will pick the taskQueue of the running execution of that workflow ID.
// - queryType is the type of the query.
// - args... are the optional query parameters.
// - workflow ID of the workflow.
// - runID can be default(empty string). if empty string then it will pick the running execution of that workflow ID.
// - taskQueue can be default(empty string). If empty string then it will pick the taskQueue of the running execution of that workflow ID.
// - queryType is the type of the query.
// - args... are the optional query parameters.
//
// The errors it can return:
// - serviceerror.InvalidArgument
// - serviceerror.Internal
Expand Down Expand Up @@ -943,8 +944,9 @@ func (wc *WorkflowClient) QueryWorkflowWithOptions(ctx context.Context, request

// DescribeTaskQueue returns information about the target taskqueue, right now this API returns the
// pollers which polled this taskqueue in last few minutes.
// - taskqueue name of taskqueue
// - taskqueueType type of taskqueue, can be workflow or activity
// - taskqueue name of taskqueue
// - taskqueueType type of taskqueue, can be workflow or activity
//
// The errors it can return:
// - serviceerror.InvalidArgument
// - serviceerror.Internal
Expand Down Expand Up @@ -1684,6 +1686,7 @@ func (w *workflowClientInterceptor) createStartWorkflowRequest(
CompletionCallbacks: in.Options.callbacks,
Links: in.Options.links,
VersioningOverride: versioningOverrideToProto(in.Options.VersioningOverride),
Priority: in.Options.Priority,
}

startRequest.UserMetadata, err = buildUserMetadata(in.Options.StaticSummary, in.Options.StaticDetails, dataConverter)
Expand Down Expand Up @@ -2056,6 +2059,7 @@ func (w *workflowClientInterceptor) SignalWithStartWorkflow(
WorkflowIdConflictPolicy: in.Options.WorkflowIDConflictPolicy,
Header: header,
VersioningOverride: versioningOverrideToProto(in.Options.VersioningOverride),
Priority: in.Options.Priority,
}

if in.Options.StartDelay != 0 {
Expand Down
1 change: 1 addition & 0 deletions internal/internal_workflow_testsuite.go
Original file line number Diff line number Diff line change
Expand Up @@ -2155,6 +2155,7 @@ func newTestActivityTask(workflowID, runID, workflowTypeName, namespace string,
},
WorkflowNamespace: namespace,
Header: attr.GetHeader(),
Priority: attr.Priority,
}
return task
}
Expand Down
1 change: 1 addition & 0 deletions internal/nexus_operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ func (t *testSuiteClientForNexusOperations) ExecuteWorkflow(ctx context.Context,
Memo: options.Memo,
CronSchedule: options.CronSchedule,
RetryPolicy: convertToPBRetryPolicy(options.RetryPolicy),
Priority: options.Priority,
},
}, func(result *commonpb.Payloads, wfErr error) {
ncb := callback.GetNexus()
Expand Down
6 changes: 6 additions & 0 deletions internal/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,9 @@ type (
//
// NOTE: Experimental
StaticDetails string

// TODO
Priority *commonpb.Priority
}

// RegisterWorkflowOptions consists of options for registering a workflow
Expand Down Expand Up @@ -1768,6 +1771,7 @@ func GetChildWorkflowOptions(ctx Context) ChildWorkflowOptions {
WaitForCancellation: opts.WaitForCancellation,
WorkflowIDReusePolicy: opts.WorkflowIDReusePolicy,
RetryPolicy: convertFromPBRetryPolicy(opts.RetryPolicy),
Priority: opts.Priority,
CronSchedule: opts.CronSchedule,
Memo: opts.Memo,
SearchAttributes: opts.SearchAttributes,
Expand Down Expand Up @@ -2338,6 +2342,7 @@ func WithActivityOptions(ctx Context, options ActivityOptions) Context {
eap.RetryPolicy = convertToPBRetryPolicy(options.RetryPolicy)
eap.DisableEagerExecution = options.DisableEagerExecution
eap.VersioningIntent = options.VersioningIntent
eap.Priority = options.Priority
eap.Summary = options.Summary
return ctx1
}
Expand Down Expand Up @@ -2401,6 +2406,7 @@ func GetActivityOptions(ctx Context) ActivityOptions {
RetryPolicy: convertFromPBRetryPolicy(opts.RetryPolicy),
DisableEagerExecution: opts.DisableEagerExecution,
VersioningIntent: opts.VersioningIntent,
Priority: opts.Priority,
Summary: opts.Summary,
}
}
Expand Down

0 comments on commit bd0897e

Please sign in to comment.