Skip to content

Commit

Permalink
Don't replay commands from non-completed task (#1750)
Browse files Browse the repository at this point in the history
* Do not run replay on commands that are a part of a non-completed task

* Remove completedTask, don't need 2 variables to track

* trying something new

* New integration test passes. Updated index for where we should be running replay check

* Add more complicated test, fix replay partial history logic by breaking early, so we don't process events not in history yet

* Don't try to fix unrelated test

* Fix merge conflict fail, remove stale test debugging lines

* Fix test name

* Use unique workflow IDs when child workflow scenario gets called multiple times,
  • Loading branch information
yuandrew authored Feb 3, 2025
1 parent bfd12ac commit 8f05d01
Show file tree
Hide file tree
Showing 7 changed files with 340 additions and 1 deletion.
24 changes: 23 additions & 1 deletion internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1018,17 +1018,20 @@ func (w *workflowExecutionContextImpl) ProcessWorkflowTask(workflowTask *workflo
var replayOutbox []outboxEntry
var replayCommands []*commandpb.Command
var respondEvents []*historypb.HistoryEvent
var partialHistory bool

taskMessages := workflowTask.task.GetMessages()
skipReplayCheck := w.skipReplayCheck()
isInReplayer := IsReplayNamespace(w.wth.namespace)
shouldForceReplayCheck := func() bool {
isInReplayer := IsReplayNamespace(w.wth.namespace)
// If we are in the replayer we should always check the history replay, even if the workflow is completed
// Skip if the workflow panicked to avoid potentially breaking old histories
_, wfPanicked := w.err.(*workflowPanicError)
return !wfPanicked && isInReplayer
}

curReplayCmdsIndex := -1

metricsHandler := w.wth.metricsHandler.WithTags(metrics.WorkflowTags(task.WorkflowType.GetName()))
start := time.Now()
// This is set to nil once recorded
Expand All @@ -1051,6 +1054,17 @@ ProcessEvents:
binaryChecksum := nextTask.binaryChecksum
nextTaskBuildId := nextTask.buildID
admittedUpdates := nextTask.admittedMsgs

// Peek ahead to confirm there are no more events
isLastWFTForPartialWFE := len(reorderedEvents) > 0 &&
reorderedEvents[len(reorderedEvents)-1].EventType == enumspb.EVENT_TYPE_WORKFLOW_TASK_STARTED &&
len(reorderedHistory.next) == 0 &&
isInReplayer
if isLastWFTForPartialWFE {
partialHistory = true
break ProcessEvents
}

// Check if we are replaying so we know if we should use the messages in the WFT or the history
isReplay := len(reorderedEvents) > 0 && reorderedHistory.IsReplayEvent(reorderedEvents[len(reorderedEvents)-1])
var msgs *eventMsgIndex
Expand Down Expand Up @@ -1092,6 +1106,10 @@ ProcessEvents:
if len(reorderedEvents) == 0 {
break ProcessEvents
}
// Since replayCommands is updated one loop iteration early, keep track of its
// length before that early update so we can handle replaying an incomplete WFE
curReplayCmdsIndex = len(replayCommands)

if binaryChecksum == "" {
w.workflowInfo.BinaryChecksum = w.wth.workerBuildID
} else {
Expand Down Expand Up @@ -1196,6 +1214,10 @@ ProcessEvents:
}
}

if partialHistory && curReplayCmdsIndex != -1 {
replayCommands = replayCommands[:curReplayCmdsIndex]
}

