Skip to content

Commit

Permalink
source-firestore: use stateKey for tracking binding state
Browse files Browse the repository at this point in the history
Updates the connector to use `stateKey` for each binding to track its state.

This includes a migration for existing captures. Once all existing captures have started up with
this new connector and we are confident that no additional old captures will be re-enabled, we can
remove the migration code.
  • Loading branch information
williamhbaker committed Jan 22, 2024
1 parent a151d51 commit 537acf6
Show file tree
Hide file tree
Showing 15 changed files with 120 additions and 55 deletions.
2 changes: 1 addition & 1 deletion source-firestore/.snapshots/TestAddedBindingSameGroup-one
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@
# ================================
# Final State Checkpoint
# ================================
{"Resources":{"flow_source_tests/*/users/*/docs":{"Backfill":{"Completed":true,"Cursor":"","MTime":"<TIMESTAMP>","StartAfter":"<TIMESTAMP>"},"ReadTime":"<TIMESTAMP>"}}}
{"bindingStateV1":{"flow_source_tests%2F%2A%2Fusers%2F%2A%2Fdocs":{"Backfill":{"Completed":true,"Cursor":"","MTime":"<TIMESTAMP>","StartAfter":"<TIMESTAMP>"},"ReadTime":"<TIMESTAMP>"}}}

2 changes: 1 addition & 1 deletion source-firestore/.snapshots/TestAddedBindingSameGroup-two
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,5 @@
# ================================
# Final State Checkpoint
# ================================
{"Resources":{"flow_source_tests/*/groups/*/docs":{"Backfill":{"Completed":true,"Cursor":"","MTime":"<TIMESTAMP>","StartAfter":"<TIMESTAMP>"},"ReadTime":"<TIMESTAMP>"},"flow_source_tests/*/users/*/docs":{"Backfill":{"Completed":true,"Cursor":"","MTime":"<TIMESTAMP>","StartAfter":"<TIMESTAMP>"},"ReadTime":"<TIMESTAMP>"}}}
{"bindingStateV1":{"flow_source_tests%2F%2A%2Fgroups%2F%2A%2Fdocs":{"Backfill":{"Completed":true,"Cursor":"","MTime":"<TIMESTAMP>","StartAfter":"<TIMESTAMP>"},"ReadTime":"<TIMESTAMP>"},"flow_source_tests%2F%2A%2Fusers%2F%2A%2Fdocs":{"Backfill":{"Completed":true,"Cursor":"","MTime":"<TIMESTAMP>","StartAfter":"<TIMESTAMP>"},"ReadTime":"<TIMESTAMP>"}}}

2 changes: 1 addition & 1 deletion source-firestore/.snapshots/TestBindingDeletion-one
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@
# ================================
# Final State Checkpoint
# ================================
{"Resources":{"flow_source_tests/*/docs":{"Backfill":{"Completed":true,"Cursor":"","MTime":"<TIMESTAMP>","StartAfter":"<TIMESTAMP>"},"ReadTime":"<TIMESTAMP>"}}}
{"bindingStateV1":{"flow_source_tests%2F%2A%2Fdocs":{"Backfill":{"Completed":true,"Cursor":"","MTime":"<TIMESTAMP>","StartAfter":"<TIMESTAMP>"},"ReadTime":"<TIMESTAMP>"}}}

2 changes: 1 addition & 1 deletion source-firestore/.snapshots/TestBindingDeletion-three
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@
# ================================
# Final State Checkpoint
# ================================
{"Resources":{"flow_source_tests/*/docs":{"Backfill":{"Completed":true,"Cursor":"","MTime":"<TIMESTAMP>","StartAfter":"<TIMESTAMP>"},"ReadTime":"<TIMESTAMP>"}}}
{"bindingStateV1":{"flow_source_tests%2F%2A%2Fdocs":{"Backfill":{"Completed":true,"Cursor":"","MTime":"<TIMESTAMP>","StartAfter":"<TIMESTAMP>"},"ReadTime":"<TIMESTAMP>"}}}

