sqlcapture: use stateKey for tracking binding state #1101
73fb937 to f5a96d5
This looks directionally correct to me. I've added a couple of comments on specific bits, but overall this seems like the right approach.
How to transition existing captures over is a tricky question though, because with the way the binding keys are implemented here, it looks like the old stream IDs and the new state keys would be sharing the same "key space" during that switchover. If not for that, it seems like it ought to be reasonably straightforward to add some additional logic to `updateState()` which would check for bindings which have no state associated with `stateKey` but do have old-style state associated with the stream ID, and copy over the state appropriately.
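A minimal sketch of the copy-over logic described above, for illustration only. The `Binding` and `TableState` shapes, the `migrateState` name, and the example keys are assumptions, not the connector's actual code. It also does not resolve the shared key space concern: it simply skips any binding whose state key already has state.

```go
package main

import "fmt"

// TableState is a stand-in for the per-table state kept in the checkpoint.
type TableState struct {
	Mode string
}

// Binding pairs a stream ID with its runtime-provided state key.
// This shape is hypothetical and only serves the example.
type Binding struct {
	StreamID string
	StateKey string
}

// migrateState moves old-style state (keyed by stream ID) to the binding's
// state key, but only when no state exists under the state key yet.
// It returns the number of bindings whose state was moved.
func migrateState(streams map[string]*TableState, bindings []Binding) int {
	migrated := 0
	for _, b := range bindings {
		if b.StateKey == b.StreamID {
			continue // both keys coincide, nothing to move
		}
		if _, ok := streams[b.StateKey]; ok {
			continue // new-style state already present
		}
		old, ok := streams[b.StreamID]
		if !ok {
			continue // no old-style state either, so this is a new binding
		}
		streams[b.StateKey] = old
		delete(streams, b.StreamID)
		migrated++
	}
	return migrated
}

func main() {
	streams := map[string]*TableState{
		"public.orders": {Mode: "Active"}, // old-style key
		"key-users":     {Mode: "Active"}, // already keyed by state key
	}
	bindings := []Binding{
		{StreamID: "public.orders", StateKey: "key-orders"},
		{StreamID: "public.users", StateKey: "key-users"},
	}
	fmt.Println("migrated:", migrateState(streams, bindings))
}
```

Because both kinds of key live in the same map, a state key that happens to equal another binding's stream ID would be ambiguous here, which is the switchover problem raised in the comment.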
sqlcapture/capture.go
Outdated
sort.Strings(streams)
return streams
slices.SortFunc(bindings, func(a, b *Binding) int {
if a.StreamID < b.StreamID {
The body of this comparison function could just be `return strings.Compare(a.StreamID, b.StreamID)`, I think?
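For reference, a small self-contained sketch of that suggestion. It assumes the standard library `slices` package (Go 1.21 or later) and a pared-down `Binding` type with only a `StreamID` field; `sortBindings` is a name made up for the example.

```go
package main

import (
	"fmt"
	"slices"
	"strings"
)

// Binding is reduced to the one field the comparison needs.
type Binding struct {
	StreamID string
}

// sortBindings orders bindings by stream ID. strings.Compare already
// returns -1, 0, or +1, which is what slices.SortFunc expects.
func sortBindings(bindings []*Binding) {
	slices.SortFunc(bindings, func(a, b *Binding) int {
		return strings.Compare(a.StreamID, b.StreamID)
	})
}

func main() {
	bindings := []*Binding{
		{StreamID: "public.users"},
		{StreamID: "public.accounts"},
		{StreamID: "public.orders"},
	}
	sortBindings(bindings)
	for _, b := range bindings {
		fmt.Println(b.StreamID)
	}
}
```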
sqlcapture/capture.go
Outdated
if !streamExistsInCatalog && state.Mode != TableModeIgnore {
logrus.WithField("stream", streamID).Info("stream removed from catalog")
c.State.Streams[streamID] = &TableState{Mode: TableModeIgnore, dirty: true}
// TODO(whb): We can eventually remove this when backfill counters are fully incorporated into
Just checking my understanding here: The plan is that "backfill counter" will be incremented if the user disables+re-enables a single binding, even if they don't use the new "go re-backfill this" flow?
(Because in addition to permitting users to re-backfill a collection by toggling the binding, it's also important that this logic here currently prevents the possibility of disabling a stream, missing some amount of replication events, and then re-enabling it without triggering that new backfill.)
That's a good point, we'll probably need to keep this state-clearing logic around since it's not really valid to disable + resume a binding without re-backfilling it; I always seem to forget about that. I don't know that we'd always want to increment the backfill counter on re-enabling a binding, and although that might be possible, I don't think we'd do it right away 🤔.
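To make the state-clearing behavior under discussion concrete, here is a rough standalone sketch modeled on the diff fragment above. The string values of the mode constants, the `ignoreRemovedStreams` helper, and the `inCatalog` set are assumptions for the example rather than the connector's real definitions.

```go
package main

import (
	"fmt"
	"sort"
)

// Placeholder mode values; the real constants may differ.
const (
	TableModeActive = "Active"
	TableModeIgnore = "Ignore"
)

// TableState mirrors the fields visible in the diff fragment.
type TableState struct {
	Mode  string
	dirty bool
}

// ignoreRemovedStreams replaces the state of every stream that is no longer
// in the catalog with a fresh "ignore" state. Dropping the old state is what
// forces a new backfill if the binding is later re-enabled.
// It returns the affected stream IDs in sorted order.
func ignoreRemovedStreams(streams map[string]*TableState, inCatalog map[string]bool) []string {
	var removed []string
	for id, state := range streams {
		if !inCatalog[id] && state.Mode != TableModeIgnore {
			streams[id] = &TableState{Mode: TableModeIgnore, dirty: true}
			removed = append(removed, id)
		}
	}
	sort.Strings(removed)
	return removed
}

func main() {
	streams := map[string]*TableState{
		"public.orders": {Mode: TableModeActive},
		"public.users":  {Mode: TableModeActive},
	}
	// Only public.users is still enabled in the catalog.
	inCatalog := map[string]bool{"public.users": true}
	fmt.Println("removed:", ignoreRemovedStreams(streams, inCatalog))
}
```

Since the previous state is discarded rather than paused, a stream that is disabled and then re-enabled cannot resume from a stale position and silently skip replication events.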
e53fbb8 to cab2b57
@@ -16,52 +16,52 @@ func TestRecommendedCatalogName(t *testing.T) {
{
schema: "something",
table: "typical",
want: "something.typical",
The changes in this file aren't from this PR, but I found that they needed updating when running all of the sqlcapture tests.
Updates the sqlcapture connectors to use `stateKey` for each binding to track its state. This includes a migration for existing captures. Once all existing captures have started up with this new connector and we are confident that no additional old captures will be re-enabled, we can remove the migration code.
cab2b57 to e2a449a
@willdonnelly would you please review this? Its absence has been coming up frequently lately.
LGTM
Everything seems in order, let's ship it.
// they just need to either increment the backfill counter for all bindings, or disable all
// bindings and then re-enable them (which will cause them all to get backfilled anew, as they
// should after such an event).
if allStreamsAreNew {
Hmm, so changing the check to work this way means that if the backfill counters for every binding are incremented all at once we'll also reset the cursor? Nice!
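A hedged sketch of why that follows. It assumes that incrementing a binding's backfill counter changes its state key, so a binding with a bumped counter looks like one with no prior state. The `State` shape, the `resetCursorIfAllNew` name, the example keys, and the choice to leave the cursor alone when there are no bindings are all assumptions of this example.

```go
package main

import "fmt"

// State is a simplified checkpoint: a replication cursor plus the set of
// state keys that already have per-table state.
type State struct {
	Cursor  string
	Streams map[string]bool
}

// resetCursorIfAllNew clears the cursor when none of the current bindings
// has existing state, and reports whether it did so. With zero bindings it
// leaves the cursor alone (a choice made for this sketch).
func resetCursorIfAllNew(state *State, bindingKeys []string) bool {
	if len(bindingKeys) == 0 {
		return false
	}
	for _, key := range bindingKeys {
		if state.Streams[key] {
			return false // at least one binding carries over its state
		}
	}
	state.Cursor = ""
	return true
}

func main() {
	state := &State{
		Cursor:  "0/16B3748",
		Streams: map[string]bool{"orders.v1": true, "users.v1": true},
	}
	// Every binding now has a new key, as if all counters were incremented.
	fmt.Println("reset:", resetCursorIfAllNew(state, []string{"orders.v2", "users.v2"}))
}
```

If only some of the keys change, the loop finds existing state for the rest and the cursor is preserved, so only the changed bindings would be re-backfilled.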
Description:
Adds support for state keys to SQL CDC captures.
The general idea is that a given stream (which is the combined schema & table name of a captured table) has its state accessible in the driver checkpoint via the runtime-provided `stateKey`.
After some consideration, for the moment I have settled on representing this association between `streamID` and `stateKey` at the `sqlcapture` "framework" level, rather than threading it deeper into individual captures driven by the framework. This seems to work out decently well since a replication stream change event already has the information needed to construct a `streamID`, but getting the `stateKey` out of this requires some higher-level knowledge, much like how getting the correct binding index to emit a document also requires some additional knowledge.
In addition to the automated tests, I manually tested this by:
Workflow steps:
After this change is merged, SQL CDC captures will be able to use backfill counters to trigger a re-backfill of a captured table.
Documentation links affected:
(list any documentation links that you created, or existing ones that you've identified as needing updates, along with a brief description)
Notes for reviewers: