From eb7a1fcadb675c42a2a9b962df8a6902372a8a5b Mon Sep 17 00:00:00 2001 From: Will Donnelly Date: Tue, 25 Feb 2025 19:06:14 -0600 Subject: [PATCH] source-bigquery-batch: Option 'Discover Views' --- .../.snapshots/TestCaptureFromView-Capture | 45 ++++++ .../TestCaptureFromView-DiscoveryWithViews | 137 ++++++++++++++++++ .../TestCaptureFromView-DiscoveryWithoutViews | 69 +++++++++ source-bigquery-batch/.snapshots/TestSpec | 5 + source-bigquery-batch/driver.go | 4 +- source-bigquery-batch/main.go | 29 ++-- source-bigquery-batch/main_test.go | 83 ++++++++++- 7 files changed, 357 insertions(+), 15 deletions(-) create mode 100644 source-bigquery-batch/.snapshots/TestCaptureFromView-Capture create mode 100644 source-bigquery-batch/.snapshots/TestCaptureFromView-DiscoveryWithViews create mode 100644 source-bigquery-batch/.snapshots/TestCaptureFromView-DiscoveryWithoutViews diff --git a/source-bigquery-batch/.snapshots/TestCaptureFromView-Capture b/source-bigquery-batch/.snapshots/TestCaptureFromView-Capture new file mode 100644 index 000000000..917721419 --- /dev/null +++ b/source-bigquery-batch/.snapshots/TestCaptureFromView-Capture @@ -0,0 +1,45 @@ +# ================================ +# Collection "acmeCo/test/capturefromview_194890": 23 Documents +# ================================ +{"_meta":{"polled":"","index":0,"row_id":0},"id":0,"name":"Row 0","updated_at":"2025-02-13T12:00:00Z","visible":true} +{"_meta":{"polled":"","index":1,"row_id":1},"id":1,"name":"Row 1","updated_at":"2025-02-13T12:01:00Z","visible":false} +{"_meta":{"polled":"","index":2,"row_id":2},"id":2,"name":"Row 2","updated_at":"2025-02-13T12:02:00Z","visible":true} +{"_meta":{"polled":"","index":3,"row_id":3},"id":3,"name":"Row 3","updated_at":"2025-02-13T12:03:00Z","visible":false} +{"_meta":{"polled":"","index":4,"row_id":4},"id":4,"name":"Row 4","updated_at":"2025-02-13T12:04:00Z","visible":true} +{"_meta":{"polled":"","index":5,"row_id":5},"id":5,"name":"Row 5","updated_at":"2025-02-13T12:05:00Z","visible":false} +{"_meta":{"polled":"","index":6,"row_id":6},"id":6,"name":"Row 6","updated_at":"2025-02-13T12:06:00Z","visible":true} +{"_meta":{"polled":"","index":7,"row_id":7},"id":7,"name":"Row 7","updated_at":"2025-02-13T12:07:00Z","visible":false} +{"_meta":{"polled":"","index":8,"row_id":8},"id":8,"name":"Row 8","updated_at":"2025-02-13T12:08:00Z","visible":true} +{"_meta":{"polled":"","index":9,"row_id":9},"id":9,"name":"Row 9","updated_at":"2025-02-13T12:09:00Z","visible":false} +{"_meta":{"polled":"","index":0,"row_id":10},"id":10,"name":"Row 10","updated_at":"2025-02-13T12:10:00Z","visible":true} +{"_meta":{"polled":"","index":1,"row_id":11},"id":11,"name":"Row 11","updated_at":"2025-02-13T12:11:00Z","visible":false} +{"_meta":{"polled":"","index":2,"row_id":12},"id":12,"name":"Row 12","updated_at":"2025-02-13T12:12:00Z","visible":true} +{"_meta":{"polled":"","index":3,"row_id":13},"id":13,"name":"Row 13","updated_at":"2025-02-13T12:13:00Z","visible":false} +{"_meta":{"polled":"","index":4,"row_id":14},"id":14,"name":"Row 14","updated_at":"2025-02-13T12:14:00Z","visible":true} +{"_meta":{"polled":"","index":5,"row_id":15},"id":15,"name":"Row 15","updated_at":"2025-02-13T12:15:00Z","visible":false} +{"_meta":{"polled":"","index":6,"row_id":16},"id":16,"name":"Row 16","updated_at":"2025-02-13T12:16:00Z","visible":true} +{"_meta":{"polled":"","index":7,"row_id":17},"id":17,"name":"Row 17","updated_at":"2025-02-13T12:17:00Z","visible":false} +{"_meta":{"polled":"","index":8,"row_id":18},"id":18,"name":"Row 18","updated_at":"2025-02-13T12:18:00Z","visible":true} +{"_meta":{"polled":"","index":9,"row_id":19},"id":19,"name":"Row 19","updated_at":"2025-02-13T12:19:00Z","visible":false} +{"_meta":{"polled":"","index":0,"row_id":20},"id":3,"name":"Row 3","updated_at":"2025-02-13T12:20:00Z","visible":true} +{"_meta":{"polled":"","index":1,"row_id":21},"id":4,"name":"Row 4","updated_at":"2025-02-13T12:20:00Z","visible":false} +{"_meta":{"polled":"","index":2,"row_id":22},"id":2,"name":"Row 2","updated_at":"2025-02-13T12:20:00Z","visible":false} +# ================================ +# Collection "acmeCo/test/capturefromview_227836": 11 Documents +# ================================ +{"_meta":{"polled":"","index":0,"row_id":0},"id":0,"name":"Row 0","updated_at":"2025-02-13T12:00:00Z"} +{"_meta":{"polled":"","index":1,"row_id":1},"id":2,"name":"Row 2","updated_at":"2025-02-13T12:02:00Z"} +{"_meta":{"polled":"","index":2,"row_id":2},"id":4,"name":"Row 4","updated_at":"2025-02-13T12:04:00Z"} +{"_meta":{"polled":"","index":3,"row_id":3},"id":6,"name":"Row 6","updated_at":"2025-02-13T12:06:00Z"} +{"_meta":{"polled":"","index":4,"row_id":4},"id":8,"name":"Row 8","updated_at":"2025-02-13T12:08:00Z"} +{"_meta":{"polled":"","index":0,"row_id":5},"id":10,"name":"Row 10","updated_at":"2025-02-13T12:10:00Z"} +{"_meta":{"polled":"","index":1,"row_id":6},"id":12,"name":"Row 12","updated_at":"2025-02-13T12:12:00Z"} +{"_meta":{"polled":"","index":2,"row_id":7},"id":14,"name":"Row 14","updated_at":"2025-02-13T12:14:00Z"} +{"_meta":{"polled":"","index":3,"row_id":8},"id":16,"name":"Row 16","updated_at":"2025-02-13T12:16:00Z"} +{"_meta":{"polled":"","index":4,"row_id":9},"id":18,"name":"Row 18","updated_at":"2025-02-13T12:18:00Z"} +{"_meta":{"polled":"","index":0,"row_id":10},"id":3,"name":"Row 3","updated_at":"2025-02-13T12:20:00Z"} +# ================================ +# Final State Checkpoint +# ================================ +{"bindingStateV1":{"capturefromview_194890":{"CursorNames":["updated_at"],"CursorValues":["2025-02-13T12:20:00Z"],"DocumentCount":23,"LastPolled":""},"capturefromview_227836":{"CursorNames":["updated_at"],"CursorValues":["2025-02-13T12:20:00Z"],"DocumentCount":11,"LastPolled":""}}} + diff --git a/source-bigquery-batch/.snapshots/TestCaptureFromView-DiscoveryWithViews b/source-bigquery-batch/.snapshots/TestCaptureFromView-DiscoveryWithViews new file mode 100644 index 000000000..72993b328 --- /dev/null +++ b/source-bigquery-batch/.snapshots/TestCaptureFromView-DiscoveryWithViews @@ -0,0 +1,137 @@ +Binding 0: +{ + "resource_config_json": { + "name": "capturefromview_194890", + "schema": "testdata", + "table": "capturefromview_194890" + }, + "resource_path": [ + "capturefromview_194890" + ], + "collection": { + "name": "acmeCo/test/capturefromview_194890", + "read_schema_json": { + "type": "object", + "required": [ + "_meta" + ], + "properties": { + "_meta": { + "$schema": "https://json-schema.org/draft/2020-12/schema", + "$id": "https://github.com/estuary/connectors/source-bigquery-batch/document-metadata", + "properties": { + "polled": { + "type": "string", + "format": "date-time", + "title": "Polled Timestamp", + "description": "The time at which the update query which produced this document as executed." + }, + "index": { + "type": "integer", + "title": "Result Index", + "description": "The index of this document within the query execution which produced it." + }, + "row_id": { + "type": "integer", + "title": "Row ID", + "description": "Row ID of the Document" + }, + "op": { + "type": "string", + "enum": [ + "c", + "u", + "d" + ], + "title": "Change Operation", + "description": "Operation type (c: Create / u: Update / d: Delete)", + "default": "u" + } + }, + "type": "object", + "required": [ + "polled", + "index", + "row_id" + ] + } + }, + "x-infer-schema": true + }, + "key": [ + "/_meta/polled", + "/_meta/index" + ], + "projections": null + }, + "state_key": "capturefromview_194890" + } +Binding 1: +{ + "resource_config_json": { + "name": "capturefromview_227836", + "schema": "testdata", + "table": "capturefromview_227836" + }, + "resource_path": [ + "capturefromview_227836" + ], + "collection": { + "name": "acmeCo/test/capturefromview_227836", + "read_schema_json": { + "type": "object", + "required": [ + "_meta" + ], + "properties": { + "_meta": { + "$schema": "https://json-schema.org/draft/2020-12/schema", + "$id": "https://github.com/estuary/connectors/source-bigquery-batch/document-metadata", + "properties": { + "polled": { + "type": "string", + "format": "date-time", + "title": "Polled Timestamp", + "description": "The time at which the update query which produced this document as executed." + }, + "index": { + "type": "integer", + "title": "Result Index", + "description": "The index of this document within the query execution which produced it." + }, + "row_id": { + "type": "integer", + "title": "Row ID", + "description": "Row ID of the Document" + }, + "op": { + "type": "string", + "enum": [ + "c", + "u", + "d" + ], + "title": "Change Operation", + "description": "Operation type (c: Create / u: Update / d: Delete)", + "default": "u" + } + }, + "type": "object", + "required": [ + "polled", + "index", + "row_id" + ] + } + }, + "x-infer-schema": true + }, + "key": [ + "/_meta/polled", + "/_meta/index" + ], + "projections": null + }, + "state_key": "capturefromview_227836" + } + diff --git a/source-bigquery-batch/.snapshots/TestCaptureFromView-DiscoveryWithoutViews b/source-bigquery-batch/.snapshots/TestCaptureFromView-DiscoveryWithoutViews new file mode 100644 index 000000000..9880da201 --- /dev/null +++ b/source-bigquery-batch/.snapshots/TestCaptureFromView-DiscoveryWithoutViews @@ -0,0 +1,69 @@ +Binding 0: +{ + "resource_config_json": { + "name": "capturefromview_194890", + "schema": "testdata", + "table": "capturefromview_194890" + }, + "resource_path": [ + "capturefromview_194890" + ], + "collection": { + "name": "acmeCo/test/capturefromview_194890", + "read_schema_json": { + "type": "object", + "required": [ + "_meta" + ], + "properties": { + "_meta": { + "$schema": "https://json-schema.org/draft/2020-12/schema", + "$id": "https://github.com/estuary/connectors/source-bigquery-batch/document-metadata", + "properties": { + "polled": { + "type": "string", + "format": "date-time", + "title": "Polled Timestamp", + "description": "The time at which the update query which produced this document as executed." + }, + "index": { + "type": "integer", + "title": "Result Index", + "description": "The index of this document within the query execution which produced it." + }, + "row_id": { + "type": "integer", + "title": "Row ID", + "description": "Row ID of the Document" + }, + "op": { + "type": "string", + "enum": [ + "c", + "u", + "d" + ], + "title": "Change Operation", + "description": "Operation type (c: Create / u: Update / d: Delete)", + "default": "u" + } + }, + "type": "object", + "required": [ + "polled", + "index", + "row_id" + ] + } + }, + "x-infer-schema": true + }, + "key": [ + "/_meta/polled", + "/_meta/index" + ], + "projections": null + }, + "state_key": "capturefromview_194890" + } + diff --git a/source-bigquery-batch/.snapshots/TestSpec b/source-bigquery-batch/.snapshots/TestSpec index 26e6e9c27..da9d3d11b 100644 --- a/source-bigquery-batch/.snapshots/TestSpec +++ b/source-bigquery-batch/.snapshots/TestSpec @@ -25,6 +25,11 @@ }, "advanced": { "properties": { + "discover_views": { + "type": "boolean", + "title": "Discover Views", + "description": "When set views will be automatically discovered as resources. If unset only tables will be discovered." + }, "poll": { "type": "string", "title": "Default Polling Schedule", diff --git a/source-bigquery-batch/driver.go b/source-bigquery-batch/driver.go index 5e257ad3d..220d3434a 100644 --- a/source-bigquery-batch/driver.go +++ b/source-bigquery-batch/driver.go @@ -63,7 +63,7 @@ type BatchSQLDriver struct { ConfigSchema json.RawMessage Connect func(ctx context.Context, cfg *Config) (*bigquery.Client, error) - GenerateResource func(resourceName, schemaName, tableName, tableType string) (*Resource, error) + GenerateResource func(cfg *Config, resourceName, schemaName, tableName, tableType string) (*Resource, error) ExcludedSystemSchemas []string SelectQueryTemplate func(res *Resource) (string, error) } @@ -238,7 +238,7 @@ func (drv *BatchSQLDriver) Discover(ctx context.Context, req *pc.Request_Discove var tableID = table.Schema + "." + table.Name var recommendedName = recommendedCatalogName(table.Name) - var res, err = drv.GenerateResource(recommendedName, table.Schema, table.Name, table.Type) + var res, err = drv.GenerateResource(&cfg, recommendedName, table.Schema, table.Name, table.Type) if err != nil { log.WithFields(log.Fields{ "reason": err, diff --git a/source-bigquery-batch/main.go b/source-bigquery-batch/main.go index 9f00d7f4a..46e65d668 100644 --- a/source-bigquery-batch/main.go +++ b/source-bigquery-batch/main.go @@ -35,8 +35,9 @@ type Config struct { } type advancedConfig struct { - PollSchedule string `json:"poll,omitempty" jsonschema:"title=Default Polling Schedule,description=When and how often to execute fetch queries. Accepts a Go duration string like '5m' or '6h' for frequency-based polling or a string like 'daily at 12:34Z' to poll at a specific time (specified in UTC) every day. Defaults to '24h' if unset." jsonschema_extras:"pattern=^([-+]?([0-9]+([.][0-9]+)?(h|m|s|ms))+|daily at [0-9][0-9]?:[0-9]{2}Z)$"` - FeatureFlags string `json:"feature_flags,omitempty" jsonschema:"title=Feature Flags,description=This property is intended for Estuary internal use. You should only modify this field as directed by Estuary support."` + DiscoverViews bool `json:"discover_views,omitempty" jsonschema:"title=Discover Views,description=When set views will be automatically discovered as resources. If unset only tables will be discovered."` + PollSchedule string `json:"poll,omitempty" jsonschema:"title=Default Polling Schedule,description=When and how often to execute fetch queries. Accepts a Go duration string like '5m' or '6h' for frequency-based polling or a string like 'daily at 12:34Z' to poll at a specific time (specified in UTC) every day. Defaults to '24h' if unset." jsonschema_extras:"pattern=^([-+]?([0-9]+([.][0-9]+)?(h|m|s|ms))+|daily at [0-9][0-9]?:[0-9]{2}Z)$"` + FeatureFlags string `json:"feature_flags,omitempty" jsonschema:"title=Feature Flags,description=This property is intended for Estuary internal use. You should only modify this field as directed by Estuary support."` parsedFeatureFlags map[string]bool // Parsed feature flags setting with defaults applied } @@ -168,16 +169,22 @@ func quoteIdentifier(name string) string { return "`" + strings.ReplaceAll(name, "`", "\\`") + "`" } -func generateBigQueryResource(resourceName, schemaName, tableName, tableType string) (*Resource, error) { - if !strings.EqualFold(tableType, "BASE TABLE") { - return nil, fmt.Errorf("discovery will not autogenerate resource configs for entities of type %q, but you may add them manually", tableType) +func generateBigQueryResource(cfg *Config, resourceName, schemaName, tableName, tableType string) (*Resource, error) { + if strings.EqualFold(tableType, "BASE TABLE") { + return &Resource{ + Name: resourceName, + SchemaName: schemaName, + TableName: tableName, + }, nil } - - return &Resource{ - Name: resourceName, - SchemaName: schemaName, - TableName: tableName, - }, nil + if strings.EqualFold(tableType, "VIEW") && cfg.Advanced.DiscoverViews { + return &Resource{ + Name: resourceName, + SchemaName: schemaName, + TableName: tableName, + }, nil + } + return nil, fmt.Errorf("unsupported entity type %q", tableType) } var bigqueryDriver = &BatchSQLDriver{ diff --git a/source-bigquery-batch/main_test.go b/source-bigquery-batch/main_test.go index 723f1870b..65d9dfac6 100644 --- a/source-bigquery-batch/main_test.go +++ b/source-bigquery-batch/main_test.go @@ -260,8 +260,7 @@ func TestSpec(t *testing.T) { // TestQueryTemplate is a unit test which verifies that the default query template produces // the expected output for initial/subsequent polling queries with different cursors. func TestQueryTemplate(t *testing.T) { - res, err := bigqueryDriver.GenerateResource("foobar", "testdata", "foobar", "BASE TABLE") - require.NoError(t, err) + var res = &Resource{Name: "foobar", SchemaName: "testdata", TableName: "foobar"} tmplString, err := bigqueryDriver.SelectQueryTemplate(res) require.NoError(t, err) @@ -1224,3 +1223,83 @@ func TestFeatureFlagKeylessRowID(t *testing.T) { }) } } + +// TestCaptureFromView exercises discovery and capture from a view with an updated_at cursor. +func TestCaptureFromView(t *testing.T) { + var ctx, cs, control = context.Background(), testCaptureSpec(t), testBigQueryClient(t) + var baseTableName, tableID = testTableName(t, uniqueTableID(t)) + var viewName, viewID = testTableName(t, uniqueTableID(t, "view")) + + // Create base table and view + createTestTable(ctx, t, control, baseTableName, `( + id INTEGER NOT NULL, + name STRING, + visible BOOL, + updated_at TIMESTAMP + )`) + require.NoError(t, executeSetupQuery(ctx, t, control, fmt.Sprintf(` + CREATE VIEW %s AS + SELECT id, name, updated_at + FROM %s + WHERE visible = true`, viewName, baseTableName))) + t.Cleanup(func() { + _ = executeSetupQuery(ctx, t, control, fmt.Sprintf("DROP VIEW IF EXISTS %s", viewName)) + }) + + // By default views should not be discovered. + cs.Bindings = discoverBindings(ctx, t, cs, regexp.MustCompile(tableID), regexp.MustCompile(viewID)) + t.Run("DiscoveryWithoutViews", func(t *testing.T) { cupaloy.SnapshotT(t, summarizeBindings(t, cs.Bindings)) }) + + // Enable view discovery and re-discover bindings, then set a cursor for capturing the view. + cs.EndpointSpec.(*Config).Advanced.DiscoverViews = true + cs.Bindings = discoverBindings(ctx, t, cs, regexp.MustCompile(tableID), regexp.MustCompile(viewID)) + t.Run("DiscoveryWithViews", func(t *testing.T) { cupaloy.SnapshotT(t, summarizeBindings(t, cs.Bindings)) }) + + // Update both the base table and the view incrementally using the updated_at column. + setCursorColumns(t, cs.Bindings[0], "updated_at") + setCursorColumns(t, cs.Bindings[1], "updated_at") + + t.Run("Capture", func(t *testing.T) { + setShutdownAfterQuery(t, true) + baseTime := time.Date(2025, 2, 13, 12, 0, 0, 0, time.UTC) + + // First batch: Insert rows into base table, some visible in view + var firstBatch [][]any + for i := 0; i < 10; i++ { + firstBatch = append(firstBatch, []any{ + i, + fmt.Sprintf("Row %d", i), + i%2 == 0, // Even numbered rows are visible + baseTime.Add(time.Duration(i) * time.Minute), + }) + } + require.NoError(t, parallelSetupQueries(ctx, t, control, + fmt.Sprintf("INSERT INTO %s (id, name, visible, updated_at) VALUES (@p0, @p1, @p2, @p3)", baseTableName), + firstBatch)) + cs.Capture(ctx, t, nil) + + // Second batch: More rows with later timestamps + var secondBatch [][]any + for i := 10; i < 20; i++ { + secondBatch = append(secondBatch, []any{ + i, + fmt.Sprintf("Row %d", i), + i%2 == 0, // Even numbered rows are visible + baseTime.Add(time.Duration(i) * time.Minute), + }) + } + require.NoError(t, parallelSetupQueries(ctx, t, control, + fmt.Sprintf("INSERT INTO %s (id, name, visible, updated_at) VALUES (@p0, @p1, @p2, @p3)", baseTableName), + secondBatch)) + cs.Capture(ctx, t, nil) + + // Update some rows to change their visibility + updateTime := baseTime.Add(20 * time.Minute) + require.NoError(t, parallelSetupQueries(ctx, t, control, + fmt.Sprintf("UPDATE %s SET visible = NOT visible, updated_at = @p0 WHERE id IN (@p1, @p2, @p3)", baseTableName), + [][]any{{updateTime, 2, 3, 4}})) + cs.Capture(ctx, t, nil) + + cupaloy.SnapshotT(t, cs.Summary()) + }) +}