2 changes: 1 addition & 1 deletion source-firestore/.snapshots/TestBindingDeletion-two
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# ================================
# Final State Checkpoint
# ================================
{"Resources":{"flow_source_tests/*/other":{"Backfill":{"Completed":true,"Cursor":"","MTime":"<TIMESTAMP>","StartAfter":"<TIMESTAMP>"},"ReadTime":"<TIMESTAMP>"}}}
{"bindingStateV1":{"flow_source_tests%2F%2A%2Fother":{"Backfill":{"Completed":true,"Cursor":"","MTime":"<TIMESTAMP>","StartAfter":"<TIMESTAMP>"},"ReadTime":"<TIMESTAMP>"}}}

2 changes: 1 addition & 1 deletion source-firestore/.snapshots/TestDeletions-one
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@
# ================================
# Final State Checkpoint
# ================================
{"Resources":{"flow_source_tests/*/docs":{"Backfill":{"Completed":true,"Cursor":"","MTime":"<TIMESTAMP>","StartAfter":"<TIMESTAMP>"},"ReadTime":"<TIMESTAMP>"}}}
{"bindingStateV1":{"flow_source_tests%2F%2A%2Fdocs":{"Backfill":{"Completed":true,"Cursor":"","MTime":"<TIMESTAMP>","StartAfter":"<TIMESTAMP>"},"ReadTime":"<TIMESTAMP>"}}}

2 changes: 1 addition & 1 deletion source-firestore/.snapshots/TestDeletions-two
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# ================================
# Final State Checkpoint
# ================================
{"Resources":{"flow_source_tests/*/docs":{"Backfill":{"Completed":true,"Cursor":"","MTime":"<TIMESTAMP>","StartAfter":"<TIMESTAMP>"},"ReadTime":"<TIMESTAMP>"}}}
{"bindingStateV1":{"flow_source_tests%2F%2A%2Fdocs":{"Backfill":{"Completed":true,"Cursor":"","MTime":"<TIMESTAMP>","StartAfter":"<TIMESTAMP>"},"ReadTime":"<TIMESTAMP>"}}}

2 changes: 1 addition & 1 deletion source-firestore/.snapshots/TestManySmallWrites-one
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,5 @@
# ================================
# Final State Checkpoint
# ================================
{"Resources":{"flow_source_tests/*/users/*/docs":{"Backfill":{"Completed":true,"Cursor":"","MTime":"<TIMESTAMP>","StartAfter":"<TIMESTAMP>"},"ReadTime":"<TIMESTAMP>"}}}
{"bindingStateV1":{"flow_source_tests%2F%2A%2Fusers%2F%2A%2Fdocs":{"Backfill":{"Completed":true,"Cursor":"","MTime":"<TIMESTAMP>","StartAfter":"<TIMESTAMP>"},"ReadTime":"<TIMESTAMP>"}}}

2 changes: 1 addition & 1 deletion source-firestore/.snapshots/TestManySmallWrites-two
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,5 @@
# ================================
# Final State Checkpoint
# ================================
{"Resources":{"flow_source_tests/*/users/*/docs":{"Backfill":{"Completed":true,"Cursor":"","MTime":"<TIMESTAMP>","StartAfter":"<TIMESTAMP>"},"ReadTime":"<TIMESTAMP>"}}}
{"bindingStateV1":{"flow_source_tests%2F%2A%2Fusers%2F%2A%2Fdocs":{"Backfill":{"Completed":true,"Cursor":"","MTime":"<TIMESTAMP>","StartAfter":"<TIMESTAMP>"},"ReadTime":"<TIMESTAMP>"}}}

