Initial protos for priority #513

Merged
merged 13 commits into from
Jan 18, 2025
6 changes: 6 additions & 0 deletions temporal/api/command/v1/message.proto
@@ -85,6 +85,9 @@ message ScheduleActivityTaskCommandAttributes {
// If this is set, the activity would be assigned to the Build ID of the workflow. Otherwise,
// Assignment rules of the activity's Task Queue will be used to determine the Build ID.
bool use_workflow_build_id = 13;
// Priority metadata. If this message is not present, or any fields are not
// present, they inherit the values from the workflow.
temporal.api.common.v1.Priority priority = 14;
}

message RequestCancelActivityTaskCommandAttributes {
@@ -226,6 +229,9 @@ message StartChildWorkflowExecutionCommandAttributes {
// If this is set, the child workflow inherits the Build ID of the parent. Otherwise, the assignment
// rules of the child's Task Queue will be used to independently assign a Build ID to it.
bool inherit_build_id = 17;
// Priority metadata. If this message is not present, or any fields are not
// present, they inherit the values from the workflow.
temporal.api.common.v1.Priority priority = 18;
}

message ProtocolMessageCommandAttributes {
111 changes: 111 additions & 0 deletions temporal/api/common/v1/message.proto
@@ -246,3 +246,114 @@ message Link {
BatchJob batch_job = 2;
}
}

// Priority contains metadata that controls relative ordering of task processing
// when tasks are backed up in a queue. Initially, Priority will be used in
// matching (workflow and activity) task queues. Later it may be used in history
// task queues and in rate limiting decisions.
//
// Priority is attached to workflows, activities, and Nexus tasks. By default,
Member

and Nexus tasks

I don't see that in this PR, is that maybe coming later?

Member Author

You're right, I decided we shouldn't do anything with nexus pending more design discussion

// activities inherit Priority from the workflow that created them, but may
// override a subset of fields when an activity is started or modified.
//
// Despite being named "Priority", this message also contains fields that
// control "fairness" mechanisms.
//
// For all fields, the field not present or equal to zero/empty string means to
// inherit the value from the calling workflow. If there is no calling workflow,
// then use the default value.
//
// Note that for all fields other than fairness_key, the not-present/zero value
// is invalid, but for fairness_key, the empty string is a valid key. This means
Member

Suggested change
- // Note that for all fields other than fairness_key, the not-present/zero value
- // is invalid, but for fairness_key, the empty string is a valid key. This means
+ // Note that for all fields other than fairness_key, the not-present/zero value
+ // is invalid when returned from the server, but for fairness_key, the empty string is a valid key. This means

I was a bit confused seeing absent/zero as invalid here and the paragraph above talking about absent/zero fields

Member Author

I don't think that quite captures my meaning. It's more about commands, actually: say you start an activity (or child wf) and want to override the priority, but not the fairness key. I'm imagining the sdk would attach a Priority message that includes a non-zero priority, and a zero (empty string) fairness key. The server would interpret that as: override the priority, inherit the fairness key from the workflow.

That is, you can override any individual field(s) by setting it to a non-zero value. That works well in all cases except one: you can't override the fairness key to the empty string. I think that limitation is fine, since that's a pretty weird thing to want to do: if you're using fairness keys at all, you want to set it explicitly in all cases and not mix the default (empty string) with other values.

Member

@cretz Jan 15, 2025

That is, you can override any individual field(s) by setting it to a non-zero value

So you can provide this object as partial not-present/zero values to inherit them? The statement "for all fields other than fairness_key, the not-present/zero value is invalid" seems to say I have to provide every field but fairness key each time I make this object (like when putting on a command and only wanting to set priority and inherit the rest)?

Member Author

By "the zero value is invalid", I mean that zero is not meaningful if interpreted as a value, and therefore we can be absolutely sure that the intention was "default/inherit". Not that it's not acceptable to send this message with missing/zero values. On the contrary, it's always acceptable to leave out the whole message or any subset of fields. (It has to be so for backwards compatibility)

For fairness key, the zero value is meaningful if interpreted as a value, but I'm saying we don't interpret it that way, we interpret it as "inherit".

There may be some confusion here because in some contexts (schedule activity command) there is a "parent" to inherit from, and in other contexts (start workflow execution) there is no "parent", so you can't "inherit", you just get the default values. The same message is used in both cases though, since it's basically the same thing, it's just that the "parent" is the default values.
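
The field-by-field inheritance described in this thread can be sketched as follows. This is only an illustration, not the server's implementation: the `Priority` dataclass is a hypothetical Python mirror of the proto message, and `resolve` is a made-up helper name. It assumes that any zero or empty field in the override means "inherit from the parent".

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Priority:
    """Hypothetical Python mirror of the Priority proto message in this PR."""
    priority: int = 0
    fairness_key: str = ""
    fairness_weight: float = 0.0
    fairness_rate_limit: float = 0.0
    ordering_key: int = 0


def resolve(override: Optional[Priority], parent: Priority) -> Priority:
    """Return effective values: zero/empty fields in `override` fall back to `parent`.

    `parent` is the calling workflow's Priority, or a set of default values
    when there is no calling workflow (assumption based on the thread above).
    """
    if override is None:
        # The whole message was left out, so everything is inherited.
        return parent
    return Priority(
        priority=override.priority or parent.priority,
        # An empty fairness key is treated as "inherit", so it cannot be
        # used to override a non-empty parent key.
        fairness_key=override.fairness_key or parent.fairness_key,
        fairness_weight=override.fairness_weight or parent.fairness_weight,
        fairness_rate_limit=override.fairness_rate_limit or parent.fairness_rate_limit,
        ordering_key=override.ordering_key or parent.ordering_key,
    )
```

For example, with a workflow at `Priority(priority=2, fairness_key="tenant-a")`, scheduling an activity with `Priority(priority=1)` yields priority 1 and keeps the fairness key "tenant-a".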

Member

By "the zero value is invalid", I mean that zero is not meaningful if interpreted as a value

Ah, I misinterpreted "invalid" to mean "wouldn't pass validation" and would therefore error, meaning it's an invalid thing for SDK side to ever set (or leave unset).

Member Author

I'll reword

// that a workflow with a non-empty-string fairness key can't override the
// fairness key of an activity to the empty string; it will be interpreted as
// "inherit the workflow's key".
Member

Could make fairness_key optional to avoid that ambiguity

Member Author

Server (and go sdk) are using proto3 which can't distinguish empty string from not present, so it wouldn't really help. We can move to the new proto editions eventually but I don't think it's happening any time soon.

Member

@Sushisource Jan 15, 2025

That should be totally possible in proto3, you'll get a *string instead of just string.

Of course I tried that in the API Go repo and it poops because of some lame generator limitation:

temporal/api/workflowservice/v1/request_response.proto: is a proto3 file that contains optional fields, but code generator protoc-gen-grpc-gateway hasn't been updated to support optional fields in proto3. Please ask the owner of this code generator to support proto3 optional.

So, fine, but, it's not a proto3 limitation: https://protobuf.dev/reference/go/go-generated/#singular-scalar-proto3

Member Author

Ok, true.

It looks like grpc-gateway has been fixed (https://www.github.com/grpc-ecosystem/grpc-gateway/pull/1951), we just need to upgrade.

We can do it later, though, so I'd rather not block this on upgrading that.

//
// The overall semantics of Priority are:
// 1. First, consider "priority": higher priority (lower number) goes first.
// 2. Next, consider fairness: try to dispatch tasks for different fairness keys
// in proportion to their weight.
// 3. Finally, tasks may be ordered by an additional ordering key.
//
// Applications may use any subset of mechanisms that are useful to them and
// leave the other fields to use default values.
//
// Not all queues in the system may support the "full" semantics of all priority
// fields. (Currently only support in matching task queues is planned.)
message Priority {
Member

Wondering if there's a better name for this than Priority since that's just one aspect of this message.
Maybe something like SchedulingPolicy?

Member Author

Yeah, I went through a few names, and I agree it's confusing that "priority" the name of the message refers to a larger concept than "priority" the field. But I also want a concise name here. "PriorityFairnessMetadata" is accurate but too long.

SchedulingPolicy suggests something slightly different to me. This isn't a "policy", it's metadata to inform prioritization decisions. Let's keep coming up with ideas though.

Member

@cretz Jan 15, 2025

Could also be TaskDispatchStrategy or something similarly obtuse heh. But I'm ok with "priority" (though I do wonder if it needs to be qualified, see my other comment)

Member Author

I want a short simple name, not an obtuse one. In the standup today we discussed this and one popular option was to keep this named "Priority" and change the name of the field to "priority key" to differentiate it.

Member

I wonder if this needs a qualifier since it's in the common package, e.g. TaskPriority, or if it should move to the taskqueue package.

Member Author

The initial application will be in matching task queues, but we can eventually use this in other contexts, for example attaching priorities to any api to influence load shedding at capacity, or attaching priorities to signals or timers (which do turn into tasks at some point but conceptually aren't really just tasks).

Member

@cretz Jan 16, 2025

Yeah, my concern is pedantic, but so often these assumptions of generic reuse don't materialize due to some minor mismatch need for the other use case. But can keep it as generically named for all concepts of Temporal "priority" ever if we want.

Member Author

I had a conversation yesterday about how this can fit into the current flow control work and it seems promising, so I'm really hoping we can get some reuse here. We can reconsider at the feature branch merge.

// Priority is a positive integer from 1 to n, where smaller integers
// correspond to higher priorities (tasks run sooner). In general, tasks in
// a queue should be processed in close to priority order, although small
// deviations are possible.
//
// The maximum priority value (minimum priority) is determined by server
// configuration, and defaults to 5.
//
// If priority is not present (or zero), then the effective priority will be
// the default priority, which is calculated by (min+max)/2. With the
// default max of 5, and min of 1, that comes out to 3.
int32 priority = 1;
Member

I know opinions differ on this matter (thanks, Go) but, I'd be for a uint here.

Member Author

Yeah.. my rule is only use uint when you're talking about bit strings and not interpreting the value as an integer. So I want to use signed int here (and in code).

Member

I'd consider a float, heh. Though I am ok with integers. But agree, unlike Rust and some low-level langs, in proto and most others it's "int unless you have a really good reason not to".

Member Author

This definitely cannot be a float. The whole point is that there is a very limited number of options (so we can basically create a queue per value). Otherwise the whole setup of priority over fairness over ordering doesn't work.
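
To make the "queue per value" idea concrete, here is a rough sketch of how a small, fixed set of integer priorities could map to one queue each. The class and function names are hypothetical. Two points are assumptions: floor division for (min+max)/2 (the proto comment only gives the 1..5 case, which yields 3), and strict priority order, whereas the proto comment allows small deviations.

```python
from collections import deque


def default_priority(min_priority: int = 1, max_priority: int = 5) -> int:
    """Default priority as (min+max)/2; floor division is an assumption."""
    return (min_priority + max_priority) // 2


class PriorityQueues:
    """One FIFO queue per priority level; lower numbers are dispatched first."""

    def __init__(self, max_priority: int = 5):
        self.max_priority = max_priority
        self.queues = {p: deque() for p in range(1, max_priority + 1)}

    def add(self, task, priority: int = 0) -> None:
        # Zero (not present) means "use the default priority".
        if priority == 0:
            priority = default_priority(1, self.max_priority)
        # Out-of-range values are not handled in this sketch (KeyError).
        self.queues[priority].append(task)

    def pop(self):
        # Scan from highest priority (1) to lowest (max_priority).
        for p in range(1, self.max_priority + 1):
            if self.queues[p]:
                return self.queues[p].popleft()
        return None
```

A float priority would not fit this layout, since the set of queues has to be small and known in advance.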


// Fairness key is a short string that's used as a key for a fairness
// balancing mechanism. It may correspond to a tenant id, or to a fixed
// string like "high" or "low". The default is the empty string.
//
// The fairness mechanism attempts to dispatch tasks for a given key in
// proportion to its weight. For example, using a thousand distinct tenant
// ids, each with a weight of 1.0 (the default) will result in each tenant
// getting a roughly equal share of task dispatch throughput.
//
// (Note: this does not imply equal share of worker capacity! Fairness
// decisions are made only at dispatch time based on queue statistics, not
// current worker load.)
//
// As another example, using keys "high" and "low" with weight 9.0 and 1.0
// respectively will prefer dispatching "high" tasks over "low" tasks at a
// 9:1 ratio, while allowing either key to use all worker capacity if the
// other is not present.
//
// All fairness mechanisms, including rate limits, are best-effort and
// probabilistic. The results may not match what a "perfect" algorithm with
// infinite resources would produce. The more unique keys are used, the less
// accurate the results will be.
//
// Fairness keys are limited to 64 bytes.
string fairness_key = 2;

// Fairness weight for a task can come from multiple sources for
// flexibility. From highest to lowest precedence:
// 1. Weights for a small set of keys can be overridden in task queue
// configuration with an API.
// 2. It can be attached to the workflow/activity/task in this field.
// 3. The default weight of 1.0 will be used.
// Note that if the weight for a key is attached in this field, for best
// results, the same weight should be used for the same key for a reasonable
// amount of time (minutes). It may change, but it may take some time for
// the change to be reflected.
//
// The recommended range of usable weights is [1e-3, 1e3].
dnr marked this conversation as resolved.
float fairness_weight = 3;

// The fairness mechanism can also enforce rate limits per fairness key.
// Rate limits are specified in tasks dispatched per second.
// As with weights, rate limits can come from different sources:
// 1. Rate limits for a small set of keys can be overridden in task queue
// configuration with an API.
// 2. It can be attached to the workflow/activity/task in this field.
Member

For both this bullet and the same one above, I'm not sure I understand. The weight/limit "coming from" being attached to a task doesn't really make sense to me since I would think of it being attached to a task after it's been determined from some other source, since tasks are an output, not an input (except to workers, but, workers aren't doing this stuff).

Is it meant to mean that it's attached to a task at some earlier point within server, like when being forwarded between partitions or something? Would be good to elaborate a bit.

Member Author

That's supposed to be "attached to the workflow" or "attached to the activity". If I just remove "/task" is that clearer?

I added "/task" because it could eventually be attached to nexus tasks and other things, but maybe we just shouldn't even mention that at this point.

Member Author

The problem we're trying to solve is:

  • There's no limit on cardinality of fairness keys.
  • We're willing to store overrides for only a limited number (e.g. 100) of fairness keys in task queue configuration.
  • Users may want to set a distinct weight or rate limit on more than 100 keys.

So we allow just including the weight+rate limit in the priority message itself, essentially using workflow/activity/task metadata to store more than 100 keys, except they're distributed around the system so it's scalable. We don't promise to remember and act on an unlimited number of distinct values, but we can try to act on the ones that we see, as we see them, as best we can.
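
The precedence for fairness weights described in the proto comment (task queue configuration override, then the value attached to the workflow or activity, then the default of 1.0) can be sketched like this. The function name and the shape of `queue_overrides` are hypothetical; the real configuration API is not part of this PR.

```python
from typing import Mapping

DEFAULT_WEIGHT = 1.0  # default weight stated in the proto comment


def effective_weight(
    fairness_key: str,
    attached_weight: float,
    queue_overrides: Mapping[str, float],
) -> float:
    """Pick the weight for a fairness key, highest precedence first."""
    # 1. Overrides stored in task queue configuration (a small, bounded set).
    if fairness_key in queue_overrides:
        return queue_overrides[fairness_key]
    # 2. Weight attached to the workflow/activity in the Priority message.
    #    Zero means "not set", following the inherit/default convention.
    if attached_weight > 0:
        return attached_weight
    # 3. Fall back to the default weight.
    return DEFAULT_WEIGHT
```

Because only step 1 needs server-side storage, the number of distinct keys carrying their own weight in step 2 is not bounded by the configuration size.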

Member

Yeah I think removing task makes it more clear. Or just elaborate a bit to say that the value originates from the workflow/activity/operation invocation and thus ends up attached to the task.

// 3. The "default" rate limit for keys that are not overridden can be set
// in task queue configuration also.
// 4. Otherwise, a very high rate limit will be applied.
float fairness_rate_limit = 4;

// Ordering key is a positive integer from 1 to MaxInt64. After priority and
// fairness mechanisms are applied, tasks will be finally ordered by
// ordering_key. Note that fine-grained values are allowed here, as opposed
// to priority.
//
// For example, applications might use the start time of a workflow (in
// seconds since the unix epoch) as ordering key for the workflow and all
// activities in it. This will have the effect of prioritizing activities of
// workflows that were started earlier, encouraging completing older workflows
// over making progress in newer ones.
int64 ordering_key = 5;
Member

Minor nit, would consider calling this ordering_value or something as _key has a map-key feeling akin to fairness_key

Member Author

I get that, though "value" feels too generic. This basically corresponds to the concept of "clustering key" in Cassandra, which uses "key", so it feels okay to me.
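
As an illustration of the ordering key, the sketch below orders a backlog by `ordering_key`, smallest first, which matches the proto comment's example of using the workflow start time so that older workflows go first. It covers only the final ordering step within a single priority level and fairness key. The class name is hypothetical, and the FIFO tie-break for equal keys is an assumption.

```python
import heapq
import itertools


class OrderedBacklog:
    """Backlog for one priority level and fairness key, ordered by ordering_key."""

    def __init__(self):
        self._heap = []
        # Monotonic counter so equal ordering keys are served in arrival order.
        self._seq = itertools.count()

    def add(self, task, ordering_key: int) -> None:
        heapq.heappush(self._heap, (ordering_key, next(self._seq), task))

    def pop(self):
        if not self._heap:
            return None
        return heapq.heappop(self._heap)[2]
```

Unlike priority, the key here can take fine-grained values such as Unix timestamps, because it never determines how many queues exist.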

}
7 changes: 7 additions & 0 deletions temporal/api/history/v1/message.proto
@@ -133,6 +133,8 @@ message WorkflowExecutionStartedEventAttributes {
string inherited_build_id = 32;
// Versioning override applied to this workflow when it was started.
temporal.api.workflow.v1.VersioningOverride versioning_override = 33;
// Priority metadata
Member

nit: Can you please be consistent with your use of punctuation?

Suggested change
- // Priority metadata
+ // Priority metadata.

Member Author

The rule I use (very consistently) is: comments that contain full sentences get periods, comments that are only sentence fragments don't.

I didn't just make this up, it basically matches the recommendation here, for example: https://google.github.io/styleguide/go/decisions#comment-sentences

However, I can use a different rule if you like.

Member

FWIW, this is the rule I follow quite religiously in code and is a good one, it differentiates the "phrases" from the multi-sentence paragraphs.

Member

I tend to prefer to use punctuation and proper capitalization in all doc strings. It's more consistent and prevents cases where the author adds more sentences later and forgets to add punctuation to the initial comment.

I switched from y'all's convention to this one but I'm obviously not blocking the PR for my personal preference.
Wondering what others think.

temporal.api.common.v1.Priority priority = 34;
}

message WorkflowExecutionCompletedEventAttributes {
@@ -333,6 +335,9 @@ message ActivityTaskScheduledEventAttributes {
// If this is set, the activity would be assigned to the Build ID of the workflow. Otherwise,
// Assignment rules of the activity's Task Queue will be used to determine the Build ID.
bool use_workflow_build_id = 13;
// Priority metadata. If this message is not present, or any fields are not
// present, they inherit the values from the workflow.
temporal.api.common.v1.Priority priority = 14;
}

message ActivityTaskStartedEventAttributes {
@@ -640,6 +645,8 @@ message StartChildWorkflowExecutionInitiatedEventAttributes {
// If this is set, the child workflow inherits the Build ID of the parent. Otherwise, the assignment
// rules of the child's Task Queue will be used to independently assign a Build ID to it.
bool inherit_build_id = 19;
// Priority metadata
temporal.api.common.v1.Priority priority = 20;
}

message StartChildWorkflowExecutionFailedEventAttributes {
8 changes: 8 additions & 0 deletions temporal/api/workflow/v1/message.proto
@@ -110,6 +110,9 @@ message WorkflowExecutionInfo {
// be versioned or unversioned, depending on `versioning_info.behavior` and `versioning_info.versioning_override`.
// Experimental. Versioning info is experimental and might change in the future.
WorkflowExecutionVersioningInfo versioning_info = 22;

// Priority metadata
temporal.api.common.v1.Priority priority = 23;
Member

To confirm, this is intentionally in visibility as opposed to putting in WorkflowExecutionExtendedInfo?

Member Author

Good question. It was intentional but I'm not totally confident. I do think it can be valuable to have this info available through visibility, but I'm not sure of all the implications

Member Author

After some discussion we tentatively agreed to keep this here

}

// Holds all the extra information about workflow execution that is not part of Visibility.
@@ -242,6 +245,9 @@ message PendingActivityInfo {
// The deployment this activity was dispatched to most recently. Present only if the activity
// was dispatched to a versioned worker.
temporal.api.deployment.v1.Deployment last_deployment = 20;

// Priority metadata
temporal.api.common.v1.Priority priority = 21;
}

message PendingChildExecutionInfo {
@@ -320,6 +326,8 @@ message NewWorkflowExecutionInfo {
// If set, takes precedence over the Versioning Behavior sent by the SDK on Workflow Task completion.
// To unset the override after the workflow is running, use UpdateWorkflowExecutionOptions.
VersioningOverride versioning_override = 15;
// Priority metadata
temporal.api.common.v1.Priority priority = 16;
}

// CallbackInfo contains the state of an attached workflow callback.
6 changes: 6 additions & 0 deletions temporal/api/workflowservice/v1/request_response.proto
@@ -206,6 +206,8 @@ message StartWorkflowExecutionRequest {
// If set, takes precedence over the Versioning Behavior sent by the SDK on Workflow Task completion.
// To unset the override after the workflow is running, use UpdateWorkflowExecutionOptions.
temporal.api.workflow.v1.VersioningOverride versioning_override = 25;
// Priority metadata
temporal.api.common.v1.Priority priority = 26;
}

message StartWorkflowExecutionResponse {
@@ -478,6 +480,8 @@ message PollActivityTaskQueueResponse {
// (or not) during activity scheduling. The service can override the provided one if some
// values are not specified or exceed configured system limits.
temporal.api.common.v1.RetryPolicy retry_policy = 17;
// Priority metadata
temporal.api.common.v1.Priority priority = 18;
Comment on lines +483 to +484
Member

Does it not also need to be on poll WFT response? Or is it being in the workflow started event sufficient?

Member Author

Yeah that was my thought, the sdk can track the priority metadata of the workflow from the started event (plus workflow properties modified events when we add it there later)

}

message RecordActivityTaskHeartbeatRequest {
@@ -751,6 +755,8 @@ message SignalWithStartWorkflowExecutionRequest {
// If set, takes precedence over the Versioning Behavior sent by the SDK on Workflow Task completion.
// To unset the override after the workflow is running, use UpdateWorkflowExecutionOptions.
temporal.api.workflow.v1.VersioningOverride versioning_override = 25;
// Priority metadata
temporal.api.common.v1.Priority priority = 26;
}

message SignalWithStartWorkflowExecutionResponse {