source-snowflake: Initial implementation #1137
Conversation
I took a quick look and made some comments on this draft - you're probably already aware of these things, but here it is anyway just in case.
This isn't quite finished: I still need to implement the desired "only emit partial-progress checkpoints when the current document has a different collection key from the previous one (and before the current document is emitted)" behavior, and it could do with more testing. However, the development history of this connector was incredibly long and ugly -- it was started back in the summer, it's nearly Christmastime now, and it kept getting picked up and put down again. Now that the overall design and architecture are finally at a place I'm happy with, this seemed like a good point to make a clean break and discard all that noise.
This commit adds an `ORDER BY` clause to staging table readout queries, where the ordering is over the column(s) used as the collection key. This ensures that the table contents will be captured in a consistent order even across connector restarts. Unfortunately it's not always guaranteed that we'll have a useful set of columns as the collection key, because the user *could* just use `[/_meta/seq, /_meta/off]` as the collection key, and that's a not-entirely-unreasonable choice if one wants a pure "change log" sort of capture. So if the collection key isn't made up of valid column names (where "valid" means top-level properties which aren't `_meta`), we fall back to omitting the `ORDER BY` clause and hope that either we don't get restarted halfway through a table, or that if we do, Snowflake gives us the same result order after the restart. This commit does not implement the whole "only emit checkpoints in between rows with different collection-key values" behavior, but it moves the partial-progress checkpoint logic around so there's an obvious place to add that next.
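For illustration, here's a rough Go sketch of that fallback logic; the function names, quoting, and query shape are made up and simplified, not the connector's actual code:

```go
// Sketch: derive ORDER BY columns from the collection key, and omit the
// clause entirely when the key doesn't map to usable top-level columns.
package main

import (
	"fmt"
	"strings"
)

// orderingColumns converts collection-key pointers like "/CUSTOMER_ID" into
// column names, returning ok=false if any pointer is nested or targets /_meta.
func orderingColumns(collectionKey []string) ([]string, bool) {
	var cols []string
	for _, ptr := range collectionKey {
		name := strings.TrimPrefix(ptr, "/")
		if strings.Contains(name, "/") || name == "_meta" {
			return nil, false // not a usable top-level column
		}
		cols = append(cols, name)
	}
	return cols, true
}

func readoutQuery(stagingTable string, collectionKey []string) string {
	query := fmt.Sprintf("SELECT * FROM %s", stagingTable)
	if cols, ok := orderingColumns(collectionKey); ok {
		query += " ORDER BY " + strings.Join(cols, ", ")
	}
	return query
}

func main() {
	fmt.Println(readoutQuery("FLOW_STAGING_ABC123", []string{"/CUSTOMER_ID"}))
	// Key made of /_meta pointers: no usable columns, so no ORDER BY clause.
	fmt.Println(readoutQuery("FLOW_STAGING_ABC123", []string{"/_meta/seq", "/_meta/off"}))
}
```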
Adds logic so that partial-progress checkpoints will only be emitted in between rows with different values for the `ORDER BY` columns. If the collection key doesn't describe a set of usable ordering columns, the behavior is the same as before -- just emitting a checkpoint after exactly K documents each time.
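A hedged sketch of that checkpointing rule, with illustrative types and names (and a tiny K so the effect is visible in the output); the real connector's API will differ:

```go
package main

import (
	"fmt"
	"strings"
)

// Row is a captured row as a map of column name to value (illustrative type).
type Row map[string]any

const checkpointEvery = 2 // K; tiny here so the example output shows the effect

// rowKey serializes a row's ordering-column values for comparison.
func rowKey(row Row, orderColumns []string) string {
	parts := make([]string, len(orderColumns))
	for i, col := range orderColumns {
		parts[i] = fmt.Sprint(row[col])
	}
	return strings.Join(parts, "\x00")
}

// captureRows emits documents and partial-progress checkpoints. A checkpoint
// is only emitted in between rows whose ordering-key values differ, so a
// restart can never resume in the middle of a run of equal-key rows. With no
// usable ordering columns it falls back to checkpointing every K documents.
func captureRows(rows []Row, orderColumns []string) {
	var docsSinceCheckpoint int
	var prevKey string
	for i, row := range rows {
		key := rowKey(row, orderColumns)
		if i > 0 && docsSinceCheckpoint >= checkpointEvery &&
			(len(orderColumns) == 0 || key != prevKey) {
			fmt.Println("-- checkpoint --")
			docsSinceCheckpoint = 0
		}
		fmt.Println("document:", row)
		docsSinceCheckpoint++
		prevKey = key
	}
}

func main() {
	rows := []Row{{"ID": 1}, {"ID": 1}, {"ID": 1}, {"ID": 2}, {"ID": 3}}
	captureRows(rows, []string{"ID"})
}
```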
This commit adds a new test, `TestLargeCapture`, which runs a capture of a single table multiple times, inserting new rows at a couple of points and at others "rewinding" the state checkpoint to a position partway through a staging table from the previous capture (this simulates the runtime killing and restarting the task without having persisted all of its output). In the real world this sort of scenario would involve millions of rows, whereas here we're using hundreds, but aside from that this should be a reasonable test of the correctness properties involved.
The streams and staging tables created by `source-snowflake` have as part of their name a hashed ID which uniquely identifies the source table to which they belong. With this change, the IDs will also incorporate the Flow capture task name, so that multiple capture tasks from the same database can coexist peacefully. One corollary of this is that the connector cannot simply assume that a stream or staging table which it doesn't recognize is old junk from a deleted binding which should be cleaned up, so for now that assumption has been disabled and old streams/tables of a deleted binding will linger forever. This will be improved soon.
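As a rough illustration (with made-up naming conventions, not the connector's actual ones), a hashed ID that incorporates the capture task name might look something like this:

```go
// Sketch: hash the capture task name together with the source schema/table,
// so two capture tasks against the same database get distinct stream and
// staging-table names.
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

func stagingID(captureName, schema, table string) string {
	sum := sha256.Sum256([]byte(captureName + "\x00" + schema + "\x00" + table))
	return hex.EncodeToString(sum[:])[:16] // shortened for readable identifiers
}

func main() {
	id := stagingID("acmeCo/my-capture", "PUBLIC", "CUSTOMERS")
	// e.g. FLOW_STAGING_<hash> as the staging table name,
	// FLOW_STREAM_<hash> as the stream name.
	fmt.Printf("FLOW_STAGING_%s / FLOW_STREAM_%s\n", id, id)
}
```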
Adds logic to delete staging tables and streams, but only when they correspond to a binding which was removed in the current task restart. This ensures that we won't accidentally delete the entities of another Snowflake capture task, and while it could fail and leave things lingering in certain rare edge cases, it should in general be reliable.
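A hedged sketch of that cleanup rule, using illustrative state and naming conventions rather than the connector's real ones:

```go
// Sketch: only the stream and staging table of a binding that was present in
// the previous state but absent from the current set of bindings get dropped,
// so entities belonging to other capture tasks are never touched.
package main

import "fmt"

func cleanupRemovedBindings(prevStagingIDs map[string]string, currentBindings map[string]bool) []string {
	var drops []string
	for table, uid := range prevStagingIDs {
		if currentBindings[table] {
			continue // binding still active, leave its entities alone
		}
		drops = append(drops,
			fmt.Sprintf("DROP TABLE IF EXISTS FLOW_STAGING_%s", uid),
			fmt.Sprintf("DROP STREAM IF EXISTS FLOW_STREAM_%s", uid),
		)
	}
	return drops
}

func main() {
	prev := map[string]string{"PUBLIC.CUSTOMERS": "a1b2c3", "PUBLIC.ORDERS": "d4e5f6"}
	current := map[string]bool{"PUBLIC.CUSTOMERS": true} // ORDERS binding was removed
	for _, stmt := range cleanupRemovedBindings(prev, current) {
		fmt.Println(stmt)
	}
}
```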
Most notably this commit fixes a bug which caused discovery to fail for tables with multiple-column primary keys in which the order of columns making up the key did not match their order in the source table.
See #745 for an explanation of what this does and why it might be helpful, but this should reduce memory usage significantly. This change subtly alters how certain column types get serialized, but a subsequent commit will add more tests covering various datatype edge cases and some translation logic to make sure values get serialized reasonably and in ways that match the discovery schema generation.
Elaborates on the basic datatype and timestamp test cases, fixes a really dumb oversight where basic integers were being stringified on capture, and adds support for `VARIANT`, `OBJECT`, and `ARRAY` types which might even be correct so long as the stringified values coming from the Snowflake client library are always valid JSON.
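Assuming, as noted above, that the client library hands back `VARIANT`/`OBJECT`/`ARRAY` values as stringified JSON, the translation could look roughly like this (illustrative sketch, not the connector's actual code):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// translateVariant turns a stringified VARIANT/OBJECT/ARRAY value into a
// value that serializes as real JSON rather than a quoted string, falling
// back to the plain string if it isn't valid JSON after all.
func translateVariant(raw string) any {
	if json.Valid([]byte(raw)) {
		return json.RawMessage(raw)
	}
	return raw
}

func main() {
	doc := map[string]any{
		"variant_col": translateVariant(`{"a": 1, "b": [2, 3]}`),
		"array_col":   translateVariant(`[1, 2, 3]`),
	}
	bs, _ := json.Marshal(doc)
	fmt.Println(string(bs)) // nested values come out as JSON, not quoted strings
}
```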
Since the unique IDs were previously produced by hashing a `(CaptureName, SourceSchema, SourceTable)` tuple, they would be the same for each backfill-counter iteration of a given binding. This introduced an unnecessary risk that cleanup of old staged entities could interfere with initialization when the backfill counter is bumped, and the simplest solution was not to try to make the IDs deterministic at all. Instead, we now generate the IDs using a random nonce in place of the aforementioned hash, and keep track of the ID as part of the stream state. Since we already needed that information in the stream state to allow cleanup to work properly, this is a fairly simple change, although it required a bit of refactoring of all the places where IDs were previously generated implicitly from the source table name.
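A minimal sketch of the nonce-based scheme described here, with illustrative struct and field names rather than the connector's real state types:

```go
// Sketch: a random ID is generated when a binding's stream state is first
// initialized and then persisted in that state, so every backfill iteration
// gets fresh staging entities and cleanup can still find them later.
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// streamState is the per-binding checkpoint state; the persisted UniqueID is
// what cleanup later uses to find this binding's stream and staging table.
type streamState struct {
	UniqueID string `json:"uid"`
}

func newStreamState() (*streamState, error) {
	var nonce [8]byte
	if _, err := rand.Read(nonce[:]); err != nil {
		return nil, fmt.Errorf("generating staging nonce: %w", err)
	}
	return &streamState{UniqueID: hex.EncodeToString(nonce[:])}, nil
}

func main() {
	state, err := newStreamState()
	if err != nil {
		panic(err)
	}
	fmt.Printf("stream: FLOW_STREAM_%s, staging table: FLOW_STAGING_%s\n",
		state.UniqueID, state.UniqueID)
}
```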
Tweaks related to the introduction of a random element into the staging UIDs and providing a reasonable state key during tests.
PTAL. The main changes since the previous round of PR feedback are:
LGTM - one question about the reduce: merge
discovery output
Copied from #1214
This makes the key structure consistent with other captures where the key has multiple elements.
Description:
This PR adds a Snowflake CDC connector.
I'll come back and flesh out the PR description in the morning and make sure it's got docs and CI setup and whatnot, but I believe the core connector implementation is now complete so I might as well get this uploaded and start the code review ball rolling.
Workflow steps:
(How does one use this feature, and how has it changed)
Documentation links affected:
(list any documentation links that you created, or existing ones that you've identified as needing updates, along with a brief description)
Notes for reviewers:
(anything that might help someone review this PR)