2 changes: 1 addition & 1 deletion source-firestore/.snapshots/TestMultipleWatches-one
Original file line number Diff line number Diff line change
Expand Up @@ -85,5 +85,5 @@
# ================================
# Final State Checkpoint
# ================================
{"Resources":{"flow_source_tests/*/users/*/docs":{"Backfill":{"Completed":true,"Cursor":"","MTime":"<TIMESTAMP>","StartAfter":"<TIMESTAMP>"},"ReadTime":"<TIMESTAMP>"},"flow_source_tests/*/users/*/notes":{"Backfill":{"Completed":true,"Cursor":"","MTime":"<TIMESTAMP>","StartAfter":"<TIMESTAMP>"},"ReadTime":"<TIMESTAMP>"},"flow_source_tests/*/users/*/tasks":{"Backfill":{"Completed":true,"Cursor":"","MTime":"<TIMESTAMP>","StartAfter":"<TIMESTAMP>"},"ReadTime":"<TIMESTAMP>"}}}
{"bindingStateV1":{"flow_source_tests%2F%2A%2Fusers%2F%2A%2Fdocs":{"Backfill":{"Completed":true,"Cursor":"","MTime":"<TIMESTAMP>","StartAfter":"<TIMESTAMP>"},"ReadTime":"<TIMESTAMP>"},"flow_source_tests%2F%2A%2Fusers%2F%2A%2Fnotes":{"Backfill":{"Completed":true,"Cursor":"","MTime":"<TIMESTAMP>","StartAfter":"<TIMESTAMP>"},"ReadTime":"<TIMESTAMP>"},"flow_source_tests%2F%2A%2Fusers%2F%2A%2Ftasks":{"Backfill":{"Completed":true,"Cursor":"","MTime":"<TIMESTAMP>","StartAfter":"<TIMESTAMP>"},"ReadTime":"<TIMESTAMP>"}}}

2 changes: 1 addition & 1 deletion source-firestore/.snapshots/TestMultipleWatches-two
Original file line number Diff line number Diff line change
Expand Up @@ -85,5 +85,5 @@
# ================================
# Final State Checkpoint
# ================================
{"Resources":{"flow_source_tests/*/users/*/docs":{"Backfill":{"Completed":true,"Cursor":"","MTime":"<TIMESTAMP>","StartAfter":"<TIMESTAMP>"},"ReadTime":"<TIMESTAMP>"},"flow_source_tests/*/users/*/notes":{"Backfill":{"Completed":true,"Cursor":"","MTime":"<TIMESTAMP>","StartAfter":"<TIMESTAMP>"},"ReadTime":"<TIMESTAMP>"},"flow_source_tests/*/users/*/tasks":{"Backfill":{"Completed":true,"Cursor":"","MTime":"<TIMESTAMP>","StartAfter":"<TIMESTAMP>"},"ReadTime":"<TIMESTAMP>"}}}
{"bindingStateV1":{"flow_source_tests%2F%2A%2Fusers%2F%2A%2Fdocs":{"Backfill":{"Completed":true,"Cursor":"","MTime":"<TIMESTAMP>","StartAfter":"<TIMESTAMP>"},"ReadTime":"<TIMESTAMP>"},"flow_source_tests%2F%2A%2Fusers%2F%2A%2Fnotes":{"Backfill":{"Completed":true,"Cursor":"","MTime":"<TIMESTAMP>","StartAfter":"<TIMESTAMP>"},"ReadTime":"<TIMESTAMP>"},"flow_source_tests%2F%2A%2Fusers%2F%2A%2Ftasks":{"Backfill":{"Completed":true,"Cursor":"","MTime":"<TIMESTAMP>","StartAfter":"<TIMESTAMP>"},"ReadTime":"<TIMESTAMP>"}}}

2 changes: 1 addition & 1 deletion source-firestore/.snapshots/TestSimpleCapture-one
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@
# ================================
# Final State Checkpoint
# ================================
{"Resources":{"flow_source_tests/*/docs":{"Backfill":{"Completed":true,"Cursor":"","MTime":"<TIMESTAMP>","StartAfter":"<TIMESTAMP>"},"ReadTime":"<TIMESTAMP>"}}}
{"bindingStateV1":{"flow_source_tests%2F%2A%2Fdocs":{"Backfill":{"Completed":true,"Cursor":"","MTime":"<TIMESTAMP>","StartAfter":"<TIMESTAMP>"},"ReadTime":"<TIMESTAMP>"}}}

