Skip to content

Commit

Permalink
"Backported" un-bugfix in case people hit non-determinism errors
Browse files Browse the repository at this point in the history
Introduces and uses a "bug backport" config, to allow selecting old behavior
that could not be fixed in a completely transparent way.

See the code comments for detailed usage of this new flag.

In general we should treat this config as *somewhat desirable* to keep when
the upkeep cost is low or zero, but it does represent known tech debt that
we need to clean up at some point.
When adding or removing fields on it, make sure to cover it in the version notes!
  • Loading branch information
Groxx committed Nov 8, 2021
1 parent 4d64c01 commit 9871428
Show file tree
Hide file tree
Showing 7 changed files with 354 additions and 16 deletions.
11 changes: 6 additions & 5 deletions internal/internal_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ type (
memo map[string]interface{}
searchAttributes map[string]interface{}
parentClosePolicy ParentClosePolicy
bugports Bugports
}

executeWorkflowParams struct {
Expand Down Expand Up @@ -596,7 +597,7 @@ func (c *channelImpl) Receive(ctx Context, valuePtr interface{}) (more bool) {
hasResult = false
v, ok, m := c.receiveAsyncImpl(callback)

if !ok && !m { //channel closed and empty
if !ok && !m { // channel closed and empty
return m
}

Expand All @@ -606,7 +607,7 @@ func (c *channelImpl) Receive(ctx Context, valuePtr interface{}) (more bool) {
state.unblocked()
return m
}
continue //corrupt signal. Drop and reset process
continue // corrupt signal. Drop and reset process
}
for {
if hasResult {
Expand All @@ -615,7 +616,7 @@ func (c *channelImpl) Receive(ctx Context, valuePtr interface{}) (more bool) {
state.unblocked()
return more
}
break //Corrupt signal. Drop and reset process.
break // Corrupt signal. Drop and reset process.
}
state.yield(fmt.Sprintf("blocked on %s.Receive", c.name))
}
Expand All @@ -631,7 +632,7 @@ func (c *channelImpl) ReceiveAsync(valuePtr interface{}) (ok bool) {
func (c *channelImpl) ReceiveAsyncWithMoreFlag(valuePtr interface{}) (ok bool, more bool) {
for {
v, ok, more := c.receiveAsyncImpl(nil)
if !ok && !more { //channel closed and empty
if !ok && !more { // channel closed and empty
return ok, more
}

Expand Down Expand Up @@ -774,7 +775,7 @@ func (c *channelImpl) Close() {
// Takes a value and assigns that 'to' value. logs a metric if it is unable to deserialize
func (c *channelImpl) assignValue(from interface{}, to interface{}) error {
err := decodeAndAssignValue(c.dataConverter, from, to)
//add to metrics
// add to metrics
if err != nil {
c.env.GetLogger().Error(fmt.Sprintf("Corrupt signal received on channel %s. Error deserializing", c.name), zap.Error(err))
c.env.GetMetricsScope().Counter(metrics.CorruptedSignalsCounter).Inc(1)
Expand Down
19 changes: 14 additions & 5 deletions internal/internal_workflow_testsuite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2788,7 +2788,7 @@ func (s *WorkflowTestSuiteUnitTest) Test_ChildWorkflowAlreadyRunning() {
ctx1 := WithChildWorkflowOptions(ctx, ChildWorkflowOptions{
WorkflowID: "Test_ChildWorkflowAlreadyRunning",
ExecutionStartToCloseTimeout: time.Minute,
//WorkflowIDReusePolicy: WorkflowIDReusePolicyAllowDuplicate,
// WorkflowIDReusePolicy: WorkflowIDReusePolicyAllowDuplicate,
})

var result1, result2 string
Expand Down Expand Up @@ -3104,7 +3104,7 @@ func (s *WorkflowTestSuiteUnitTest) Test_Regression_ExecuteChildWorkflowWithCanc
// - <0 == do not cancel
// - 0 == cancel synchronously
// - >0 == cancel after waiting that long
check := func(cancelTime time.Duration, expected string) {
check := func(cancelTime time.Duration, bugport bool, expected string) {
env := s.NewTestWorkflowEnvironment()
env.Test(s.T())
env.RegisterWorkflowWithOptions(func(ctx Context) error {
Expand All @@ -3124,6 +3124,9 @@ func (s *WorkflowTestSuiteUnitTest) Test_Regression_ExecuteChildWorkflowWithCanc
ctx = WithChildWorkflowOptions(ctx, ChildWorkflowOptions{
ExecutionStartToCloseTimeout: 2 * time.Minute,
TaskStartToCloseTimeout: 2 * time.Minute,
Bugports: Bugports{
StartChildWorkflowsOnCanceledContext: bugport,
},
})
err := ExecuteChildWorkflow(ctx, "child").Get(ctx, nil)

Expand All @@ -3145,14 +3148,20 @@ func (s *WorkflowTestSuiteUnitTest) Test_Regression_ExecuteChildWorkflowWithCanc
}
s.Run("sanity check", func() {
// workflow should run the child successfully normally...
check(-1, "no err")
check(-1, false, "no err")
})
s.Run("canceled after child starts", func() {
// ... and cancel the child when the context is canceled after the child has started...
check(30*time.Second, "canceled")
check(30*time.Second, false, "canceled")
})
s.Run("canceled before child starts", func() {
// ... and should not start the child (i.e. be canceled) when canceled before it is started.
check(0, "canceled")
check(0, false, "canceled")
})
s.Run("canceled before child starts with bugport enabled", func() {
// prior to v0.18.4, canceling before the child was started would still start the child,
// and it would continue running.
// the bugport provides this old behavior to ease migration, at least until we feel the need to remove it.
check(0, true, "no err")
})
}
84 changes: 78 additions & 6 deletions internal/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,65 @@ type (
// ParentClosePolicy - Optional policy to decide what to do for the child.
// Default is Terminate (if onboarded to this feature)
ParentClosePolicy ParentClosePolicy

// Bugports allows opt-in enabling of older, possibly buggy behavior, primarily intended to allow temporarily
// emulating old behavior until a fix is deployed.
//
// Deprecated: Bugports are always deprecated and may be removed in future versions.
// Generally speaking they will *likely* remain in place for one minor version, and then they may be removed to
// allow cleaning up the additional code complexity that they cause.
Bugports Bugports
}

// Bugports allows opt-in enabling of older, possibly buggy behavior, primarily intended to allow temporarily
// emulating old behavior until a fix is deployed.
// By default, bugs (especially rarely-occurring ones) are fixed and all users are opted into the new behavior.
// Back-ported buggy behavior *may* be available via these flags.
//
// Bugports are supplied through ChildWorkflowOptions.Bugports. The zero value selects the fixed behavior for
// every field.
//
// Fields in here are NOT guaranteed to be stable. They will almost certainly be removed in the next major
// release, and might be removed earlier if a need arises, e.g. if the historical behavior causes too much of an
// increase in code complexity.
//
// See each individual field for details.
//
// Deprecated: Bugports are always deprecated and may be removed in future versions.
// Generally speaking they will *likely* remain in place for one minor version, and then they may be removed to
// allow cleaning up the additional code complexity that they cause.
Bugports struct {
// StartChildWorkflowsOnCanceledContext allows emulating older, buggy behavior that existed prior to v0.18.4.
//
// Prior to the fix, child workflows would be started and keep running when their context was canceled in two
// situations:
// 1) when the context was canceled before ExecuteChildWorkflow is called, and
// 2) when the context was canceled after ExecuteChildWorkflow but before the child workflow was started.
//
// 1 is unfortunately easy to trigger, though many workflows will encounter an error earlier and not reach the
// child-workflow-executing code. 2 is expected to be very rare in practice.
//
// When false (the default), ExecuteChildWorkflow fails immediately with the context's error if the context is
// already canceled, and the child is not started.
//
// To permanently emulate old behavior, use a disconnected context when starting child workflows, and
// cancel it only after `childfuture.GetWorkflowExecution().Get(...)` returns. This can be used when this flag
// is removed in the future.
//
// If you have currently-broken workflows and need to repair them, there are two primary options:
//
// 1: Check the BinaryChecksum value of your new deploy and/or of the decision that is currently failing
// workflows. Then set this flag when replaying history on those not-fixed checksums. Concretely, this means
// checking both `workflow.GetInfo(ctx).BinaryChecksum` (note that sufficiently old clients may not have
// recorded a value, and it may be nil) and `workflow.IsReplaying(ctx)`.
//
// 2: Reset broken workflows back to either before the buggy behavior was recorded, or before the fixed behavior
// was deployed. A "bad binary" reset type can do the latter in bulk, see the CLI's
// `cadence workflow reset-batch --reset_type BadBinary --help` for details. For the former, check the failing
// histories, identify the point at which the bug occurred, and reset to prior to that decision task.
//
// Deprecated: added in v0.18.4, this may be removed in or after v0.19.0, so please migrate off of it ASAP.
StartChildWorkflowsOnCanceledContext bool
}
)

Expand Down Expand Up @@ -896,15 +955,23 @@ func (wc *workflowEnvironmentInterceptor) ExecuteChildWorkflow(ctx Context, chil
decodeFutureImpl: mainFuture.(*decodeFutureImpl),
executionFuture: executionFuture.(*futureImpl),
}
// clients prior to v0.18.4 would incorrectly start child workflows that were started with cancelled contexts,
// and did not react to cancellation between requested and started.
correctChildCancellation := true
workflowOptionsFromCtx := getWorkflowEnvOptions(ctx)

// Starting with a canceled context should immediately fail, no need to even try.
if ctx.Err() != nil {
mainSettable.SetError(ctx.Err())
executionSettable.SetError(ctx.Err())
return result
if workflowOptionsFromCtx.bugports.StartChildWorkflowsOnCanceledContext {
// backport the bug
correctChildCancellation = false
} else {
mainSettable.SetError(ctx.Err())
executionSettable.SetError(ctx.Err())
return result
}
}

workflowOptionsFromCtx := getWorkflowEnvOptions(ctx)
dc := workflowOptionsFromCtx.dataConverter
env := getWorkflowEnvironment(ctx)
wfType, input, err := getValidatedWorkflowFunction(childWorkflowType, args, dc, env.GetRegistry())
Expand Down Expand Up @@ -951,7 +1018,11 @@ func (wc *workflowEnvironmentInterceptor) ExecuteChildWorkflow(ctx Context, chil

// forward the delayed cancellation if necessary
if shouldCancelAsync && e == nil && !mainFuture.IsReady() {
getWorkflowEnvironment(ctx).RequestCancelChildWorkflow(*options.domain, childWorkflowExecution.ID)
if workflowOptionsFromCtx.bugports.StartChildWorkflowsOnCanceledContext {
// do nothing: buggy behavior did not forward the cancellation
} else {
getWorkflowEnvironment(ctx).RequestCancelChildWorkflow(*options.domain, childWorkflowExecution.ID)
}
}
})

Expand All @@ -967,7 +1038,7 @@ func (wc *workflowEnvironmentInterceptor) ExecuteChildWorkflow(ctx Context, chil
if childWorkflowExecution != nil && !mainFuture.IsReady() {
// child workflow started, and ctx cancelled. forward cancel to the child.
getWorkflowEnvironment(ctx).RequestCancelChildWorkflow(*options.domain, childWorkflowExecution.ID)
} else if childWorkflowExecution == nil {
} else if childWorkflowExecution == nil && correctChildCancellation {
// decision to start the child has been made, but it has not yet started.

// TODO: ideal, but not strictly necessary for correctness:
Expand Down Expand Up @@ -1294,6 +1365,7 @@ func WithChildWorkflowOptions(ctx Context, cwo ChildWorkflowOptions) Context {
wfOptions.memo = cwo.Memo
wfOptions.searchAttributes = cwo.SearchAttributes
wfOptions.parentClosePolicy = cwo.ParentClosePolicy
wfOptions.bugports = cwo.Bugports

return ctx1
}
Expand Down
Loading

0 comments on commit 9871428

Please sign in to comment.