if metricsTimer != nil {
metricsTimer.Record(time.Since(start))
metricsTimer = nil
Expand Down
21 changes: 21 additions & 0 deletions internal/internal_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,27 @@ func (s *internalWorkerTestSuite) TestReplayWorkflowHistory() {
require.NoError(s.T(), err)
}

// TestReplayWorkflowHistory_IncompleteWorkflowExecution replays a history that
// stops at the first WorkflowTaskStarted event (there is no matching
// WorkflowTaskCompleted) and expects the replayer to report no error.
func (s *internalWorkerTestSuite) TestReplayWorkflowHistory_IncompleteWorkflowExecution() {
	const taskQueue = "taskQueue1"

	startedAttrs := &historypb.WorkflowExecutionStartedEventAttributes{
		WorkflowType: &commonpb.WorkflowType{Name: "testReplayWorkflow"},
		TaskQueue:    &taskqueuepb.TaskQueue{Name: taskQueue},
		Input:        testEncodeFunctionArgs(converter.GetDefaultDataConverter()),
	}

	// Started -> task scheduled -> task started, and nothing after that.
	partialHistory := &historypb.History{
		Events: []*historypb.HistoryEvent{
			createTestEventWorkflowExecutionStarted(1, startedAttrs),
			createTestEventWorkflowTaskScheduled(2, &historypb.WorkflowTaskScheduledEventAttributes{}),
			createTestEventWorkflowTaskStarted(3),
		},
	}

	replayLogger := getLogger()
	replayer, err := NewWorkflowReplayer(WorkflowReplayerOptions{})
	require.NoError(s.T(), err)
	replayer.RegisterWorkflow(testReplayWorkflow)

	require.NoError(s.T(), replayer.ReplayWorkflowHistory(replayLogger, partialHistory))
}

func (s *internalWorkerTestSuite) TestReplayWorkflowHistory_LocalActivity() {
taskQueue := "taskQueue1"
testEvents := []*historypb.HistoryEvent{
Expand Down
43 changes: 43 additions & 0 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7024,3 +7024,46 @@ func (c *coroutineCountingWorkflowOutboundInterceptor) Go(
f(ctx)
})
}

// TestPartialHistoryReplayFuzzer runs the CommandsFuzz workflow to completion,
// fetches its history, and then replays that history truncated right after
// every WorkflowTaskStarted event (plus once in full). Each replay is expected
// to succeed.
func (ts *IntegrationTestSuite) TestPartialHistoryReplayFuzzer() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Run the workflow
	run, err := ts.client.ExecuteWorkflow(ctx,
		ts.startWorkflowOptions("test-partial-history-replay-fuzzer"), ts.workflows.CommandsFuzz)
	ts.NotNil(run)
	ts.NoError(err)
	ts.NoError(run.Get(ctx, nil))

	// Obtain history
	var history historypb.History
	iter := ts.client.GetWorkflowHistory(ctx, run.GetID(), run.GetRunID(), false,
		enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT)
	for iter.HasNext() {
		event, err := iter.Next()
		ts.NoError(err)
		history.Events = append(history.Events, event)
	}

	// Index of every WFT_STARTED event, followed by the index of the final
	// event so the complete history is replayed as well.
	var startedPoints []int
	for i, event := range history.Events {
		if event.GetEventType() == enumspb.EVENT_TYPE_WORKFLOW_TASK_STARTED {
			startedPoints = append(startedPoints, i)
		}
	}
	startedPoints = append(startedPoints, len(history.Events)-1)

	// Replay partial history, cutting off at each WFT_STARTED event. Every
	// iteration builds its own truncated view instead of shrinking the shared
	// history, so the iterations do not depend on their order.
	for i := len(startedPoints) - 1; i >= 0; i-- {
		point := startedPoints[i]
		partial := &historypb.History{Events: history.Events[:point+1]}

		// NewWorkflowReplayer returns no error here, so there is nothing to check.
		replayer := worker.NewWorkflowReplayer()
		replayer.RegisterWorkflow(ts.workflows.CommandsFuzz)
		replayer.RegisterWorkflow(ts.workflows.childWorkflowWaitOnSignal)
		ts.NoError(replayer.ReplayWorkflowHistory(nil, partial))
	}
}
150 changes: 150 additions & 0 deletions test/replaytests/partial-replay-non-command-event.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
{
"events": [
{
"eventId": "1",
"eventTime": "2025-01-21T21:13:17.763980Z",
"eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_STARTED",
"taskId": "1048587",
"workflowExecutionStartedEventAttributes": {
"workflowType": {
"name": "TripWorkflow"
},
"taskQueue": {
"name": "recovery",
"kind": "TASK_QUEUE_KIND_NORMAL"
},
"input": {
"payloads": [
{
"metadata": {
"encoding": "anNvbi9wbGFpbg=="
},
"data": "MA=="
}
]
},
"workflowExecutionTimeout": "0s",
"workflowRunTimeout": "0s",
"workflowTaskTimeout": "10s",
"originalExecutionRunId": "7360b8c8-735b-4364-a950-9f8bb78c04e5",
"identity": "78486@Andrews-MacBook-Pro.local@",
"firstExecutionRunId": "7360b8c8-735b-4364-a950-9f8bb78c04e5",
"attempt": 1,
"firstWorkflowTaskBackoff": "0s",
"header": {},
"workflowId": "trip_workflow"
}
},
{
"eventId": "2",
"eventTime": "2025-01-21T21:13:17.764040Z",
"eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED",
"taskId": "1048588",
"workflowTaskScheduledEventAttributes": {
"taskQueue": {
"name": "recovery",
"kind": "TASK_QUEUE_KIND_NORMAL"
},
"startToCloseTimeout": "10s",
"attempt": 1
}
},
{
"eventId": "3",
"eventTime": "2025-01-21T21:13:17.766282Z",
"eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED",
"taskId": "1048593",
"workflowTaskStartedEventAttributes": {
"scheduledEventId": "2",
"identity": "78326@Andrews-MacBook-Pro.local@",
"requestId": "e116305f-6b36-414a-ac33-a1ca9c9a1640",
"historySizeBytes": "279",
"workerVersion": {
"buildId": "0f02752b442ba36079c7735a5ea5e1ee"
}
}
},
{
"eventId": "4",
"eventTime": "2025-01-21T21:13:17.768731Z",
"eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED",
"taskId": "1048597",
"workflowTaskCompletedEventAttributes": {
"scheduledEventId": "2",
"startedEventId": "3",
"identity": "78326@Andrews-MacBook-Pro.local@",
"workerVersion": {
"buildId": "0f02752b442ba36079c7735a5ea5e1ee"
},
"sdkMetadata": {
"langUsedFlags": [
3
],
"sdkName": "temporal-go",
"sdkVersion": "1.31.0"
},
"meteringMetadata": {}
}
},
{
"eventId": "5",
"eventTime": "2025-01-21T21:13:40.639292Z",
"eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED",
"taskId": "1048600",
"workflowExecutionSignaledEventAttributes": {
"signalName": "trip_event",
"input": {
"payloads": [
{
"metadata": {
"encoding": "anNvbi9wbGFpbg=="
},
"data": "eyJJRCI6IiIsIlRvdGFsIjoxMH0="
}
]
},
"identity": "78651@Andrews-MacBook-Pro.local@",
"header": {}
}
},
{
"eventId": "6",
"eventTime": "2025-01-21T21:13:40.639294Z",
"eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED",
"taskId": "1048601",
"workflowTaskScheduledEventAttributes": {
"taskQueue": {
"name": "Andrews-MacBook-Pro.local:1bee34bb-8c2b-4738-84b5-25f257233211",
"kind": "TASK_QUEUE_KIND_STICKY",
"normalName": "recovery"
},
"startToCloseTimeout": "10s",
"attempt": 1
}
},
{
"eventId": "7",
"eventTime": "2025-01-21T21:13:45.641420Z",
"eventType": "EVENT_TYPE_WORKFLOW_TASK_TIMED_OUT",
"taskId": "1048605",
"workflowTaskTimedOutEventAttributes": {
"scheduledEventId": "6",
"timeoutType": "TIMEOUT_TYPE_SCHEDULE_TO_START"
}
},
{
"eventId": "8",
"eventTime": "2025-01-21T21:13:45.641428Z",
"eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED",
"taskId": "1048606",
"workflowTaskScheduledEventAttributes": {
"taskQueue": {
"name": "recovery",
"kind": "TASK_QUEUE_KIND_NORMAL"
},
"startToCloseTimeout": "10s",
"attempt": 1
}
}
]
}
9 changes: 9 additions & 0 deletions test/replaytests/replay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,15 @@ func (s *replayTestSuite) TestSelectorNonBlocking() {
require.NoError(s.T(), err)
}