2 changes: 1 addition & 1 deletion source-firestore/.snapshots/TestSimpleCapture-two
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@
# ================================
# Final State Checkpoint
# ================================
{"Resources":{"flow_source_tests/*/docs":{"Backfill":{"Completed":true,"Cursor":"","MTime":"<TIMESTAMP>","StartAfter":"<TIMESTAMP>"},"ReadTime":"<TIMESTAMP>"}}}
{"bindingStateV1":{"flow_source_tests%2F%2A%2Fdocs":{"Backfill":{"Completed":true,"Cursor":"","MTime":"<TIMESTAMP>","StartAfter":"<TIMESTAMP>"},"ReadTime":"<TIMESTAMP>"}}}

2 changes: 2 additions & 0 deletions source-firestore/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"flag"
"fmt"
"io/ioutil"
"net/url"
"os"
"regexp"
"strings"
Expand Down Expand Up @@ -270,6 +271,7 @@ func simpleBindings(t testing.TB, names ...string) []*flow.CaptureSpec_Binding {
Collection: flow.CollectionSpec{Name: flow.Collection(path)},
ResourceConfigJson: json.RawMessage(fmt.Sprintf(`{"path": %q, "backfillMode": "async"}`, path)),
ResourcePath: []string{path},
StateKey: url.QueryEscape(path),
})
}
return bindings
Expand Down
147 changes: 105 additions & 42 deletions source-firestore/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,24 +67,27 @@ func (driver) Pull(open *pc.Request_Open, stream *boilerplate.PullOutput) error
}
}

