
source-snowflake: Initial implementation #1137

Merged: willdonnelly merged 17 commits into main from wgd/source-snowflake on Jan 31, 2024

Conversation

@willdonnelly (Member) commented Dec 19, 2023

Description:

This PR adds a Snowflake CDC connector.

I'll come back and flesh out the PR description in the morning and make sure it has docs and CI setup and whatnot, but I believe the core connector implementation is now complete, so I might as well get this uploaded and start the code review ball rolling.

Workflow steps:

(How does one use this feature, and how has it changed)

Documentation links affected:

(list any documentation links that you created, or existing ones that you've identified as needing updates, along with a brief description)

Notes for reviewers:

(anything that might help someone review this PR)



@willdonnelly requested a review from a team on December 19, 2023 01:04
@williamhbaker (Member) left a comment

I took a quick look and made some comments on this draft - you're probably already aware of these things, but here they are anyway, just in case.

This isn't quite finished; I still need to implement the desired
"only emit partial-progress checkpoints when the current document
has a different collection key from the previous (and before the
current document is emitted)" behavior, and it could do with more
testing.

However, the development history of this connector was incredibly
long and ugly -- it was started back in the summer and it's
nearly Christmastime now, and it kept getting picked up and put
down again. Now that the overall design and architecture is
finally at a place I'm happy with, it seemed like a good point
to make a clean break and discard all that noise.

This commit adds an `ORDER BY` clause to staging table readout
queries, where the ordering is supposed to be over the column(s)
which are used as the collection key. This ensures that the table
contents will be captured in a consistent order even across
connector restarts.

Unfortunately it's not always guaranteed that we'll have a useful
set of columns as the collection key, because the user *could* just
use `[/_meta/seq, /_meta/off]` as the collection key and that's a
not-entirely-unreasonable choice if one wants a pure "change log"
sort of capture. So if the collection key isn't made up of valid
column names (where "valid" means top-level properties which aren't
`_meta`) then we fall back to omitting the `ORDER BY` clause and
hoping that either we don't get restarted halfway through a table
or that if we do Snowflake gives us the same result order after the
restart.

This commit does not implement the whole "only emit checkpoints
in between rows with different collection-key values" behavior,
but it moves the partial-progress checkpoint logic around so
there's an obvious place to add that next.
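
A minimal sketch of the readout-query construction this commit describes, assuming a hypothetical `buildReadoutQuery` helper and a simple identifier-quoting scheme (the connector's actual query builder may differ):

```go
package sketch

import (
	"fmt"
	"strings"
)

// buildReadoutQuery constructs the staging-table readout query, adding an
// ORDER BY over the collection-key columns only when usable columns exist.
// When keyColumns is empty (the collection key didn't map onto top-level,
// non-_meta columns) the ORDER BY clause is omitted and we have to hope
// Snowflake returns a stable order across restarts, which isn't guaranteed.
func buildReadoutQuery(stagingTable string, keyColumns []string) string {
	var query = fmt.Sprintf("SELECT * FROM %s", stagingTable)
	if len(keyColumns) > 0 {
		var quoted []string
		for _, col := range keyColumns {
			quoted = append(quoted, fmt.Sprintf("%q", col))
		}
		query += " ORDER BY " + strings.Join(quoted, ", ")
	}
	return query
}
```
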
Adds logic so that partial-progress checkpoints will only be
emitted in between rows with different values for the `ORDER BY`
columns. If the collection key doesn't describe a set of usable
ordering columns the behavior will be the same as before -- just
emitting a checkpoint after exactly K documents each time.
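
A hedged sketch of that rule, using illustrative names (`emitWithCheckpoints`, `doc`) rather than the connector's actual types: a checkpoint goes out only once at least `interval` documents are pending and the next row's key differs from the previous row's, so rows sharing a key never straddle a checkpoint.

```go
package sketch

import "slices"

// doc pairs a captured document with its ORDER BY key values (illustrative).
type doc struct {
	body []byte
	key  []string
}

// emitWithCheckpoints streams documents in ORDER BY order and emits a
// partial-progress checkpoint only at a boundary between differing keys,
// once the pending-document count has reached the configured interval.
func emitWithCheckpoints(docs []doc, interval int, emitDoc func(doc), emitCheckpoint func()) {
	var pending int
	var prevKey []string
	for _, d := range docs {
		if pending >= interval && prevKey != nil && !slices.Equal(d.key, prevKey) {
			emitCheckpoint() // every row sharing prevKey has already been emitted
			pending = 0
		}
		emitDoc(d)
		pending++
		prevKey = d.key
	}
}
```
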
This commit adds a new test `TestLargeCapture` which runs a
capture of a single table multiple times, inserting new rows
at a couple of points and at others "rewinding" the state
checkpoint to a position partway through a staging table from
the previous capture (this simulates the runtime killing and
restarting the task without having persisted all of its output).

In the real world this sort of scenario would involve millions
of rows rather than the hundreds used here, but aside from that
this should be a reasonable test of the correctness properties
involved.

The streams and staging tables created by `source-snowflake` have
as part of their name a hashed ID which uniquely identifies the
source table to which they belong. With this change, the IDs will
also incorporate the Flow capture task name, so that multiple
capture tasks from the same database can coexist peacefully.

One corollary of this is that the connector cannot simply assume
that a stream or staging table which it doesn't recognize is old
junk from a deleted binding which should be cleaned up, so for
now that assumption has been disabled and old streams/tables of
a deleted binding will linger forever. This will be improved soon.
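
A rough sketch of the ID scheme this commit describes, with an illustrative hash and format (note that a later commit replaces this deterministic hash with a random nonce):

```go
package sketch

import (
	"crypto/sha256"
	"encoding/hex"
)

// stagingID derives the unique ID embedded in stream and staging-table names
// by hashing the capture task name together with the source schema and table,
// so two capture tasks watching the same table get distinct staging resources.
func stagingID(captureName, schema, table string) string {
	var sum = sha256.Sum256([]byte(captureName + "\x00" + schema + "\x00" + table))
	return hex.EncodeToString(sum[:8])
}
```
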
Adds logic to delete staging tables and streams, but only when
they correspond to a binding which was removed in the current
task restart. This ensures that we won't accidentally delete
the entities of another Snowflake capture task, and while it
could fail and leave things lingering in certain rare edge
cases, it should in general be reliable.
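
A hedged sketch of that cleanup rule; the `FLOW_STREAM_`/`FLOW_STAGING_` name prefixes and the function signature are illustrative placeholders, not the connector's actual identifiers:

```go
package sketch

import (
	"context"
	"database/sql"
	"fmt"
)

// cleanupRemovedBindings drops the stream and staging table only for bindings
// removed in the current task restart, never for unrecognized entities that
// might belong to another capture task sharing the same database.
func cleanupRemovedBindings(ctx context.Context, db *sql.DB, removedIDs []string) error {
	for _, id := range removedIDs {
		for _, stmt := range []string{
			fmt.Sprintf(`DROP STREAM IF EXISTS FLOW_STREAM_%s`, id),
			fmt.Sprintf(`DROP TABLE IF EXISTS FLOW_STAGING_%s`, id),
		} {
			if _, err := db.ExecContext(ctx, stmt); err != nil {
				return fmt.Errorf("cleaning up staging resources for %q: %w", id, err)
			}
		}
	}
	return nil
}
```
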
Most notably this commit fixes a bug which caused discovery to
fail for tables with multiple-column primary keys in which the
order of columns making up the key did not match their order in
the source table.
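
A small sketch of the fix, assuming an illustrative `primaryKeyColumn` type carrying each column's position within the key:

```go
package sketch

import "sort"

// primaryKeyColumn pairs a discovered key column with its position within the
// primary key; the real discovery metadata may look different.
type primaryKeyColumn struct {
	Name        string
	KeySequence int
}

// collectionKey orders the discovered key by each column's position within the
// primary key rather than its ordinal position in the source table, so tables
// with multiple-column keys discover correctly even when the two orders differ.
func collectionKey(cols []primaryKeyColumn) []string {
	sort.Slice(cols, func(i, j int) bool { return cols[i].KeySequence < cols[j].KeySequence })
	var key []string
	for _, col := range cols {
		key = append(key, "/"+col.Name)
	}
	return key
}
```
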
See #745 for an explanation of what this does and why it might
be helpful; in short, it should reduce memory usage significantly.

This change subtly alters how certain column types get
serialized, but a subsequent commit will add more tests
covering various datatype edge cases and some translation
logic to make sure values get serialized reasonably and in
ways that match the discovery schema generation.

Elaborates on the basic datatype and timestamp test cases,
fixes a really dumb oversight where basic integers were being
stringified on capture, and adds support for `VARIANT`, `OBJECT`,
and `ARRAY` types, which might even be correct so long as the
stringified values coming from the Snowflake client library
are always valid JSON.
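
A minimal sketch of how those values could be translated, assuming a hypothetical `translateVariant` helper rather than the connector's actual conversion logic:

```go
package sketch

import "encoding/json"

// translateVariant handles VARIANT, OBJECT, and ARRAY values, which the
// Snowflake client library hands back as strings: if the string parses as
// JSON it is passed through verbatim, otherwise it is captured as a string.
func translateVariant(raw string) any {
	if json.Valid([]byte(raw)) {
		return json.RawMessage(raw)
	}
	return raw
}
```
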
Since the unique IDs were previously produced by hashing a
`(CaptureName, SourceSchema, SourceTable)` tuple they would
be the same for each backfill-counter iteration of a given
binding.

This introduced an unnecessary risk that cleanup of old staged
entities could interfere with initialization when the backfill
counter is bumped, and the simplest solution was not to try to
make the IDs deterministic at all.

Instead, we now generate the IDs using a random nonce in place
of the aforementioned hash, and keep track of the ID as part of
the stream state. Since we already needed that information in
the stream state to allow cleanup to work properly this is a
fairly simple change, although it required a bit of refactoring
of all the places where IDs were previously generated implicitly
from the source table name.
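
A brief sketch of the nonce-based IDs, with illustrative names and sizes:

```go
package sketch

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// newStagingID generates a random identifier once when a binding's staging
// resources are created; it is then recorded in the per-binding stream state
// so that later cleanup can still locate those resources.
func newStagingID() (string, error) {
	var nonce [8]byte
	if _, err := rand.Read(nonce[:]); err != nil {
		return "", fmt.Errorf("generating staging ID: %w", err)
	}
	return hex.EncodeToString(nonce[:]), nil
}
```
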
Tweaks related to the introduction of a random element into the
staging UIDs and providing a reasonable state key during tests.
@willdonnelly marked this pull request as ready for review on January 30, 2024 16:07
@willdonnelly force-pushed the wgd/source-snowflake branch 2 times, most recently from 65ad99c to 56ee5db on January 30, 2024 16:21
@willdonnelly (Member, Author) commented:

PTAL. The main changes since the previous round of PR feedback are:

  • The per-binding state in checkpoints is now keyed using the runtime-provided StateKey property.
  • Using StateKey means that the keys can no longer be reliably decomposed into the source table schema/name, which we previously relied on as part of the cleanup logic when bindings were deleted. To remedy this, the unique ID associated with the staging resources of a particular source table is now stored in the per-binding state as well.
  • And since we're explicitly keeping track of that unique ID now, there is no need to implicitly compute it based on the source table name. In fact it's better to have it incorporate a random nonce, because this doesn't have the same issues with potential cleanup/initialization conflicts when incrementing the backfill counter. So we do that now.
  • Also instead of simply deleting the state entries for removed bindings, we now set a boolean to mark them as disabled. This eliminates some theoretical edge cases where staging resources might fail to ever be cleaned up if the first and only attempt to delete them failed, at the cost of slightly larger state checkpoints. If that ever becomes an issue, we can reintroduce some logic to prune disabled entries after verifying that all associated Snowflake entities have been deleted.
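
A hedged sketch of the per-binding state shape implied by the list above; the struct and JSON field names are illustrative, not the connector's actual state schema:

```go
package sketch

// captureState is the connector's persisted checkpoint (sketch).
type captureState struct {
	// Streams holds per-binding state, keyed by the runtime-provided StateKey.
	Streams map[string]*streamState `json:"streams,omitempty"`
}

// streamState is the per-binding portion of the checkpoint (sketch).
type streamState struct {
	// StagingID is the random-nonce identifier embedded in the names of this
	// binding's stream and staging table.
	StagingID string `json:"staging_id"`
	// Disabled marks a removed binding instead of deleting its state outright,
	// so cleanup can be retried if the first attempt fails.
	Disabled bool `json:"disabled,omitempty"`
}
```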

@williamhbaker (Member) left a comment

LGTM - one question about the reduce: merge discovery output

This makes the key structure consistent with other captures where
the key has multiple elements.
@willdonnelly merged commit 68cf3d5 into main on Jan 31, 2024
44 of 45 checks passed
@willdonnelly deleted the wgd/source-snowflake branch on January 31, 2024 17:24