// TestPartialReplayNonCommandEvent replays the history stored in
// partial-replay-non-command-event.json with TripWorkflow registered and
// expects the replay to succeed.
func (s *replayTestSuite) TestPartialReplayNonCommandEvent() {
	replayer := worker.NewWorkflowReplayer()
	replayer.RegisterWorkflow(TripWorkflow)
	// Verify we can replay partial history that has ended on a non-command event
	err := replayer.ReplayWorkflowHistoryFromJSONFile(ilog.NewDefaultLogger(), "partial-replay-non-command-event.json")
	// Assert once, using the same require form as the neighbouring tests.
	require.NoError(s.T(), err)
}

type captureConverter struct {
converter.DataConverter
toPayloads []interface{}
Expand Down
20 changes: 20 additions & 0 deletions test/replaytests/workflows.go
Original file line number Diff line number Diff line change
Expand Up @@ -656,3 +656,23 @@ func SelectorBlockingDefaultActivity(ctx context.Context, value string) (string,
logger.Info("Activity", "value", value)
return value + " was logged!", nil
}

// TripWorkflow receives ten values from the "trip_event" signal channel,
// incrementing tripCounter once per received signal, and then returns a
// continue-as-new error for "TripWorkflow" carrying the updated counter.
func TripWorkflow(ctx workflow.Context, tripCounter int) error {
	const tripsPerRun = 10

	wfLogger := workflow.GetLogger(ctx)
	userID := workflow.GetInfo(ctx).WorkflowExecution.ID
	wfLogger.Info("Trip Workflow Started for User.",
		"User", userID,
		"TripCounter", tripCounter)

	// Signal channel that delivers trip-completed events.
	tripEvents := workflow.GetSignalChannel(ctx, "trip_event")
	for received := 0; received < tripsPerRun; received++ {
		var total int
		tripEvents.Receive(ctx, &total)
		wfLogger.Info("Trip complete event received.", "Total", total)
		tripCounter++
	}

	wfLogger.Info("Starting a new run.", "TripCounter", tripCounter)
	return workflow.NewContinueAsNewError(ctx, "TripWorkflow", tripCounter)
}
Loading

0 comments on commit 8f05d01

Please sign in to comment.