diff --git a/error.go b/error.go index 566299d1d..878538654 100644 --- a/error.go +++ b/error.go @@ -21,6 +21,8 @@ package temporal import ( + "go.temporal.io/temporal-proto/serviceerror" + "go.temporal.io/temporal/internal" "go.temporal.io/temporal/workflow" ) @@ -54,6 +56,12 @@ func IsCustomError(err error) bool { return ok } +// IsWorkflowExecutionAlreadyStartedError return if the err is a WorkflowExecutionAlreadyStartedError +func IsWorkflowExecutionAlreadyStartedError(err error) bool { + _, ok := err.(*serviceerror.WorkflowExecutionAlreadyStarted) + return ok +} + // IsCanceledError return if the err is a CanceledError func IsCanceledError(err error) bool { _, ok := err.(*CanceledError) diff --git a/idls b/idls new file mode 160000 index 000000000..56ca0b5ea --- /dev/null +++ b/idls @@ -0,0 +1 @@ +Subproject commit 56ca0b5ea223d62bd095aca0ad3b0fc0beefa7d6 diff --git a/internal/internal_task_handlers.go b/internal/internal_task_handlers.go index fef44a79c..2ffc91df7 100644 --- a/internal/internal_task_handlers.go +++ b/internal/internal_task_handlers.go @@ -1808,7 +1808,7 @@ func (ath *activityTaskHandlerImpl) Execute(taskList string, t *workflowservice. if <-ctx.Done(); ctx.Err() == context.DeadlineExceeded { return nil, ctx.Err() } - if err != nil { + if err != nil && err != ErrActivityResultPending { ath.logger.Error("Activity error.", zap.String(tagWorkflowID, t.WorkflowExecution.GetWorkflowId()), zap.String(tagRunID, t.WorkflowExecution.GetRunId()), diff --git a/internal/internal_task_pollers.go b/internal/internal_task_pollers.go index a460a2bda..7a991e882 100644 --- a/internal/internal_task_pollers.go +++ b/internal/internal_task_pollers.go @@ -460,7 +460,12 @@ func (lath *localActivityTaskHandler) executeLocalActivityTask(task *localActivi rootCtx = context.Background() } + workflowTypeLocal := task.params.WorkflowInfo.WorkflowType + ctx := context.WithValue(rootCtx, activityEnvContextKey, &activityEnvironment{ + workflowType: &workflowTypeLocal, + workflowDomain: task.params.WorkflowInfo.Domain, + taskList: task.params.WorkflowInfo.TaskListName, activityType: ActivityType{Name: activityType}, activityID: fmt.Sprintf("%v", task.activityID), workflowExecution: task.params.WorkflowInfo.WorkflowExecution, @@ -501,6 +506,7 @@ func (lath *localActivityTaskHandler) executeLocalActivityTask(task *localActivi // this is attempt and expire time is before SCHEDULE_TO_CLOSE timeout deadline = task.expireTime } + ctx, cancel := context.WithDeadline(ctx, deadline) task.Lock() if task.canceled { diff --git a/internal/internal_worker.go b/internal/internal_worker.go index 4404ce56a..6fc11b88d 100644 --- a/internal/internal_worker.go +++ b/internal/internal_worker.go @@ -53,6 +53,7 @@ import ( "go.uber.org/zap/zapcore" "go.temporal.io/temporal/internal/common/backoff" + "go.temporal.io/temporal/internal/common/metrics" ) const ( @@ -125,21 +126,24 @@ type ( // Task list name to poll. TaskList string - // Defines how many concurrent poll requests for the task list by this worker. - ConcurrentPollRoutineSize int - // Defines how many concurrent activity executions by this worker. ConcurrentActivityExecutionSize int // Defines rate limiting on number of activity tasks that can be executed per second per worker. WorkerActivitiesPerSecond float64 + // MaxConcurrentActivityPollers is the max number of pollers for activity task list + MaxConcurrentActivityPollers int + // Defines how many concurrent decision task executions by this worker. ConcurrentDecisionTaskExecutionSize int // Defines rate limiting on number of decision tasks that can be executed per second per worker. WorkerDecisionTasksPerSecond float64 + // MaxConcurrentDecisionPollers is the max number of pollers for decision task list + MaxConcurrentDecisionPollers int + // Defines how many concurrent local activity executions by this worker. ConcurrentLocalActivityExecutionSize int @@ -271,7 +275,7 @@ func newWorkflowTaskWorkerInternal(taskHandler WorkflowTaskHandler, service work ensureRequiredParams(¶ms) poller := newWorkflowTaskPoller(taskHandler, service, params) worker := newBaseWorker(baseWorkerOptions{ - pollerCount: params.ConcurrentPollRoutineSize, + pollerCount: params.MaxConcurrentDecisionPollers, pollerRate: defaultPollerRate, maxConcurrentTask: params.ConcurrentDecisionTaskExecutionSize, maxTaskPerSecond: params.WorkerDecisionTasksPerSecond, @@ -365,7 +369,7 @@ func newSessionWorker(service workflowservice.WorkflowServiceClient, params work params.TaskList = sessionEnvironment.GetResourceSpecificTasklist() activityWorker := newActivityWorker(service, params, overrides, env, nil) - params.ConcurrentPollRoutineSize = 1 + params.MaxConcurrentActivityPollers = 1 params.TaskList = creationTasklist creationWorker := newActivityWorker(service, params, overrides, env, sessionEnvironment.GetTokenBucket()) @@ -424,7 +428,7 @@ func newActivityTaskWorker(taskHandler ActivityTaskHandler, service workflowserv base := newBaseWorker( baseWorkerOptions{ - pollerCount: workerParams.ConcurrentPollRoutineSize, + pollerCount: workerParams.MaxConcurrentActivityPollers, pollerRate: defaultPollerRate, maxConcurrentTask: workerParams.ConcurrentActivityExecutionSize, maxTaskPerSecond: workerParams.WorkerActivitiesPerSecond, @@ -1316,9 +1320,7 @@ func extractHistoryFromFile(jsonfileName string, lastEventID int64) (*commonprot return history, nil } -// NewAggregatedWorker returns an instance to manage the workers. Use defaultConcurrentPollRoutineSize (which is 2) as -// poller size. The typical RTT (round-trip time) is below 1ms within data center. And the poll API latency is about 5ms. -// With 2 poller, we could achieve around 300~400 RPS. +// NewAggregatedWorker returns an instance to manage both activity and decision workers func NewAggregatedWorker(client *WorkflowClient, taskList string, options WorkerOptions) *AggregatedWorker { setClientDefaults(client) setWorkerOptionsDefaults(&options) @@ -1331,13 +1333,14 @@ func NewAggregatedWorker(client *WorkflowClient, taskList string, options Worker workerParams := workerExecutionParameters{ DomainName: client.domain, TaskList: taskList, - ConcurrentPollRoutineSize: defaultConcurrentPollRoutineSize, ConcurrentActivityExecutionSize: options.MaxConcurrentActivityExecutionSize, WorkerActivitiesPerSecond: options.WorkerActivitiesPerSecond, + MaxConcurrentActivityPollers: options.MaxConcurrentActivityTaskPollers, ConcurrentLocalActivityExecutionSize: options.MaxConcurrentLocalActivityExecutionSize, WorkerLocalActivitiesPerSecond: options.WorkerLocalActivitiesPerSecond, ConcurrentDecisionTaskExecutionSize: options.MaxConcurrentDecisionTaskExecutionSize, WorkerDecisionTasksPerSecond: options.WorkerDecisionTasksPerSecond, + MaxConcurrentDecisionPollers: options.MaxConcurrentDecisionTaskPollers, Identity: client.identity, MetricsScope: client.metricsScope, Logger: options.Logger, @@ -1431,7 +1434,8 @@ func processTestTags(wOptions *WorkerOptions, ep *workerExecutionParameters) { switch key { case workerOptionsConfigConcurrentPollRoutineSize: if size, err := strconv.Atoi(val); err == nil { - ep.ConcurrentPollRoutineSize = size + ep.MaxConcurrentActivityPollers = size + ep.MaxConcurrentDecisionPollers = size } } } @@ -1499,12 +1503,18 @@ func setWorkerOptionsDefaults(options *WorkerOptions) { if options.WorkerActivitiesPerSecond == 0 { options.WorkerActivitiesPerSecond = defaultWorkerActivitiesPerSecond } + if options.MaxConcurrentActivityTaskPollers <= 0 { + options.MaxConcurrentActivityTaskPollers = defaultConcurrentPollRoutineSize + } if options.MaxConcurrentDecisionTaskExecutionSize == 0 { options.MaxConcurrentDecisionTaskExecutionSize = defaultMaxConcurrentTaskExecutionSize } if options.WorkerDecisionTasksPerSecond == 0 { options.WorkerDecisionTasksPerSecond = defaultWorkerTaskExecutionRate } + if options.MaxConcurrentDecisionTaskPollers <= 0 { + options.MaxConcurrentDecisionTaskPollers = defaultConcurrentPollRoutineSize + } if options.MaxConcurrentLocalActivityExecutionSize == 0 { options.MaxConcurrentLocalActivityExecutionSize = defaultMaxConcurrentLocalActivityExecutionSize } @@ -1522,8 +1532,8 @@ func setWorkerOptionsDefaults(options *WorkerOptions) { } } +// setClientDefaults should be needed only in unit tests. func setClientDefaults(client *WorkflowClient) { - // This should be needed only in unit tests. if client.dataConverter == nil { client.dataConverter = getDefaultDataConverter() } @@ -1533,6 +1543,9 @@ func setClientDefaults(client *WorkflowClient) { if client.tracer == nil { client.tracer = opentracing.NoopTracer{} } + if client.metricsScope == nil { + client.metricsScope = metrics.NewTaggedScope(nil) + } } // getTestTags returns the test tags in the context. diff --git a/internal/internal_worker_interfaces_test.go b/internal/internal_worker_interfaces_test.go index 93beb658c..05a1503e0 100644 --- a/internal/internal_worker_interfaces_test.go +++ b/internal/internal_worker_interfaces_test.go @@ -177,10 +177,11 @@ func (s *InterfacesTestSuite) TestInterface() { domain := "testDomain" // Workflow execution parameters. workflowExecutionParameters := workerExecutionParameters{ - TaskList: "testTaskList", - ConcurrentPollRoutineSize: 4, - Logger: logger, - Tracer: opentracing.NoopTracer{}, + TaskList: "testTaskList", + MaxConcurrentActivityPollers: 4, + MaxConcurrentDecisionPollers: 4, + Logger: logger, + Tracer: opentracing.NoopTracer{}, } domainStatus := enums.DomainStatusRegistered @@ -207,10 +208,11 @@ func (s *InterfacesTestSuite) TestInterface() { // Create activity execution parameters. activityExecutionParameters := workerExecutionParameters{ - TaskList: "testTaskList", - ConcurrentPollRoutineSize: 10, - Logger: logger, - Tracer: opentracing.NoopTracer{}, + TaskList: "testTaskList", + MaxConcurrentActivityPollers: 10, + MaxConcurrentDecisionPollers: 10, + Logger: logger, + Tracer: opentracing.NoopTracer{}, } // Register activity instances and launch the worker. diff --git a/internal/internal_worker_test.go b/internal/internal_worker_test.go index e5acf0088..d6da2aaca 100644 --- a/internal/internal_worker_test.go +++ b/internal/internal_worker_test.go @@ -31,6 +31,7 @@ import ( "time" "github.com/golang/mock/gomock" + "github.com/opentracing/opentracing-go" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" @@ -1172,14 +1173,136 @@ func TestActivityNilArgs(t *testing.T) { require.Equal(t, nilErr, reflectResults[0].Interface()) } +func TestWorkerOptionDefaults(t *testing.T) { + client := &WorkflowClient{} + taskList := "worker-options-tl" + aggWorker := NewAggregatedWorker(client, taskList, WorkerOptions{}) + + decisionWorker := aggWorker.workflowWorker + require.True(t, decisionWorker.executionParameters.Identity != "") + require.NotNil(t, decisionWorker.executionParameters.Logger) + require.NotNil(t, decisionWorker.executionParameters.MetricsScope) + require.Nil(t, decisionWorker.executionParameters.ContextPropagators) + + expected := workerExecutionParameters{ + DomainName: DefaultDomainName, + TaskList: taskList, + MaxConcurrentActivityPollers: defaultConcurrentPollRoutineSize, + MaxConcurrentDecisionPollers: defaultConcurrentPollRoutineSize, + ConcurrentLocalActivityExecutionSize: defaultMaxConcurrentLocalActivityExecutionSize, + ConcurrentActivityExecutionSize: defaultMaxConcurrentActivityExecutionSize, + ConcurrentDecisionTaskExecutionSize: defaultMaxConcurrentTaskExecutionSize, + WorkerActivitiesPerSecond: defaultTaskListActivitiesPerSecond, + WorkerDecisionTasksPerSecond: defaultWorkerTaskExecutionRate, + TaskListActivitiesPerSecond: defaultTaskListActivitiesPerSecond, + WorkerLocalActivitiesPerSecond: defaultWorkerLocalActivitiesPerSecond, + StickyScheduleToStartTimeout: stickyDecisionScheduleToStartTimeoutSeconds * time.Second, + DataConverter: getDefaultDataConverter(), + Tracer: opentracing.NoopTracer{}, + Logger: decisionWorker.executionParameters.Logger, + MetricsScope: decisionWorker.executionParameters.MetricsScope, + Identity: decisionWorker.executionParameters.Identity, + UserContext: decisionWorker.executionParameters.UserContext, + } + + assertWorkerExecutionParamsEqual(t, expected, decisionWorker.executionParameters) + + activityWorker := aggWorker.activityWorker + require.True(t, activityWorker.executionParameters.Identity != "") + require.NotNil(t, activityWorker.executionParameters.Logger) + require.NotNil(t, activityWorker.executionParameters.MetricsScope) + require.Nil(t, activityWorker.executionParameters.ContextPropagators) + assertWorkerExecutionParamsEqual(t, expected, activityWorker.executionParameters) +} + +func TestWorkerOptionNonDefaults(t *testing.T) { + taskList := "worker-options-tl" + + client := &WorkflowClient{ + workflowService: nil, + connectionCloser: nil, + domain: "worker-options-test", + registry: nil, + identity: "143@worker-options-test-1", + dataConverter: &defaultDataConverter{}, + contextPropagators: nil, + tracer: nil, + } + + options := WorkerOptions{ + TaskListActivitiesPerSecond: 8888, + MaxConcurrentSessionExecutionSize: 3333, + MaxConcurrentDecisionTaskExecutionSize: 2222, + MaxConcurrentActivityExecutionSize: 1111, + MaxConcurrentLocalActivityExecutionSize: 101, + MaxConcurrentDecisionTaskPollers: 11, + MaxConcurrentActivityTaskPollers: 12, + WorkerLocalActivitiesPerSecond: 222, + WorkerDecisionTasksPerSecond: 111, + WorkerActivitiesPerSecond: 99, + StickyScheduleToStartTimeout: 555 * time.Minute, + BackgroundActivityContext: context.Background(), + Logger: zap.NewNop(), + } + + aggWorker := NewAggregatedWorker(client, taskList, options) + + decisionWorker := aggWorker.workflowWorker + require.Len(t, decisionWorker.executionParameters.ContextPropagators, 0) + + expected := workerExecutionParameters{ + TaskList: taskList, + MaxConcurrentActivityPollers: options.MaxConcurrentActivityTaskPollers, + MaxConcurrentDecisionPollers: options.MaxConcurrentDecisionTaskPollers, + ConcurrentLocalActivityExecutionSize: options.MaxConcurrentLocalActivityExecutionSize, + ConcurrentActivityExecutionSize: options.MaxConcurrentActivityExecutionSize, + ConcurrentDecisionTaskExecutionSize: options.MaxConcurrentDecisionTaskExecutionSize, + WorkerActivitiesPerSecond: options.WorkerActivitiesPerSecond, + WorkerDecisionTasksPerSecond: options.WorkerDecisionTasksPerSecond, + TaskListActivitiesPerSecond: options.TaskListActivitiesPerSecond, + WorkerLocalActivitiesPerSecond: options.WorkerLocalActivitiesPerSecond, + StickyScheduleToStartTimeout: options.StickyScheduleToStartTimeout, + DataConverter: client.dataConverter, + Tracer: client.tracer, + Logger: options.Logger, + MetricsScope: client.metricsScope, + Identity: client.identity, + } + + assertWorkerExecutionParamsEqual(t, expected, decisionWorker.executionParameters) + + activityWorker := aggWorker.activityWorker + require.Len(t, activityWorker.executionParameters.ContextPropagators, 0) + assertWorkerExecutionParamsEqual(t, expected, activityWorker.executionParameters) +} + +func assertWorkerExecutionParamsEqual(t *testing.T, paramsA workerExecutionParameters, paramsB workerExecutionParameters) { + require.Equal(t, paramsA.TaskList, paramsA.TaskList) + require.Equal(t, paramsA.Identity, paramsB.Identity) + require.Equal(t, paramsA.DataConverter, paramsB.DataConverter) + require.Equal(t, paramsA.Tracer, paramsB.Tracer) + require.Equal(t, paramsA.ConcurrentLocalActivityExecutionSize, paramsB.ConcurrentLocalActivityExecutionSize) + require.Equal(t, paramsA.ConcurrentActivityExecutionSize, paramsB.ConcurrentActivityExecutionSize) + require.Equal(t, paramsA.ConcurrentDecisionTaskExecutionSize, paramsB.ConcurrentDecisionTaskExecutionSize) + require.Equal(t, paramsA.WorkerActivitiesPerSecond, paramsB.WorkerActivitiesPerSecond) + require.Equal(t, paramsA.WorkerDecisionTasksPerSecond, paramsB.WorkerDecisionTasksPerSecond) + require.Equal(t, paramsA.TaskListActivitiesPerSecond, paramsB.TaskListActivitiesPerSecond) + require.Equal(t, paramsA.StickyScheduleToStartTimeout, paramsB.StickyScheduleToStartTimeout) + require.Equal(t, paramsA.MaxConcurrentDecisionPollers, paramsB.MaxConcurrentDecisionPollers) + require.Equal(t, paramsA.MaxConcurrentActivityPollers, paramsB.MaxConcurrentActivityPollers) + require.Equal(t, paramsA.NonDeterministicWorkflowPolicy, paramsB.NonDeterministicWorkflowPolicy) + require.Equal(t, paramsA.EnableLoggingInReplay, paramsB.EnableLoggingInReplay) + require.Equal(t, paramsA.DisableStickyExecution, paramsB.DisableStickyExecution) +} + /* type encodingTest struct { encoding encoding input []interface{} } -var testWorkflowID1 = s.WorkflowExecution{WorkflowId: "testWID", RunId: "runID"} -var testWorkflowID2 = s.WorkflowExecution{WorkflowId: "testWID2", RunId: "runID2"} +var testWorkflowID1 = s.WorkflowExecution{WorkflowId: common.StringPtr("testWID"), RunId: common.StringPtr("runID")} +var testWorkflowID2 = s.WorkflowExecution{WorkflowId: common.StringPtr("testWID2"), RunId: common.StringPtr("runID2")} var thriftEncodingTests = []encodingTest{ {&thriftEncoding{}, []interface{}{&testWorkflowID1}}, {&thriftEncoding{}, []interface{}{&testWorkflowID1, &testWorkflowID2}}, diff --git a/internal/internal_workers_test.go b/internal/internal_workers_test.go index 3013ea2ca..9701d4903 100644 --- a/internal/internal_workers_test.go +++ b/internal/internal_workers_test.go @@ -95,12 +95,12 @@ func (s *WorkersTestSuite) TestWorkflowWorker() { ctx, cancel := context.WithCancel(context.Background()) executionParameters := workerExecutionParameters{ - DomainName: DefaultDomainName, - TaskList: "testTaskList", - ConcurrentPollRoutineSize: 5, - Logger: logger, - UserContext: ctx, - UserContextCancel: cancel, + DomainName: DefaultDomainName, + TaskList: "testTaskList", + MaxConcurrentDecisionPollers: 5, + Logger: logger, + UserContext: ctx, + UserContextCancel: cancel, } overrides := &workerOverrides{workflowTaskHandler: newSampleWorkflowTaskHandler()} workflowWorker := newWorkflowWorkerInternal(s.service, executionParameters, nil, overrides, newRegistry()) @@ -118,10 +118,10 @@ func (s *WorkersTestSuite) TestActivityWorker() { s.service.EXPECT().RespondActivityTaskCompleted(gomock.Any(), gomock.Any(), gomock.Any()).Return(&workflowservice.RespondActivityTaskCompletedResponse{}, nil).AnyTimes() executionParameters := workerExecutionParameters{ - DomainName: DefaultDomainName, - TaskList: "testTaskList", - ConcurrentPollRoutineSize: 5, - Logger: logger, + DomainName: DefaultDomainName, + TaskList: "testTaskList", + MaxConcurrentActivityPollers: 5, + Logger: logger, } overrides := &workerOverrides{activityTaskHandler: newSampleActivityTaskHandler()} a := &greeterActivity{} @@ -161,7 +161,7 @@ func (s *WorkersTestSuite) TestActivityWorkerStop() { executionParameters := workerExecutionParameters{ DomainName: DefaultDomainName, TaskList: "testTaskList", - ConcurrentPollRoutineSize: 5, + MaxConcurrentActivityPollers: 5, ConcurrentActivityExecutionSize: 2, Logger: logger, UserContext: ctx, @@ -193,10 +193,10 @@ func (s *WorkersTestSuite) TestPollForDecisionTask_InternalServiceError() { s.service.EXPECT().PollForDecisionTask(gomock.Any(), gomock.Any(), gomock.Any()).Return(&workflowservice.PollForDecisionTaskResponse{}, serviceerror.NewInternal("")).AnyTimes() executionParameters := workerExecutionParameters{ - DomainName: DefaultDomainName, - TaskList: "testDecisionTaskList", - ConcurrentPollRoutineSize: 5, - Logger: zap.NewNop(), + DomainName: DefaultDomainName, + TaskList: "testDecisionTaskList", + MaxConcurrentDecisionPollers: 5, + Logger: zap.NewNop(), } overrides := &workerOverrides{workflowTaskHandler: newSampleWorkflowTaskHandler()} workflowWorker := newWorkflowWorkerInternal(s.service, executionParameters, nil, overrides, newRegistry()) diff --git a/internal/internal_workflow_testsuite.go b/internal/internal_workflow_testsuite.go index 0eb604a63..1fd4e345d 100644 --- a/internal/internal_workflow_testsuite.go +++ b/internal/internal_workflow_testsuite.go @@ -557,7 +557,7 @@ func (env *testWorkflowEnvironmentImpl) executeActivity( func (env *testWorkflowEnvironmentImpl) executeLocalActivity( activityFn interface{}, args ...interface{}, -) (Value, error) { +) (val Value, err error) { params := executeLocalActivityParams{ localActivityOptions: localActivityOptions{ ScheduleToCloseTimeoutSeconds: common.Int32Ceil(env.testTimeout.Seconds()), diff --git a/internal/worker.go b/internal/worker.go index 51c08d98d..9a5c80d10 100644 --- a/internal/worker.go +++ b/internal/worker.go @@ -42,7 +42,8 @@ type ( // Notice that the number is represented in float, so that you can set it to less than // 1 if needed. For example, set the number to 0.1 means you want your activity to be executed // once for every 10 seconds. This can be used to protect down stream services from flooding. - // The zero value of this uses the default value. Default: 100k + // The zero value of this uses the default value + // default: 100k WorkerActivitiesPerSecond float64 // Optional: To set the maximum concurrent local activity executions this worker can have. @@ -55,7 +56,8 @@ type ( // Notice that the number is represented in float, so that you can set it to less than // 1 if needed. For example, set the number to 0.1 means you want your local activity to be executed // once for every 10 seconds. This can be used to protect down stream services from flooding. - // The zero value of this uses the default value. Default: 100k + // The zero value of this uses the default value + // default: 100k WorkerLocalActivitiesPerSecond float64 // Optional: Sets the rate limiting on number of activities that can be executed per second. @@ -64,9 +66,16 @@ type ( // Notice that the number is represented in float, so that you can set it to less than // 1 if needed. For example, set the number to 0.1 means you want your activity to be executed // once for every 10 seconds. This can be used to protect down stream services from flooding. - // The zero value of this uses the default value. Default: 100k + // The zero value of this uses the default value. + // default: 100k TaskListActivitiesPerSecond float64 + // Optional: Sets the maximum number of goroutines that will concurrently poll the + // cadence-server to retrieve activity tasks. Changing this value will affect the + // rate at which the worker is able to consume tasks from a task list. + // default: 2 + MaxConcurrentActivityTaskPollers int + // Optional: To set the maximum concurrent decision task executions this worker can have. // The zero value of this uses the default value. // default: defaultMaxConcurrentTaskExecutionSize(1k) @@ -74,13 +83,15 @@ type ( // Optional: Sets the rate limiting on number of decision tasks that can be executed per second per // worker. This can be used to limit resources used by the worker. - // The zero value of this uses the default value. Default: 100k + // The zero value of this uses the default value. + // default: 100k WorkerDecisionTasksPerSecond float64 - // Optional: if the activities need auto heart beating for those activities - // by the framework - // default: false not to heartbeat. - AutoHeartBeat bool + // Optional: Sets the maximum number of goroutines that will concurrently poll the + // cadence-server to retrieve decision tasks. Changing this value will affect the + // rate at which the worker is able to consume tasks from a task list. + // default: 2 + MaxConcurrentDecisionTaskPollers int // Optional: Logger framework can use to log. // default: default logger provided. @@ -102,18 +113,18 @@ type ( DisableActivityWorker bool // Optional: Disable sticky execution. - // default: false // Sticky Execution is to run the decision tasks for one workflow execution on same worker host. This is an // optimization for workflow execution. When sticky execution is enabled, worker keeps the workflow state in // memory. New decision task contains the new history events will be dispatched to the same worker. If this // worker crashes, the sticky decision task will timeout after StickyScheduleToStartTimeout, and temporal server // will clear the stickiness for that workflow execution and automatically reschedule a new decision task that // is available for any worker to pick up and resume the progress. + // default: false DisableStickyExecution bool // Optional: Sticky schedule to start timeout. - // default: 5s // The resolution is seconds. See details about StickyExecution on the comments for DisableStickyExecution. + // default: 5s StickyScheduleToStartTimeout time.Duration // Optional: sets context for activity. The context can be used to pass any configuration to activity diff --git a/internal/workflow.go b/internal/workflow.go index bac9e25dc..b19762794 100644 --- a/internal/workflow.go +++ b/internal/workflow.go @@ -716,8 +716,8 @@ type WorkflowInfo struct { ContinuedExecutionRunID string ParentWorkflowDomain string ParentWorkflowExecution *WorkflowExecution - Memo *commonproto.Memo - SearchAttributes *commonproto.SearchAttributes + Memo *commonproto.Memo // Value can be decoded using data converter (DefaultDataConverter, or custom one if set). + SearchAttributes *commonproto.SearchAttributes // Value can be decoded using DefaultDataConverter. BinaryChecksum string } diff --git a/internal/workflow_testsuite.go b/internal/workflow_testsuite.go index 1194d7f1f..bbcd3bd5e 100644 --- a/internal/workflow_testsuite.go +++ b/internal/workflow_testsuite.go @@ -172,7 +172,7 @@ func (t *TestActivityEnvironment) ExecuteActivity(activityFn interface{}, args . // ExecuteLocalActivity executes a local activity. The tested activity will be executed synchronously in the calling goroutinue. // Caller should use Value.Get() to extract strong typed result value. -func (t *TestActivityEnvironment) ExecuteLocalActivity(activityFn interface{}, args ...interface{}) (Value, error) { +func (t *TestActivityEnvironment) ExecuteLocalActivity(activityFn interface{}, args ...interface{}) (val Value, err error) { return t.impl.executeLocalActivity(activityFn, args...) } diff --git a/test/activity_test.go b/test/activity_test.go index bf4d966d0..2b5ede52d 100644 --- a/test/activity_test.go +++ b/test/activity_test.go @@ -22,6 +22,7 @@ package test import ( "context" + "fmt" "strings" "sync" "time" @@ -85,6 +86,21 @@ func (a *Activities) fail(_ context.Context) error { return errFailOnPurpose } +func (a *Activities) InspectActivityInfo(ctx context.Context, domain, taskList, wfType string) error { + a.append("inspectActivityInfo") + info := activity.GetInfo(ctx) + if info.WorkflowDomain != domain { + return fmt.Errorf("expected domainName %v but got %v", domain, info.WorkflowDomain) + } + if info.WorkflowType == nil || info.WorkflowType.Name != wfType { + return fmt.Errorf("expected workflowType %v but got %v", wfType, info.WorkflowType) + } + if info.TaskList != taskList { + return fmt.Errorf("expected taskList %v but got %v", taskList, info.TaskList) + } + return nil +} + func (a *Activities) append(name string) { a.mu.Lock() defer a.mu.Unlock() @@ -129,4 +145,5 @@ func (a *Activities) register(worker worker.Worker) { worker.RegisterActivityWithOptions(a.fail, activity.RegisterOptions{Name: "Fail", DisableAlreadyRegisteredCheck: true}) // Check prefix worker.RegisterActivityWithOptions(a.activities2, activity.RegisterOptions{Name: "Prefix_", DisableAlreadyRegisteredCheck: true}) + worker.RegisterActivityWithOptions(a.InspectActivityInfo, activity.RegisterOptions{Name: "inspectActivityInfo"}) } diff --git a/test/integration_test.go b/test/integration_test.go index 393efa32d..b3e1ac7e2 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -431,6 +431,16 @@ func (ts *IntegrationTestSuite) TestLargeQueryResultError() { ts.Nil(value) } +func (ts *IntegrationTestSuite) TestInspectActivityInfo() { + err := ts.executeWorkflow("test-activity-info", ts.workflows.InspectActivityInfo, nil) + ts.Nil(err) +} + +func (ts *IntegrationTestSuite) TestInspectLocalActivityInfo() { + err := ts.executeWorkflow("test-local-activity-info", ts.workflows.InspectLocalActivityInfo, nil) + ts.Nil(err) +} + func (ts *IntegrationTestSuite) registerDomain() { client, err := client.NewDomainClient(client.Options{HostPort: ts.config.ServiceAddr}) ts.NoError(err) diff --git a/test/workflow_test.go b/test/workflow_test.go index 352fc77a5..9b29a7201 100644 --- a/test/workflow_test.go +++ b/test/workflow_test.go @@ -421,7 +421,13 @@ func (w *Workflows) ConsistentQueryWorkflow(ctx workflow.Context, delay time.Dur laCtx := workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{ ScheduleToCloseTimeout: 5 * time.Second, }) - _ = workflow.ExecuteLocalActivity(laCtx, LocalSleep, delay).Get(laCtx, nil) + + workflowInfo := internal.GetWorkflowInfo(laCtx) + if &workflowInfo.WorkflowType == nil { + return errors.New("failed to get work flow type") + } + + workflow.ExecuteLocalActivity(laCtx, LocalSleep, delay).Get(laCtx, nil) queryResult = signalData return nil } @@ -486,6 +492,26 @@ func (w *Workflows) sleep(ctx workflow.Context, d time.Duration) error { return workflow.ExecuteActivity(ctx, "Activities_Sleep", d).Get(ctx, nil) } +func (w *Workflows) InspectActivityInfo(ctx workflow.Context) error { + info := workflow.GetInfo(ctx) + domain := info.Domain + wfType := info.WorkflowType.Name + taskList := info.TaskListName + ctx = workflow.WithActivityOptions(ctx, w.defaultActivityOptions()) + return workflow.ExecuteActivity(ctx, "inspectActivityInfo", domain, taskList, wfType).Get(ctx, nil) +} + +func (w *Workflows) InspectLocalActivityInfo(ctx workflow.Context) error { + info := workflow.GetInfo(ctx) + domain := info.Domain + wfType := info.WorkflowType.Name + taskList := info.TaskListName + ctx = workflow.WithLocalActivityOptions(ctx, w.defaultLocalActivityOptions()) + activites := Activities{} + return workflow.ExecuteLocalActivity( + ctx, activites.InspectActivityInfo, domain, taskList, wfType).Get(ctx, nil) +} + func (w *Workflows) register(worker worker.Worker) { worker.RegisterWorkflow(w.Basic) worker.RegisterWorkflow(w.ActivityRetryOnError) @@ -500,6 +526,8 @@ func (w *Workflows) register(worker worker.Worker) { worker.RegisterWorkflow(w.ChildWorkflowSuccess) worker.RegisterWorkflow(w.ChildWorkflowSuccessWithParentClosePolicyTerminate) worker.RegisterWorkflow(w.ChildWorkflowSuccessWithParentClosePolicyAbandon) + worker.RegisterWorkflow(w.InspectActivityInfo) + worker.RegisterWorkflow(w.InspectLocalActivityInfo) worker.RegisterWorkflow(w.sleep) worker.RegisterWorkflow(w.child) worker.RegisterWorkflow(w.childForMemoAndSearchAttr) @@ -517,6 +545,13 @@ func (w *Workflows) defaultActivityOptions() workflow.ActivityOptions { StartToCloseTimeout: 9 * time.Second, } } + +func (w *Workflows) defaultLocalActivityOptions() workflow.LocalActivityOptions { + return workflow.LocalActivityOptions{ + ScheduleToCloseTimeout: 5 * time.Second, + } +} + func (w *Workflows) defaultActivityOptionsWithRetry() workflow.ActivityOptions { return workflow.ActivityOptions{ ScheduleToStartTimeout: 5 * time.Second,