Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sqlcapture: use stateKey for tracking binding state #1101

Merged
merged 1 commit into from
Jan 31, 2024
Merged

Conversation

williamhbaker
Copy link
Member

@williamhbaker williamhbaker commented Dec 4, 2023

Description:

Adds support for state keys to SQL CDC captures.

The general idea is that a given stream (which is the combined schema & table name of a captured table) has its state accessible in the driver checkpoint via the runtime-provided stateKey.

After some consideration, for the moment I have settled on representing this association between streamID and stateKey at the sqlcapture "framework" level, rather than threading it deeper into individual captures driven by the framework. This seems to work out decently well since a replication stream change event already has the information needed to construct a streamID, but getting the stateKey out of this requires some higher-level knowledge, much like how getting the correct binding index to emit a document also requires some additional knowledge.

In addition to the automated tests, I manually tested this by:

  1. Create a capture of multiple tables using the old connector image
  2. Switch to the new image, and see that the state was migrated correctly...no re-backfilling or anything like that
  3. Use the backfill counter to trigger a re-backfill. Removing & re-adding bindings still works for that too. Also incrementing the backfill counter for all bindings will reset the cursor.

Workflow steps:

After this change is merged, SQL CDC captures will be able to use backfill counters to trigger a re-backfill of a captured table.

Documentation links affected:

(list any documentation links that you created, or existing ones that you've identified as needing updates, along with a brief description)

Notes for reviewers:


This change is Reviewable

Copy link
Member

@willdonnelly willdonnelly left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks directionally correct to me. I've added a couple of comments on specific bits, but overall this seems like the right approach.

How to transition over existing captures is a tricky question though, because the way the binding keys are implemented here it looks like the old stream IDs and the new state keys would be sharing the same "key space" during that switchover. If not for that, it seems like it ought to be reasonably straightforward to add some additional logic to updateState() which would check for bindings which have no state associated with stateKey but do have old-style state associated with the stream ID, and copy over the state appropriately.

sort.Strings(streams)
return streams
slices.SortFunc(bindings, func(a, b *Binding) int {
if a.StreamID < b.StreamID {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The body of this comparison function could just be return strings.Compare(a.StreamID, b.StreamID), I think?

if !streamExistsInCatalog && state.Mode != TableModeIgnore {
logrus.WithField("stream", streamID).Info("stream removed from catalog")
c.State.Streams[streamID] = &TableState{Mode: TableModeIgnore, dirty: true}
// TODO(whb): We can eventually remove this when backfill counters are fully incorporated into
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just checking my understanding here: The plan is that "backfill counter" will be incremented if the user disables+re-enables a single binding, even if they don't use the new "go re-backfill this" flow?

(Because in addition to permitting users to re-backfill a collection by toggling the binding, it's also important that this logic here currently prevents the possibility of disabling a stream, missing some amount of replication events, and then re-enabling it without triggering that new backfill.)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good point, we'll probably need to keep this state-clearing logic around since its not really valid to disable + resume a binding without re-backfilling it; I always seem to forget about that. I don't know that we'd always want to increment the backfill counter on re-enabling a binding, and although that might be possible, I don't think we'd do it right away 🤔.

@williamhbaker williamhbaker force-pushed the wb/statekey branch 2 times, most recently from e53fbb8 to cab2b57 Compare January 24, 2024 19:06
@@ -16,52 +16,52 @@ func TestRecommendedCatalogName(t *testing.T) {
{
schema: "something",
table: "typical",
want: "something.typical",
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The changes in this file aren't from this PR, but I found that they needed updated when running all of the sqlcapture tests.

Updates the sqlcapture connectors to use `stateKey` for each binding to track its state.

This includes a migration for existing captures. Once all existing captures have started up with
this new connector and we are confident that no additional old captures will be re-enabled, we can
remove the migration code.
@williamhbaker williamhbaker changed the title wip: sqlcapture state keys sqlcapture: use stateKey for tracking binding state Jan 24, 2024
@williamhbaker williamhbaker marked this pull request as ready for review January 24, 2024 19:37
@jgraettinger
Copy link
Member

@willdonnelly would you please review this? It's absence has been coming up frequently, lately.

Copy link
Member

@willdonnelly willdonnelly left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Everything seems in order, let's ship it.

// they just need to either increment the backfill counter for all bindings, or disable all
// bindings and then re-enable them (which will cause them all to get backfilled anew, as they
// should after such an event).
if allStreamsAreNew {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, so changing the check to work this way means that if the backfill counters for every binding are incremented all at once we'll also reset the cursor? Nice!

@williamhbaker williamhbaker merged commit fc4fd33 into main Jan 31, 2024
44 checks passed
@williamhbaker williamhbaker deleted the wb/statekey branch January 31, 2024 17:21
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants