From 5abec1564bea43c92e7b2e5d71dadfe8d651b022 Mon Sep 17 00:00:00 2001 From: Will Baker Date: Wed, 24 Jan 2024 09:02:30 -0500 Subject: [PATCH] source-mongodb: always emit an updated checkpoint after `updateResourceStates` `updateResourceStates` may remove bindings from the checkpoint, and these removals won't be reflected in persisted merge patches in future document checkpoints. Because of this, we should always emit a full update to the state checkpoint in `Pull` after `updateResourceStates` has run to persist any changes in the state due to bindings being removed. --- source-mongodb/pull.go | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/source-mongodb/pull.go b/source-mongodb/pull.go index 16e00ddfd5..af2fdc7cce 100644 --- a/source-mongodb/pull.go +++ b/source-mongodb/pull.go @@ -53,12 +53,11 @@ func (d *driver) Pull(open *pc.Request_Open, stream *boilerplate.PullOutput) err } } - migrated, err := migrateState(&prevState, open.Capture.Bindings) - if err != nil { + if _, err := migrateState(&prevState, open.Capture.Bindings); err != nil { return fmt.Errorf("migrating binding states: %w", err) } - prevState, err = updateResourceStates(prevState, bindings) + prevState, err := updateResourceStates(prevState, bindings) if err != nil { return fmt.Errorf("error initializing resource states: %w", err) } @@ -94,10 +93,10 @@ func (d *driver) Pull(open *pc.Request_Open, stream *boilerplate.PullOutput) err return err } - if migrated { - if err := c.outputCheckpoint(&prevState, false); err != nil { - return err - } + // Persist the checkpoint in full, which may have been updated in updateResourceStates to remove + // bindings. + if err := c.outputCheckpoint(&prevState, false); err != nil { + return err } if !prevState.isBackfillComplete(bindings) {