var resourceBindings []resource
for _, binding := range open.Capture.Bindings {
var res resource
if err := pf.UnmarshalStrict(binding.ResourceConfigJson, &res); err != nil {
return fmt.Errorf("parsing resource config: %w", err)
}
resourceBindings = append(resourceBindings, res)
if err := migrateState(&prevState, open.Capture.Bindings); err != nil {
return fmt.Errorf("migrating previous state: %w", err)
}

updatedResourceStates, err := initResourceStates(prevState.Resources, resourceBindings)
updatedResourceStates, err := initResourceStates(prevState.Resources, open.Capture.Bindings)
if err != nil {
return fmt.Errorf("error initializing resource states: %w", err)
}

// Build a mapping of document paths to state keys, to allow efficient lookups of state keys
// from the path of retrieved documents.
stateKeys := make(map[string]boilerplate.StateKey)
for sk, res := range updatedResourceStates {
stateKeys[res.path] = sk
}

var capture = &capture{
Config: cfg,
State: &captureState{
Resources: updatedResourceStates,
stateKeys: stateKeys,
},
Output: stream,

Expand All @@ -105,7 +108,51 @@ type capture struct {

type captureState struct {
sync.RWMutex
Resources map[string]*resourceState
Resources map[boilerplate.StateKey]*resourceState `json:"bindingStateV1,omitempty"`
OldResources map[string]*resourceState `json:"Resources,omitempty"` // TODO(whb): Remove once all captures have migrated.
stateKeys map[string]boilerplate.StateKey // Allow for lookups of the stateKey from a document path.
}

func migrateState(state *captureState, bindings []*pf.CaptureSpec_Binding) error {
if state.Resources != nil && state.OldResources != nil {
return fmt.Errorf("application error: both Resources and OldResources were non-nil")
} else if state.Resources != nil {
log.Info("skipping state migration since it's already done")
return nil
}

state.Resources = make(map[boilerplate.StateKey]*resourceState)

for _, b := range bindings {
if b.StateKey == "" {
return fmt.Errorf("state key was empty for binding %s", b.ResourcePath)
}

var res resource
if err := pf.UnmarshalStrict(b.ResourceConfigJson, &res); err != nil {
return fmt.Errorf("parsing resource config: %w", err)
}

ll := log.WithFields(log.Fields{
"stateKey": b.StateKey,
"path": res.Path,
})

stateFromOld, ok := state.OldResources[res.Path]
if !ok {
// This may happen if the connector has never emitted any checkpoints with data for this
// binding.
ll.Warn("no state found for binding while migrating state")
continue
}

state.Resources[boilerplate.StateKey(b.StateKey)] = stateFromOld
ll.Info("migrated binding state")
}

state.OldResources = nil

return nil
}

type resourceState struct {
Expand All @@ -119,6 +166,7 @@ type resourceState struct {
Inconsistent bool `json:"Inconsistent,omitempty"`

bindingIndex int
path string
}

type backfillState struct {
Expand Down Expand Up @@ -149,12 +197,21 @@ func (s *backfillState) String() string {

// Given the prior resource states from the last DriverCheckpoint along with
// the current capture bindings, compute a new set of resource states.
func initResourceStates(prevStates map[string]*resourceState, resourceBindings []resource) (map[string]*resourceState, error) {
func initResourceStates(prevStates map[boilerplate.StateKey]*resourceState, bindings []*pf.CaptureSpec_Binding) (map[boilerplate.StateKey]*resourceState, error) {
var now = time.Now()
var states = make(map[string]*resourceState)
for idx, resource := range resourceBindings {
var state = &resourceState{bindingIndex: idx}
if prevState, ok := prevStates[resource.Path]; ok && !prevState.Inconsistent {
var states = make(map[boilerplate.StateKey]*resourceState)
for idx, binding := range bindings {
var res resource
if err := pf.UnmarshalStrict(binding.ResourceConfigJson, &res); err != nil {
return nil, fmt.Errorf("parsing resource config: %w", err)
}
var stateKey = boilerplate.StateKey(binding.StateKey)

var state = &resourceState{
bindingIndex: idx,
path: res.Path,
}
if prevState, ok := prevStates[stateKey]; ok && !prevState.Inconsistent {
state.ReadTime = prevState.ReadTime
state.Backfill = prevState.Backfill
} else if ok && prevState.Inconsistent {
Expand All @@ -171,13 +228,13 @@ func initResourceStates(prevStates map[string]*resourceState, resourceBindings [
startTime = time.Now()
}
state.ReadTime = now
if resource.BackfillMode == backfillModeNone {
if res.BackfillMode == backfillModeNone {
state.Backfill = nil
} else {
state.Backfill = &backfillState{StartAfter: startTime}
}
} else {
switch resource.BackfillMode {
switch res.BackfillMode {
case backfillModeNone:
state.ReadTime = now
state.Backfill = nil
Expand All @@ -188,17 +245,17 @@ func initResourceStates(prevStates map[string]*resourceState, resourceBindings [
state.ReadTime = time.Time{}
state.Backfill = nil
default:
return nil, fmt.Errorf("invalid backfill mode %q for %q", resource.BackfillMode, resource.Path)
return nil, fmt.Errorf("invalid backfill mode %q for %q", res.BackfillMode, res.Path)
}
if resource.InitTimestamp != "" {
if ts, err := time.Parse(time.RFC3339Nano, resource.InitTimestamp); err != nil {
return nil, fmt.Errorf("invalid initTimestamp value %q: %w", resource.InitTimestamp, err)
if res.InitTimestamp != "" {
if ts, err := time.Parse(time.RFC3339Nano, res.InitTimestamp); err != nil {
return nil, fmt.Errorf("invalid initTimestamp value %q: %w", res.InitTimestamp, err)
} else {
state.ReadTime = ts
}
}
}
states[resource.Path] = state
states[stateKey] = state
}
return states, nil
}
Expand All @@ -210,8 +267,10 @@ func (s *captureState) Validate() error {
func (s *captureState) BindingIndex(resourcePath string) (int, bool) {
s.RLock()
defer s.RUnlock()
if state := s.Resources[resourcePath]; state != nil {
return state.bindingIndex, true
if sk, ok := s.stateKeys[resourcePath]; ok {
if state := s.Resources[sk]; state != nil {
return state.bindingIndex, true
}
}
// Return MaxInt just to be extra clear that we're not capturing this resource
return math.MaxInt, false
Expand All @@ -220,28 +279,32 @@ func (s *captureState) BindingIndex(resourcePath string) (int, bool) {
func (s *captureState) ReadTime(resourcePath string) (time.Time, bool) {
s.RLock()
defer s.RUnlock()
if state := s.Resources[resourcePath]; state != nil {
return state.ReadTime, true
if sk, ok := s.stateKeys[resourcePath]; ok {
if state := s.Resources[sk]; state != nil {
return state.ReadTime, true
}
}
return time.Time{}, false
}

func (s *captureState) BackfillingAsync(rpath resourcePath) bool {
s.RLock()
defer s.RUnlock()
if state := s.Resources[rpath]; state != nil {
return state.Backfill != nil
if sk, ok := s.stateKeys[rpath]; ok {
if state := s.Resources[sk]; state != nil {
return state.Backfill != nil
}
}
return false
}

func (s *captureState) UpdateReadTimes(collectionID string, readTime time.Time) (json.RawMessage, error) {
s.Lock()
var updated = make(map[string]*resourceState)
for resourcePath, resourceState := range s.Resources {
if getLastCollectionGroupID(resourcePath) == collectionID {
var updated = make(map[boilerplate.StateKey]*resourceState)
for stateKey, resourceState := range s.Resources {
if getLastCollectionGroupID(resourceState.path) == collectionID {
resourceState.ReadTime = readTime
updated[resourcePath] = resourceState
updated[stateKey] = resourceState
}
}
s.Unlock()
Expand All @@ -255,11 +318,11 @@ func (s *captureState) UpdateReadTimes(collectionID string, readTime time.Time)

func (s *captureState) UpdateBackfillState(collectionID string, state *backfillState) (json.RawMessage, error) {
s.Lock()
var updated = make(map[string]*resourceState)
for resourcePath, resourceState := range s.Resources {
if getLastCollectionGroupID(resourcePath) == collectionID && resourceState.Backfill != nil {
var updated = make(map[boilerplate.StateKey]*resourceState)
for stateKey, resourceState := range s.Resources {
if getLastCollectionGroupID(resourceState.path) == collectionID && resourceState.Backfill != nil {
resourceState.Backfill = state
updated[resourcePath] = resourceState
updated[stateKey] = resourceState
}
}
s.Unlock()
Expand All @@ -273,11 +336,11 @@ func (s *captureState) UpdateBackfillState(collectionID string, state *backfillS

func (s *captureState) MarkInconsistent(collectionID string) (json.RawMessage, error) {
s.Lock()
var updated = make(map[string]*resourceState)
for resourcePath, resourceState := range s.Resources {
if getLastCollectionGroupID(resourcePath) == collectionID {
var updated = make(map[boilerplate.StateKey]*resourceState)
for stateKey, resourceState := range s.Resources {
if getLastCollectionGroupID(resourceState.path) == collectionID {
resourceState.Inconsistent = true
updated[resourcePath] = resourceState
updated[stateKey] = resourceState
}
}
s.Unlock()
Expand All @@ -298,8 +361,8 @@ func (c *capture) Run(ctx context.Context) error {
// are both 'messages').
var watchCollections = make(map[collectionGroupID]time.Time)
var backfillCollections = make(map[collectionGroupID]*backfillState)
for resourcePath, resourceState := range c.State.Resources {
var collectionID = getLastCollectionGroupID(resourcePath)
for _, resourceState := range c.State.Resources {
var collectionID = getLastCollectionGroupID(resourceState.path)
if startTime, ok := watchCollections[collectionID]; !ok || resourceState.ReadTime.Before(startTime) {
watchCollections[collectionID] = resourceState.ReadTime
}
Expand All @@ -308,7 +371,7 @@ func (c *capture) Run(ctx context.Context) error {
} else if resumeState, ok := backfillCollections[collectionID]; !ok {
backfillCollections[collectionID] = resourceState.Backfill
} else if !resumeState.Equal(resourceState.Backfill) {
return fmt.Errorf("internal error: backfill state mismatch for resource %q with collection ID %q", resourcePath, collectionID)
return fmt.Errorf("internal error: backfill state mismatch for resource %q with collection ID %q", resourceState.path, collectionID)
}
}

Expand Down

0 comments on commit 537acf6

Please sign in to comment.