materializations: integration test for merge-and-then-insert transactions

Extends the materialization integration test fixture to include an additional
transaction for the "duplicate keys" scenario where all of the keys are new. The
progression is then: 1) all keys are new for the initial transaction, 2) updates
to existing keys for the next transaction, and 3) (new) a transaction with all
new keys.

This exercises the behavior some materializations implement where merge-type
queries are only run if some of the keys to store have updates; otherwise a
more efficient direct insert is run. It is reflected in the test snapshots for
materializations that persist queries in their state, most notably for
databricks, where a bug in this behavior was fixed in the prior commit.
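
As a rough illustration, the decision this fixture exercises looks like the
sketch below. This is a minimal sketch, not any connector's actual code: the
binding struct, the hasUpdates flag, and the query strings are hypothetical
stand-ins.

    package sketch

    import (
        "context"
        "database/sql"
        "fmt"
    )

    // binding loosely mirrors the per-table state a materialization keeps
    // for a transaction. hasUpdates would be set while storing documents
    // whenever a key that already exists in the target table is seen.
    type binding struct {
        targetTable  string
        stagingTable string
        hasUpdates   bool
    }

    // commitBinding runs the expensive merge only when some staged keys
    // already exist; when every key in the transaction is new, a direct
    // insert from the staging table is sufficient and cheaper.
    func commitBinding(ctx context.Context, db *sql.DB, b *binding) error {
        var query string
        if b.hasUpdates {
            // Placeholder for a dialect-specific MERGE / upsert statement.
            query = fmt.Sprintf("MERGE INTO %s USING %s ON ...", b.targetTable, b.stagingTable)
        } else {
            query = fmt.Sprintf("INSERT INTO %s SELECT * FROM %s", b.targetTable, b.stagingTable)
        }
        _, err := db.ExecContext(ctx, query)
        return err
    }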

Since I had to update most of the test snapshots for this, I also included the
following changes:

* Updated the google sheets materialization test setup to use an encrypted
  config file like all the other materialization tests.
* Did a bit of cleanup in the CI spec file, mostly related to the above.
* Added an empty transaction to the test fixture, which is run first. This
  simulates what happens if a materialization is configured with a "notBefore"
  setting, and processes some empty transactions at first due to skipping over
  data. This revealed that materialize-sqlserver still wasn't handling this
  correctly, so that is fixed as well (see the sketch below).
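
For reference, the failure mode looks like the following minimal sketch; the
storeAll helper is hypothetical, and only the lastBinding bookkeeping mirrors
the actual materialize-sqlserver code in the diff further down. With no
documents stored, lastBinding never advances past -1, so an unconditional
flush of batches[lastBinding] would index out of range.

    package sketch

    import (
        "context"
        "database/sql"
        "fmt"
    )

    // storeAll stages each document into its binding's batch, tracking the
    // last binding touched. An empty transaction leaves lastBinding at -1,
    // so the final flush must be guarded.
    func storeAll(ctx context.Context, batches []*sql.Stmt, docBindings []int) error {
        lastBinding := -1
        for _, b := range docBindings {
            // ... stage the document into batches[b] here ...
            lastBinding = b
        }
        if lastBinding != -1 { // skip the flush for an empty transaction
            if _, err := batches[lastBinding].ExecContext(ctx); err != nil {
                return fmt.Errorf("store batch insert: %w", err)
            }
        }
        return nil
    }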
williamhbaker committed Feb 24, 2025
1 parent 7bde296 commit f715347
Showing 23 changed files with 2,839 additions and 984 deletions.
3 changes: 0 additions & 3 deletions .github/workflows/ci.yaml
@@ -254,7 +254,6 @@ jobs:
           ]'), matrix.connector)
         env:
           GCP_SERVICE_ACCOUNT_KEY: ${{ secrets.GCP_SERVICE_ACCOUNT_KEY }}
-          ROCKSET_API_KEY: ${{ secrets.ROCKSET_API_KEY }}
           MYSQL_DATABASE: test
 
         run: CONNECTOR=${{ matrix.connector }} VERSION=local ./tests/run.sh;
@@ -286,8 +285,6 @@ jobs:
             "materialize-redshift",
             "materialize-s3-iceberg"
           ]'), matrix.connector)
-        env:
-          GCP_SERVICE_ACCOUNT_KEY: ${{ secrets.GCP_SERVICE_ACCOUNT_KEY }}
         run: CONNECTOR=${{ matrix.connector }} VERSION=local tests/materialize/run.sh;

6 changes: 4 additions & 2 deletions materialize-sqlserver/driver.go
@@ -533,8 +533,10 @@ func (d *transactor) Store(it *m.StoreIterator) (_ m.StartCommitFunc, err error)
 		}
 	}
 
-	if _, err := batches[lastBinding].ExecContext(ctx); err != nil {
-		return nil, fmt.Errorf("store batch insert on %q: %w", d.bindings[lastBinding].tempStoreTableName, err)
+	if lastBinding != -1 {
+		if _, err := batches[lastBinding].ExecContext(ctx); err != nil {
+			return nil, fmt.Errorf("store batch insert on %q: %w", d.bindings[lastBinding].tempStoreTableName, err)
+		}
 	}
 
 	return func(ctx context.Context, runtimeCheckpoint *protocol.Checkpoint) (*pf.ConnectorState, m.OpFuture) {
8 changes: 8 additions & 0 deletions tests/materialize/fixture.json
@@ -1,4 +1,5 @@
 [
+  [],
   [
     ["tests/simple", { "id": 1, "canary": "amputation's"}],
     ["tests/simple", { "id": 2, "canary": "armament's"}],
@@ -81,5 +82,12 @@
     ["tests/deletions", { "id": 4, "_meta": {"op": "d"} }],
 
     ["tests/binary-key", { "id": "c2F5xY1uYXJhCg==", "counter": 1 }]
+  ],
+  [
+    ["tests/duplicated-keys", { "id": 6, "int": 11, "str": "str 11"}],
+    ["tests/duplicated-keys", { "id": 7, "int": 12, "str": "str 12"}],
+    ["tests/duplicated-keys", { "id": 8, "int": 13, "str": "str 13"}],
+    ["tests/duplicated-keys", { "id": 9, "int": 14, "str": "str 14"}],
+    ["tests/duplicated-keys", { "id": 10, "int": 15, "str": "str 15"}]
   ]
 ]
@@ -71,7 +71,7 @@ func main() {
 		os.Exit(0)
 	}
 
-	rows, err := db.Query(fmt.Sprintf(`SELECT * FROM %q.%q ORDER BY id`, schema, tables[0]))
+	rows, err := db.Query(fmt.Sprintf(`SELECT * FROM %q.%q ORDER BY id, flow_published_at`, schema, tables[0]))
 	if err != nil {
 		log.Fatal(fmt.Errorf("running query: %w", err))
 	}