diff --git a/internal/activity.go b/internal/activity.go index dca59692f..f4d9a20c8 100644 --- a/internal/activity.go +++ b/internal/activity.go @@ -165,6 +165,9 @@ type ( // // NOTE: Experimental Summary string + + // TODO + Priority *commonpb.Priority } // LocalActivityOptions stores local activity specific parameters that will be stored inside of a context. diff --git a/internal/client.go b/internal/client.go index 05371a0d7..c4c882fa2 100644 --- a/internal/client.go +++ b/internal/client.go @@ -774,6 +774,9 @@ type ( // NOTE: Experimental VersioningOverride VersioningOverride + // Priority - Optional priority and fairness settings for workflow. + Priority *commonpb.Priority + // request ID. Only settable by the SDK - e.g. [temporalnexus.workflowRunOperation]. requestID string // workflow completion callback. Only settable by the SDK - e.g. [temporalnexus.workflowRunOperation]. diff --git a/internal/internal_activity.go b/internal/internal_activity.go index b4b8f6be9..99121b444 100644 --- a/internal/internal_activity.go +++ b/internal/internal_activity.go @@ -74,6 +74,7 @@ type ( DisableEagerExecution bool VersioningIntent VersioningIntent Summary string + Priority *commonpb.Priority } // ExecuteLocalActivityOptions options for executing a local activity diff --git a/internal/internal_event_handlers.go b/internal/internal_event_handlers.go index a284cb653..ae850c912 100644 --- a/internal/internal_event_handlers.go +++ b/internal/internal_event_handlers.go @@ -593,6 +593,7 @@ func (wc *workflowEnvironmentImpl) ExecuteChildWorkflow( attributes.WorkflowIdReusePolicy = params.WorkflowIDReusePolicy attributes.ParentClosePolicy = params.ParentClosePolicy attributes.RetryPolicy = params.RetryPolicy + attributes.Priority = params.Priority attributes.Header = params.Header attributes.Memo = memo attributes.SearchAttributes = searchAttr @@ -758,6 +759,7 @@ func (wc *workflowEnvironmentImpl) ExecuteActivity(parameters ExecuteActivityPar scheduleTaskAttr.RequestEagerExecution = !parameters.DisableEagerExecution scheduleTaskAttr.UseWorkflowBuildId = determineInheritBuildIdFlagForCommand( parameters.VersioningIntent, wc.workflowInfo.TaskQueueName, parameters.TaskQueueName) + scheduleTaskAttr.Priority = parameters.Priority startMetadata, err := buildUserMetadata(parameters.Summary, "", wc.dataConverter) if err != nil { diff --git a/internal/internal_workflow.go b/internal/internal_workflow.go index 62349900b..9033c2d11 100644 --- a/internal/internal_workflow.go +++ b/internal/internal_workflow.go @@ -218,6 +218,7 @@ type ( WorkflowIDConflictPolicy enumspb.WorkflowIdConflictPolicy DataConverter converter.DataConverter RetryPolicy *commonpb.RetryPolicy + Priority *commonpb.Priority CronSchedule string ContextPropagators []ContextPropagator Memo map[string]interface{} diff --git a/internal/internal_workflow_client.go b/internal/internal_workflow_client.go index 1101bff59..1b84b0055 100644 --- a/internal/internal_workflow_client.go +++ b/internal/internal_workflow_client.go @@ -740,11 +740,12 @@ func (wc *WorkflowClient) DescribeWorkflowExecution(ctx context.Context, workflo // QueryWorkflow queries a given workflow execution // workflowID and queryType are required, other parameters are optional. -// - workflow ID of the workflow. -// - runID can be default(empty string). if empty string then it will pick the running execution of that workflow ID. -// - taskQueue can be default(empty string). If empty string then it will pick the taskQueue of the running execution of that workflow ID. -// - queryType is the type of the query. -// - args... are the optional query parameters. +// - workflow ID of the workflow. +// - runID can be default(empty string). if empty string then it will pick the running execution of that workflow ID. +// - taskQueue can be default(empty string). If empty string then it will pick the taskQueue of the running execution of that workflow ID. +// - queryType is the type of the query. +// - args... are the optional query parameters. +// // The errors it can return: // - serviceerror.InvalidArgument // - serviceerror.Internal @@ -943,8 +944,9 @@ func (wc *WorkflowClient) QueryWorkflowWithOptions(ctx context.Context, request // DescribeTaskQueue returns information about the target taskqueue, right now this API returns the // pollers which polled this taskqueue in last few minutes. -// - taskqueue name of taskqueue -// - taskqueueType type of taskqueue, can be workflow or activity +// - taskqueue name of taskqueue +// - taskqueueType type of taskqueue, can be workflow or activity +// // The errors it can return: // - serviceerror.InvalidArgument // - serviceerror.Internal @@ -1684,6 +1686,7 @@ func (w *workflowClientInterceptor) createStartWorkflowRequest( CompletionCallbacks: in.Options.callbacks, Links: in.Options.links, VersioningOverride: versioningOverrideToProto(in.Options.VersioningOverride), + Priority: in.Options.Priority, } startRequest.UserMetadata, err = buildUserMetadata(in.Options.StaticSummary, in.Options.StaticDetails, dataConverter) @@ -2056,6 +2059,7 @@ func (w *workflowClientInterceptor) SignalWithStartWorkflow( WorkflowIdConflictPolicy: in.Options.WorkflowIDConflictPolicy, Header: header, VersioningOverride: versioningOverrideToProto(in.Options.VersioningOverride), + Priority: in.Options.Priority, } if in.Options.StartDelay != 0 { diff --git a/internal/internal_workflow_testsuite.go b/internal/internal_workflow_testsuite.go index 134c7b26a..2b11d84f4 100644 --- a/internal/internal_workflow_testsuite.go +++ b/internal/internal_workflow_testsuite.go @@ -2155,6 +2155,7 @@ func newTestActivityTask(workflowID, runID, workflowTypeName, namespace string, }, WorkflowNamespace: namespace, Header: attr.GetHeader(), + Priority: attr.Priority, } return task } diff --git a/internal/nexus_operations.go b/internal/nexus_operations.go index b0f559533..dc8831323 100644 --- a/internal/nexus_operations.go +++ b/internal/nexus_operations.go @@ -241,6 +241,7 @@ func (t *testSuiteClientForNexusOperations) ExecuteWorkflow(ctx context.Context, Memo: options.Memo, CronSchedule: options.CronSchedule, RetryPolicy: convertToPBRetryPolicy(options.RetryPolicy), + Priority: options.Priority, }, }, func(result *commonpb.Payloads, wfErr error) { ncb := callback.GetNexus() diff --git a/internal/workflow.go b/internal/workflow.go index 5b0758919..76df51be2 100644 --- a/internal/workflow.go +++ b/internal/workflow.go @@ -434,6 +434,9 @@ type ( // // NOTE: Experimental StaticDetails string + + // TODO + Priority *commonpb.Priority } // RegisterWorkflowOptions consists of options for registering a workflow @@ -1768,6 +1771,7 @@ func GetChildWorkflowOptions(ctx Context) ChildWorkflowOptions { WaitForCancellation: opts.WaitForCancellation, WorkflowIDReusePolicy: opts.WorkflowIDReusePolicy, RetryPolicy: convertFromPBRetryPolicy(opts.RetryPolicy), + Priority: opts.Priority, CronSchedule: opts.CronSchedule, Memo: opts.Memo, SearchAttributes: opts.SearchAttributes, @@ -2338,6 +2342,7 @@ func WithActivityOptions(ctx Context, options ActivityOptions) Context { eap.RetryPolicy = convertToPBRetryPolicy(options.RetryPolicy) eap.DisableEagerExecution = options.DisableEagerExecution eap.VersioningIntent = options.VersioningIntent + eap.Priority = options.Priority eap.Summary = options.Summary return ctx1 } @@ -2401,6 +2406,7 @@ func GetActivityOptions(ctx Context) ActivityOptions { RetryPolicy: convertFromPBRetryPolicy(opts.RetryPolicy), DisableEagerExecution: opts.DisableEagerExecution, VersioningIntent: opts.VersioningIntent, + Priority: opts.Priority, Summary: opts.Summary, } }