diff --git a/source-bigquery-batch/.snapshots/TestBinaryTypes-Capture b/source-bigquery-batch/.snapshots/TestBinaryTypes-Capture new file mode 100644 index 0000000000..117e93d3c3 --- /dev/null +++ b/source-bigquery-batch/.snapshots/TestBinaryTypes-Capture @@ -0,0 +1,14 @@ +# ================================ +# Collection "acmeCo/test/binarytypes_537491": 6 Documents +# ================================ +{"_meta":{"polled":"","index":999},"bool_val":true,"bytes_val":"aGVsbG8gd29ybGQ=","id":1} +{"_meta":{"polled":"","index":999},"bool_val":false,"bytes_val":"AAECAw==","id":2} +{"_meta":{"polled":"","index":999},"bool_val":null,"bytes_val":null,"id":3} +{"_meta":{"polled":"","index":999},"bool_val":true,"bytes_val":null,"id":4} +{"_meta":{"polled":"","index":999},"bool_val":false,"bytes_val":"qqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqg==","id":5} +{"_meta":{"polled":"","index":999},"bool_val":true,"bytes_val":"SGVsbG8sIOS4lueVjCE=","id":6} +# ================================ +# Final State Checkpoint +# ================================ +{"bindingStateV1":{"binarytypes_537491":{"CursorNames":["id"],"CursorValues":[6],"LastPolled":""}}} + diff --git a/source-bigquery-batch/.snapshots/TestBinaryTypes-Discovery b/source-bigquery-batch/.snapshots/TestBinaryTypes-Discovery new file mode 100644 index 0000000000..64410793c1 --- /dev/null +++ b/source-bigquery-batch/.snapshots/TestBinaryTypes-Discovery @@ -0,0 +1,58 @@ +Binding 0: +{ + "resource_config_json": { + "name": "binarytypes_537491", + "schema": "testdata", + "table": "binarytypes_537491", + "cursor": [ + "id" + ] + }, + "resource_path": [ + "binarytypes_537491" + ], + "collection": { + "name": "acmeCo/test/binarytypes_537491", + "read_schema_json": { + "type": "object", + "required": [ + "_meta", + "id" + ], + "properties": { + "_meta": { + "$schema": "http://json-schema.org/draft/2020-12/schema", + "$id": 
"https://github.com/estuary/connectors/source-bigquery-batch/document-metadata", + "properties": { + "polled": { + "type": "string", + "format": "date-time", + "title": "Polled Timestamp", + "description": "The time at which the update query which produced this document as executed." + }, + "index": { + "type": "integer", + "title": "Result Index", + "description": "The index of this document within the query execution which produced it." + } + }, + "type": "object", + "required": [ + "polled", + "index" + ] + }, + "id": { + "type": "integer" + } + }, + "x-infer-schema": true + }, + "key": [ + "/id" + ], + "projections": null + }, + "state_key": "binarytypes_537491" + } + diff --git a/source-bigquery-batch/.snapshots/TestCaptureWithDatetimeCursor-Capture b/source-bigquery-batch/.snapshots/TestCaptureWithDatetimeCursor-Capture new file mode 100644 index 0000000000..faac47173c --- /dev/null +++ b/source-bigquery-batch/.snapshots/TestCaptureWithDatetimeCursor-Capture @@ -0,0 +1,18 @@ +# ================================ +# Collection "acmeCo/test/capturewithdatetimecursor_877736": 10 Documents +# ================================ +{"_meta":{"polled":"","index":999},"data":"Value for row 0","id":0,"updated_at":"2025-02-13T12:00:00.000000"} +{"_meta":{"polled":"","index":999},"data":"Value for row 1","id":1,"updated_at":"2025-02-13T12:01:00.000000"} +{"_meta":{"polled":"","index":999},"data":"Value for row 2","id":2,"updated_at":"2025-02-13T12:02:00.000000"} +{"_meta":{"polled":"","index":999},"data":"Value for row 3","id":3,"updated_at":"2025-02-13T12:03:00.000000"} +{"_meta":{"polled":"","index":999},"data":"Value for row 4","id":4,"updated_at":"2025-02-13T12:04:00.000000"} +{"_meta":{"polled":"","index":999},"data":"Value for row 5","id":5,"updated_at":"2025-02-13T12:10:00.000000"} +{"_meta":{"polled":"","index":999},"data":"Value for row 6","id":6,"updated_at":"2025-02-13T12:11:00.000000"} +{"_meta":{"polled":"","index":999},"data":"Value for row 
7","id":7,"updated_at":"2025-02-13T12:12:00.000000"} +{"_meta":{"polled":"","index":999},"data":"Value for row 8","id":8,"updated_at":"2025-02-13T12:13:00.000000"} +{"_meta":{"polled":"","index":999},"data":"Value for row 9","id":9,"updated_at":"2025-02-13T12:14:00.000000"} +# ================================ +# Final State Checkpoint +# ================================ +{"bindingStateV1":{"capturewithdatetimecursor_877736":{"CursorNames":["updated_at"],"CursorValues":["2025-02-13T12:14:00.000000"],"LastPolled":""}}} + diff --git a/source-bigquery-batch/.snapshots/TestCaptureWithDatetimeCursor-Discovery b/source-bigquery-batch/.snapshots/TestCaptureWithDatetimeCursor-Discovery new file mode 100644 index 0000000000..c04a2060b4 --- /dev/null +++ b/source-bigquery-batch/.snapshots/TestCaptureWithDatetimeCursor-Discovery @@ -0,0 +1,58 @@ +Binding 0: +{ + "resource_config_json": { + "name": "capturewithdatetimecursor_877736", + "schema": "testdata", + "table": "capturewithdatetimecursor_877736", + "cursor": [ + "updated_at" + ] + }, + "resource_path": [ + "capturewithdatetimecursor_877736" + ], + "collection": { + "name": "acmeCo/test/capturewithdatetimecursor_877736", + "read_schema_json": { + "type": "object", + "required": [ + "_meta", + "id" + ], + "properties": { + "_meta": { + "$schema": "http://json-schema.org/draft/2020-12/schema", + "$id": "https://github.com/estuary/connectors/source-bigquery-batch/document-metadata", + "properties": { + "polled": { + "type": "string", + "format": "date-time", + "title": "Polled Timestamp", + "description": "The time at which the update query which produced this document as executed." + }, + "index": { + "type": "integer", + "title": "Result Index", + "description": "The index of this document within the query execution which produced it." 
+ } + }, + "type": "object", + "required": [ + "polled", + "index" + ] + }, + "id": { + "type": "integer" + } + }, + "x-infer-schema": true + }, + "key": [ + "/id" + ], + "projections": null + }, + "state_key": "capturewithdatetimecursor_877736" + } + diff --git a/source-bigquery-batch/.snapshots/TestCaptureWithEmptyPoll-Capture b/source-bigquery-batch/.snapshots/TestCaptureWithEmptyPoll-Capture new file mode 100644 index 0000000000..ae9c0d67b0 --- /dev/null +++ b/source-bigquery-batch/.snapshots/TestCaptureWithEmptyPoll-Capture @@ -0,0 +1,18 @@ +# ================================ +# Collection "acmeCo/test/capturewithemptypoll_890703": 10 Documents +# ================================ +{"_meta":{"polled":"","index":999},"data":"Value for row 0","id":0,"updated_at":"2025-02-13T12:00:00Z"} +{"_meta":{"polled":"","index":999},"data":"Value for row 1","id":1,"updated_at":"2025-02-13T12:01:00Z"} +{"_meta":{"polled":"","index":999},"data":"Value for row 2","id":2,"updated_at":"2025-02-13T12:02:00Z"} +{"_meta":{"polled":"","index":999},"data":"Value for row 3","id":3,"updated_at":"2025-02-13T12:03:00Z"} +{"_meta":{"polled":"","index":999},"data":"Value for row 4","id":4,"updated_at":"2025-02-13T12:04:00Z"} +{"_meta":{"polled":"","index":999},"data":"Value for row 5","id":5,"updated_at":"2025-02-13T12:10:00Z"} +{"_meta":{"polled":"","index":999},"data":"Value for row 6","id":6,"updated_at":"2025-02-13T12:11:00Z"} +{"_meta":{"polled":"","index":999},"data":"Value for row 7","id":7,"updated_at":"2025-02-13T12:12:00Z"} +{"_meta":{"polled":"","index":999},"data":"Value for row 8","id":8,"updated_at":"2025-02-13T12:13:00Z"} +{"_meta":{"polled":"","index":999},"data":"Value for row 9","id":9,"updated_at":"2025-02-13T12:14:00Z"} +# ================================ +# Final State Checkpoint +# ================================ +{"bindingStateV1":{"capturewithemptypoll_890703":{"CursorNames":["updated_at"],"CursorValues":["2025-02-13T12:14:00Z"],"LastPolled":""}}} + diff --git 
a/source-bigquery-batch/.snapshots/TestCaptureWithEmptyPoll-Discovery b/source-bigquery-batch/.snapshots/TestCaptureWithEmptyPoll-Discovery new file mode 100644 index 0000000000..58d00e01d5 --- /dev/null +++ b/source-bigquery-batch/.snapshots/TestCaptureWithEmptyPoll-Discovery @@ -0,0 +1,58 @@ +Binding 0: +{ + "resource_config_json": { + "name": "capturewithemptypoll_890703", + "schema": "testdata", + "table": "capturewithemptypoll_890703", + "cursor": [ + "updated_at" + ] + }, + "resource_path": [ + "capturewithemptypoll_890703" + ], + "collection": { + "name": "acmeCo/test/capturewithemptypoll_890703", + "read_schema_json": { + "type": "object", + "required": [ + "_meta", + "id" + ], + "properties": { + "_meta": { + "$schema": "http://json-schema.org/draft/2020-12/schema", + "$id": "https://github.com/estuary/connectors/source-bigquery-batch/document-metadata", + "properties": { + "polled": { + "type": "string", + "format": "date-time", + "title": "Polled Timestamp", + "description": "The time at which the update query which produced this document as executed." + }, + "index": { + "type": "integer", + "title": "Result Index", + "description": "The index of this document within the query execution which produced it." 
+ } + }, + "type": "object", + "required": [ + "polled", + "index" + ] + }, + "id": { + "type": "integer" + } + }, + "x-infer-schema": true + }, + "key": [ + "/id" + ], + "projections": null + }, + "state_key": "capturewithemptypoll_890703" + } + diff --git a/source-bigquery-batch/.snapshots/TestCaptureWithModifications-Capture b/source-bigquery-batch/.snapshots/TestCaptureWithModifications-Capture new file mode 100644 index 0000000000..239a6c891f --- /dev/null +++ b/source-bigquery-batch/.snapshots/TestCaptureWithModifications-Capture @@ -0,0 +1,18 @@ +# ================================ +# Collection "acmeCo/test/capturewithmodifications_786099": 10 Documents +# ================================ +{"_meta":{"polled":"","index":999},"data":"Initial value for row 0","id":0,"updated_at":"2025-02-13T12:00:00Z"} +{"_meta":{"polled":"","index":999},"data":"Initial value for row 1","id":1,"updated_at":"2025-02-13T12:01:00Z"} +{"_meta":{"polled":"","index":999},"data":"Initial value for row 2","id":2,"updated_at":"2025-02-13T12:02:00Z"} +{"_meta":{"polled":"","index":999},"data":"Initial value for row 3","id":3,"updated_at":"2025-02-13T12:03:00Z"} +{"_meta":{"polled":"","index":999},"data":"Initial value for row 4","id":4,"updated_at":"2025-02-13T12:04:00Z"} +{"_meta":{"polled":"","index":999},"data":"Modified value for row 3","id":3,"updated_at":"2025-02-13T12:15:00Z"} +{"_meta":{"polled":"","index":999},"data":"Modified value for row 4","id":4,"updated_at":"2025-02-13T12:16:00Z"} +{"_meta":{"polled":"","index":999},"data":"Value for row 5","id":5,"updated_at":"2025-02-13T12:20:00Z"} +{"_meta":{"polled":"","index":999},"data":"Value for row 6","id":6,"updated_at":"2025-02-13T12:21:00Z"} +{"_meta":{"polled":"","index":999},"data":"Value for row 7","id":7,"updated_at":"2025-02-13T12:22:00Z"} +# ================================ +# Final State Checkpoint +# ================================ 
+{"bindingStateV1":{"capturewithmodifications_786099":{"CursorNames":["updated_at"],"CursorValues":["2025-02-13T12:22:00Z"],"LastPolled":""}}} + diff --git a/source-bigquery-batch/.snapshots/TestCaptureWithModifications-Discovery b/source-bigquery-batch/.snapshots/TestCaptureWithModifications-Discovery new file mode 100644 index 0000000000..d5e972743b --- /dev/null +++ b/source-bigquery-batch/.snapshots/TestCaptureWithModifications-Discovery @@ -0,0 +1,58 @@ +Binding 0: +{ + "resource_config_json": { + "name": "capturewithmodifications_786099", + "schema": "testdata", + "table": "capturewithmodifications_786099", + "cursor": [ + "updated_at" + ] + }, + "resource_path": [ + "capturewithmodifications_786099" + ], + "collection": { + "name": "acmeCo/test/capturewithmodifications_786099", + "read_schema_json": { + "type": "object", + "required": [ + "_meta", + "id" + ], + "properties": { + "_meta": { + "$schema": "http://json-schema.org/draft/2020-12/schema", + "$id": "https://github.com/estuary/connectors/source-bigquery-batch/document-metadata", + "properties": { + "polled": { + "type": "string", + "format": "date-time", + "title": "Polled Timestamp", + "description": "The time at which the update query which produced this document as executed." + }, + "index": { + "type": "integer", + "title": "Result Index", + "description": "The index of this document within the query execution which produced it." 
+ } + }, + "type": "object", + "required": [ + "polled", + "index" + ] + }, + "id": { + "type": "integer" + } + }, + "x-infer-schema": true + }, + "key": [ + "/id" + ], + "projections": null + }, + "state_key": "capturewithmodifications_786099" + } + diff --git a/source-bigquery-batch/.snapshots/TestCaptureWithNullCursor-Capture1 b/source-bigquery-batch/.snapshots/TestCaptureWithNullCursor-Capture1 new file mode 100644 index 0000000000..033e59308c --- /dev/null +++ b/source-bigquery-batch/.snapshots/TestCaptureWithNullCursor-Capture1 @@ -0,0 +1,13 @@ +# ================================ +# Collection "acmeCo/test/capturewithnullcursor_662607": 5 Documents +# ================================ +{"_meta":{"polled":"","index":999},"data":"Another NULL cursor","id":2,"sort_col":null} +{"_meta":{"polled":"","index":999},"data":"Third NULL cursor","id":4,"sort_col":null} +{"_meta":{"polled":"","index":999},"data":"Value with NULL cursor","id":0,"sort_col":null} +{"_meta":{"polled":"","index":999},"data":"Value with cursor 10","id":1,"sort_col":10} +{"_meta":{"polled":"","index":999},"data":"Value with cursor 20","id":3,"sort_col":20} +# ================================ +# Final State Checkpoint +# ================================ +{"bindingStateV1":{"capturewithnullcursor_662607":{"CursorNames":["sort_col"],"CursorValues":[20],"LastPolled":""}}} + diff --git a/source-bigquery-batch/.snapshots/TestCaptureWithNullCursor-Capture2 b/source-bigquery-batch/.snapshots/TestCaptureWithNullCursor-Capture2 new file mode 100644 index 0000000000..eff1455dc6 --- /dev/null +++ b/source-bigquery-batch/.snapshots/TestCaptureWithNullCursor-Capture2 @@ -0,0 +1,10 @@ +# ================================ +# Collection "acmeCo/test/capturewithnullcursor_662607": 2 Documents +# ================================ +{"_meta":{"polled":"","index":999},"data":"Final value cursor 30","id":9,"sort_col":30} +{"_meta":{"polled":"","index":999},"data":"Value with cursor 25","id":7,"sort_col":25} +# 
================================ +# Final State Checkpoint +# ================================ +{"bindingStateV1":{"capturewithnullcursor_662607":{"CursorNames":["sort_col"],"CursorValues":[30],"LastPolled":""}}} + diff --git a/source-bigquery-batch/.snapshots/TestCaptureWithNullCursor-Discovery b/source-bigquery-batch/.snapshots/TestCaptureWithNullCursor-Discovery new file mode 100644 index 0000000000..3cacc6ed0f --- /dev/null +++ b/source-bigquery-batch/.snapshots/TestCaptureWithNullCursor-Discovery @@ -0,0 +1,58 @@ +Binding 0: +{ + "resource_config_json": { + "name": "capturewithnullcursor_662607", + "schema": "testdata", + "table": "capturewithnullcursor_662607", + "cursor": [ + "sort_col" + ] + }, + "resource_path": [ + "capturewithnullcursor_662607" + ], + "collection": { + "name": "acmeCo/test/capturewithnullcursor_662607", + "read_schema_json": { + "type": "object", + "required": [ + "_meta", + "id" + ], + "properties": { + "_meta": { + "$schema": "http://json-schema.org/draft/2020-12/schema", + "$id": "https://github.com/estuary/connectors/source-bigquery-batch/document-metadata", + "properties": { + "polled": { + "type": "string", + "format": "date-time", + "title": "Polled Timestamp", + "description": "The time at which the update query which produced this document as executed." + }, + "index": { + "type": "integer", + "title": "Result Index", + "description": "The index of this document within the query execution which produced it." 
+ } + }, + "type": "object", + "required": [ + "polled", + "index" + ] + }, + "id": { + "type": "integer" + } + }, + "x-infer-schema": true + }, + "key": [ + "/id" + ], + "projections": null + }, + "state_key": "capturewithnullcursor_662607" + } + diff --git a/source-bigquery-batch/.snapshots/TestCaptureWithTwoColumnCursor-Capture b/source-bigquery-batch/.snapshots/TestCaptureWithTwoColumnCursor-Capture new file mode 100644 index 0000000000..fab387e5c1 --- /dev/null +++ b/source-bigquery-batch/.snapshots/TestCaptureWithTwoColumnCursor-Capture @@ -0,0 +1,20 @@ +# ================================ +# Collection "acmeCo/test/capturewithtwocolumncursor_321285": 12 Documents +# ================================ +{"_meta":{"polled":"","index":999},"col1":1,"col2":1,"data":"Value for row 0","id":0} +{"_meta":{"polled":"","index":999},"col1":1,"col2":2,"data":"Value for row 1","id":1} +{"_meta":{"polled":"","index":999},"col1":1,"col2":3,"data":"Value for row 2","id":2} +{"_meta":{"polled":"","index":999},"col1":2,"col2":1,"data":"Value for row 3","id":3} +{"_meta":{"polled":"","index":999},"col1":2,"col2":2,"data":"Value for row 4","id":4} +{"_meta":{"polled":"","index":999},"col1":2,"col2":3,"data":"Value for row 5","id":5} +{"_meta":{"polled":"","index":999},"col1":3,"col2":1,"data":"Value for row 8","id":8} +{"_meta":{"polled":"","index":999},"col1":3,"col2":2,"data":"Value for row 9","id":9} +{"_meta":{"polled":"","index":999},"col1":3,"col2":3,"data":"Value for row 10","id":10} +{"_meta":{"polled":"","index":999},"col1":4,"col2":1,"data":"Value for row 11","id":11} +{"_meta":{"polled":"","index":999},"col1":4,"col2":2,"data":"Value for row 12","id":12} +{"_meta":{"polled":"","index":999},"col1":4,"col2":3,"data":"Value for row 13","id":13} +# ================================ +# Final State Checkpoint +# ================================ +{"bindingStateV1":{"capturewithtwocolumncursor_321285":{"CursorNames":["col1","col2"],"CursorValues":[4,3],"LastPolled":""}}} + diff 
--git a/source-bigquery-batch/.snapshots/TestCaptureWithTwoColumnCursor-Discovery b/source-bigquery-batch/.snapshots/TestCaptureWithTwoColumnCursor-Discovery new file mode 100644 index 0000000000..0ffa4330f0 --- /dev/null +++ b/source-bigquery-batch/.snapshots/TestCaptureWithTwoColumnCursor-Discovery @@ -0,0 +1,59 @@ +Binding 0: +{ + "resource_config_json": { + "name": "capturewithtwocolumncursor_321285", + "schema": "testdata", + "table": "capturewithtwocolumncursor_321285", + "cursor": [ + "col1", + "col2" + ] + }, + "resource_path": [ + "capturewithtwocolumncursor_321285" + ], + "collection": { + "name": "acmeCo/test/capturewithtwocolumncursor_321285", + "read_schema_json": { + "type": "object", + "required": [ + "_meta", + "id" + ], + "properties": { + "_meta": { + "$schema": "http://json-schema.org/draft/2020-12/schema", + "$id": "https://github.com/estuary/connectors/source-bigquery-batch/document-metadata", + "properties": { + "polled": { + "type": "string", + "format": "date-time", + "title": "Polled Timestamp", + "description": "The time at which the update query which produced this document as executed." + }, + "index": { + "type": "integer", + "title": "Result Index", + "description": "The index of this document within the query execution which produced it." 
+ } + }, + "type": "object", + "required": [ + "polled", + "index" + ] + }, + "id": { + "type": "integer" + } + }, + "x-infer-schema": true + }, + "key": [ + "/id" + ], + "projections": null + }, + "state_key": "capturewithtwocolumncursor_321285" + } + diff --git a/source-bigquery-batch/.snapshots/TestCaptureWithUpdatedAtCursor-Capture b/source-bigquery-batch/.snapshots/TestCaptureWithUpdatedAtCursor-Capture new file mode 100644 index 0000000000..b4be37a09b --- /dev/null +++ b/source-bigquery-batch/.snapshots/TestCaptureWithUpdatedAtCursor-Capture @@ -0,0 +1,18 @@ +# ================================ +# Collection "acmeCo/test/capturewithupdatedatcursor_792371": 10 Documents +# ================================ +{"_meta":{"polled":"","index":999},"data":"Value for row 0","id":0,"updated_at":"2025-02-13T12:00:00Z"} +{"_meta":{"polled":"","index":999},"data":"Value for row 1","id":1,"updated_at":"2025-02-13T12:01:00Z"} +{"_meta":{"polled":"","index":999},"data":"Value for row 2","id":2,"updated_at":"2025-02-13T12:02:00Z"} +{"_meta":{"polled":"","index":999},"data":"Value for row 3","id":3,"updated_at":"2025-02-13T12:03:00Z"} +{"_meta":{"polled":"","index":999},"data":"Value for row 4","id":4,"updated_at":"2025-02-13T12:04:00Z"} +{"_meta":{"polled":"","index":999},"data":"Value for row 5","id":5,"updated_at":"2025-02-13T12:10:00Z"} +{"_meta":{"polled":"","index":999},"data":"Value for row 6","id":6,"updated_at":"2025-02-13T12:11:00Z"} +{"_meta":{"polled":"","index":999},"data":"Value for row 7","id":7,"updated_at":"2025-02-13T12:12:00Z"} +{"_meta":{"polled":"","index":999},"data":"Value for row 8","id":8,"updated_at":"2025-02-13T12:13:00Z"} +{"_meta":{"polled":"","index":999},"data":"Value for row 9","id":9,"updated_at":"2025-02-13T12:14:00Z"} +# ================================ +# Final State Checkpoint +# ================================ 
+{"bindingStateV1":{"capturewithupdatedatcursor_792371":{"CursorNames":["updated_at"],"CursorValues":["2025-02-13T12:14:00Z"],"LastPolled":""}}} + diff --git a/source-bigquery-batch/.snapshots/TestCaptureWithUpdatedAtCursor-Discovery b/source-bigquery-batch/.snapshots/TestCaptureWithUpdatedAtCursor-Discovery new file mode 100644 index 0000000000..cb2c8da87a --- /dev/null +++ b/source-bigquery-batch/.snapshots/TestCaptureWithUpdatedAtCursor-Discovery @@ -0,0 +1,58 @@ +Binding 0: +{ + "resource_config_json": { + "name": "capturewithupdatedatcursor_792371", + "schema": "testdata", + "table": "capturewithupdatedatcursor_792371", + "cursor": [ + "updated_at" + ] + }, + "resource_path": [ + "capturewithupdatedatcursor_792371" + ], + "collection": { + "name": "acmeCo/test/capturewithupdatedatcursor_792371", + "read_schema_json": { + "type": "object", + "required": [ + "_meta", + "id" + ], + "properties": { + "_meta": { + "$schema": "http://json-schema.org/draft/2020-12/schema", + "$id": "https://github.com/estuary/connectors/source-bigquery-batch/document-metadata", + "properties": { + "polled": { + "type": "string", + "format": "date-time", + "title": "Polled Timestamp", + "description": "The time at which the update query which produced this document as executed." + }, + "index": { + "type": "integer", + "title": "Result Index", + "description": "The index of this document within the query execution which produced it." 
+ } + }, + "type": "object", + "required": [ + "polled", + "index" + ] + }, + "id": { + "type": "integer" + } + }, + "x-infer-schema": true + }, + "key": [ + "/id" + ], + "projections": null + }, + "state_key": "capturewithupdatedatcursor_792371" + } + diff --git a/source-bigquery-batch/.snapshots/TestCompositeTypes-Capture b/source-bigquery-batch/.snapshots/TestCompositeTypes-Capture new file mode 100644 index 0000000000..178d51f206 --- /dev/null +++ b/source-bigquery-batch/.snapshots/TestCompositeTypes-Capture @@ -0,0 +1,11 @@ +# ================================ +# Collection "acmeCo/test/compositetypes_483529": 3 Documents +# ================================ +{"_meta":{"polled":"","index":999},"array_struct":[[1,"first"],[2,"second"]],"id":1,"int_array":[1,2,3],"string_array":["a","b","c"],"struct_val":["Alice",25,[90.5,85,92.3]]} +{"_meta":{"polled":"","index":999},"array_struct":null,"id":2,"int_array":null,"string_array":null,"struct_val":["Bob",30,null]} +{"_meta":{"polled":"","index":999},"array_struct":null,"id":3,"int_array":null,"string_array":null,"struct_val":null} +# ================================ +# Final State Checkpoint +# ================================ +{"bindingStateV1":{"compositetypes_483529":{"CursorNames":["id"],"CursorValues":[3],"LastPolled":""}}} + diff --git a/source-bigquery-batch/.snapshots/TestCompositeTypes-Discovery b/source-bigquery-batch/.snapshots/TestCompositeTypes-Discovery new file mode 100644 index 0000000000..c639196250 --- /dev/null +++ b/source-bigquery-batch/.snapshots/TestCompositeTypes-Discovery @@ -0,0 +1,58 @@ +Binding 0: +{ + "resource_config_json": { + "name": "compositetypes_483529", + "schema": "testdata", + "table": "compositetypes_483529", + "cursor": [ + "id" + ] + }, + "resource_path": [ + "compositetypes_483529" + ], + "collection": { + "name": "acmeCo/test/compositetypes_483529", + "read_schema_json": { + "type": "object", + "required": [ + "_meta", + "id" + ], + "properties": { + "_meta": { + "$schema": 
"http://json-schema.org/draft/2020-12/schema", + "$id": "https://github.com/estuary/connectors/source-bigquery-batch/document-metadata", + "properties": { + "polled": { + "type": "string", + "format": "date-time", + "title": "Polled Timestamp", + "description": "The time at which the update query which produced this document as executed." + }, + "index": { + "type": "integer", + "title": "Result Index", + "description": "The index of this document within the query execution which produced it." + } + }, + "type": "object", + "required": [ + "polled", + "index" + ] + }, + "id": { + "type": "integer" + } + }, + "x-infer-schema": true + }, + "key": [ + "/id" + ], + "projections": null + }, + "state_key": "compositetypes_483529" + } + diff --git a/source-bigquery-batch/.snapshots/TestDatetimeCursor-Capture b/source-bigquery-batch/.snapshots/TestDatetimeCursor-Capture deleted file mode 100644 index 9b04811c44..0000000000 --- a/source-bigquery-batch/.snapshots/TestDatetimeCursor-Capture +++ /dev/null @@ -1,11 +0,0 @@ -# ================================ -# Collection "acmeCo/test/datetime_cursor_132448": 3 Documents -# ================================ -{"_meta":{"polled":"","index":999},"data":"Value for row \"2023-08-10T07:54:54.123\"","id":"2023-08-10T07:54:54.123000"} -{"_meta":{"polled":"","index":999},"data":"Value for row \"2024-10-23T03:22:31.456\"","id":"2024-10-23T03:22:31.456000"} -{"_meta":{"polled":"","index":999},"data":"Value for row \"2024-10-23T03:23:00.789\"","id":"2024-10-23T03:23:00.789000"} -# ================================ -# Final State Checkpoint -# ================================ -{"bindingStateV1":{"datetime_cursor_132448":{"CursorNames":["id"],"CursorValues":["2024-10-23T03:23:00.789000"],"LastPolled":""}}} - diff --git a/source-bigquery-batch/.snapshots/TestFullRefresh-Capture1 b/source-bigquery-batch/.snapshots/TestFullRefresh-Capture1 new file mode 100644 index 0000000000..660f21e432 --- /dev/null +++ 
b/source-bigquery-batch/.snapshots/TestFullRefresh-Capture1 @@ -0,0 +1,12 @@ +# ================================ +# Collection "acmeCo/test/fullrefresh_902536": 4 Documents +# ================================ +{"_meta":{"polled":"","index":999},"data":"Value for row 0","id":0} +{"_meta":{"polled":"","index":999},"data":"Value for row 1","id":1} +{"_meta":{"polled":"","index":999},"data":"Value for row 2","id":2} +{"_meta":{"polled":"","index":999},"data":"Value for row 3","id":3} +# ================================ +# Final State Checkpoint +# ================================ +{"bindingStateV1":{"fullrefresh_902536":{"CursorNames":null,"CursorValues":null,"LastPolled":""}}} + diff --git a/source-bigquery-batch/.snapshots/TestBasicCapture-Capture b/source-bigquery-batch/.snapshots/TestFullRefresh-Capture2 similarity index 70% rename from source-bigquery-batch/.snapshots/TestBasicCapture-Capture rename to source-bigquery-batch/.snapshots/TestFullRefresh-Capture2 index bbca97f8ca..c4faa2476b 100644 --- a/source-bigquery-batch/.snapshots/TestBasicCapture-Capture +++ b/source-bigquery-batch/.snapshots/TestFullRefresh-Capture2 @@ -1,5 +1,5 @@ # ================================ -# Collection "acmeCo/test/basic_capture_826935": 10 Documents +# Collection "acmeCo/test/fullrefresh_902536": 8 Documents # ================================ {"_meta":{"polled":"","index":999},"data":"Value for row 0","id":0} {"_meta":{"polled":"","index":999},"data":"Value for row 1","id":1} @@ -9,10 +9,8 @@ {"_meta":{"polled":"","index":999},"data":"Value for row 5","id":5} {"_meta":{"polled":"","index":999},"data":"Value for row 6","id":6} {"_meta":{"polled":"","index":999},"data":"Value for row 7","id":7} -{"_meta":{"polled":"","index":999},"data":"Value for row 8","id":8} -{"_meta":{"polled":"","index":999},"data":"Value for row 9","id":9} # ================================ # Final State Checkpoint # ================================ 
-{"bindingStateV1":{"basic_capture_826935":{"CursorNames":["id"],"CursorValues":[9],"LastPolled":""}}} +{"bindingStateV1":{"fullrefresh_902536":{"LastPolled":""}}} diff --git a/source-bigquery-batch/.snapshots/TestBasicCapture-Discovery b/source-bigquery-batch/.snapshots/TestFullRefresh-Discovery similarity index 61% rename from source-bigquery-batch/.snapshots/TestBasicCapture-Discovery rename to source-bigquery-batch/.snapshots/TestFullRefresh-Discovery index 7c35d378b4..51b8e6eebe 100644 --- a/source-bigquery-batch/.snapshots/TestBasicCapture-Discovery +++ b/source-bigquery-batch/.snapshots/TestFullRefresh-Discovery @@ -1,14 +1,15 @@ Binding 0: { "resource_config_json": { - "name": "basic_capture_826935", - "template": "{{/* Default query template which adapts to cursor field selection */}}\n{{- if not .CursorFields -}}\n SELECT * FROM `testdata`.`basic_capture_826935`;\n{{- else -}}\n SELECT * FROM `testdata`.`basic_capture_826935`\n {{- if not .IsFirstQuery -}}\n\t{{- range $i, $k := $.CursorFields -}}\n\t {{- if eq $i 0}} WHERE ({{else}}) OR ({{end -}}\n {{- range $j, $n := $.CursorFields -}}\n\t\t{{- if lt $j $i -}}\n\t\t {{$n}} = @p{{$j}} AND {{end -}}\n\t {{- end -}}\n\t {{$k}} \u003e @p{{$i}}\n\t{{- end -}})\n {{- end}}\n ORDER BY {{range $i, $k := $.CursorFields}}{{if gt $i 0}}, {{end}}{{$k}}{{end -}};\n{{- end}}" + "name": "fullrefresh_902536", + "schema": "testdata", + "table": "fullrefresh_902536" }, "resource_path": [ - "basic_capture_826935" + "fullrefresh_902536" ], "collection": { - "name": "acmeCo/test/basic_capture_826935", + "name": "acmeCo/test/fullrefresh_902536", "read_schema_json": { "type": "object", "required": [ @@ -49,6 +50,6 @@ Binding 0: ], "projections": null }, - "state_key": "basic_capture_826935" + "state_key": "fullrefresh_902536" } diff --git a/source-bigquery-batch/.snapshots/TestIntegerTypes-Capture b/source-bigquery-batch/.snapshots/TestIntegerTypes-Capture new file mode 100644 index 0000000000..e51173e80f --- /dev/null +++ 
b/source-bigquery-batch/.snapshots/TestIntegerTypes-Capture @@ -0,0 +1,13 @@ +# ================================ +# Collection "acmeCo/test/integertypes_795898": 5 Documents +# ================================ +{"_meta":{"polled":"","index":999},"bigint_val":-9223372036854775808,"byteint_val":-9223372036854775808,"id":1,"int_val":-9223372036854775808,"integer_val":-9223372036854775808,"smallint_val":-9223372036854775808,"tinyint_val":-9223372036854775808} +{"_meta":{"polled":"","index":999},"bigint_val":9223372036854775807,"byteint_val":9223372036854775807,"id":2,"int_val":9223372036854775807,"integer_val":9223372036854775807,"smallint_val":9223372036854775807,"tinyint_val":9223372036854775807} +{"_meta":{"polled":"","index":999},"bigint_val":0,"byteint_val":0,"id":3,"int_val":0,"integer_val":0,"smallint_val":0,"tinyint_val":0} +{"_meta":{"polled":"","index":999},"bigint_val":null,"byteint_val":null,"id":4,"int_val":null,"integer_val":null,"smallint_val":null,"tinyint_val":null} +{"_meta":{"polled":"","index":999},"bigint_val":789,"byteint_val":34,"id":5,"int_val":42,"integer_val":-456,"smallint_val":123,"tinyint_val":-12} +# ================================ +# Final State Checkpoint +# ================================ +{"bindingStateV1":{"integertypes_795898":{"CursorNames":["id"],"CursorValues":[5],"LastPolled":""}}} + diff --git a/source-bigquery-batch/.snapshots/TestIntegerTypes-Discovery b/source-bigquery-batch/.snapshots/TestIntegerTypes-Discovery new file mode 100644 index 0000000000..2de58074d5 --- /dev/null +++ b/source-bigquery-batch/.snapshots/TestIntegerTypes-Discovery @@ -0,0 +1,58 @@ +Binding 0: +{ + "resource_config_json": { + "name": "integertypes_795898", + "schema": "testdata", + "table": "integertypes_795898", + "cursor": [ + "id" + ] + }, + "resource_path": [ + "integertypes_795898" + ], + "collection": { + "name": "acmeCo/test/integertypes_795898", + "read_schema_json": { + "type": "object", + "required": [ + "_meta", + "id" + ], + 
"properties": { + "_meta": { + "$schema": "http://json-schema.org/draft/2020-12/schema", + "$id": "https://github.com/estuary/connectors/source-bigquery-batch/document-metadata", + "properties": { + "polled": { + "type": "string", + "format": "date-time", + "title": "Polled Timestamp", + "description": "The time at which the update query which produced this document as executed." + }, + "index": { + "type": "integer", + "title": "Result Index", + "description": "The index of this document within the query execution which produced it." + } + }, + "type": "object", + "required": [ + "polled", + "index" + ] + }, + "id": { + "type": "integer" + } + }, + "x-infer-schema": true + }, + "key": [ + "/id" + ], + "projections": null + }, + "state_key": "integertypes_795898" + } + diff --git a/source-bigquery-batch/.snapshots/TestJSONType-Capture b/source-bigquery-batch/.snapshots/TestJSONType-Capture new file mode 100644 index 0000000000..24d2591be9 --- /dev/null +++ b/source-bigquery-batch/.snapshots/TestJSONType-Capture @@ -0,0 +1,16 @@ +# ================================ +# Collection "acmeCo/test/jsontype_519262": 8 Documents +# ================================ +{"_meta":{"polled":"","index":999},"id":1,"json_val":{"age":30,"name":"Alice"}} +{"_meta":{"polled":"","index":999},"id":2,"json_val":[1,2,3,"four",true,null]} +{"_meta":{"polled":"","index":999},"id":3,"json_val":{"orders":[{"id":1,"items":["apple","banana"]},{"id":2,"items":["orange"]}],"user":{"email":"bob@example.com","name":"Bob"}}} +{"_meta":{"polled":"","index":999},"id":4,"json_val":null} +{"_meta":{"polled":"","index":999},"id":5,"json_val":null} +{"_meta":{"polled":"","index":999},"id":6,"json_val":{}} +{"_meta":{"polled":"","index":999},"id":7,"json_val":[]} +{"_meta":{"polled":"","index":999},"id":8,"json_val":{"message":"Hello, 世界!\nNew line\"Quotes\"\\Backslash"}} +# ================================ +# Final State Checkpoint +# ================================ 
+{"bindingStateV1":{"jsontype_519262":{"CursorNames":["id"],"CursorValues":[8],"LastPolled":""}}} + diff --git a/source-bigquery-batch/.snapshots/TestJSONType-Discovery b/source-bigquery-batch/.snapshots/TestJSONType-Discovery new file mode 100644 index 0000000000..aa38f09f59 --- /dev/null +++ b/source-bigquery-batch/.snapshots/TestJSONType-Discovery @@ -0,0 +1,58 @@ +Binding 0: +{ + "resource_config_json": { + "name": "jsontype_519262", + "schema": "testdata", + "table": "jsontype_519262", + "cursor": [ + "id" + ] + }, + "resource_path": [ + "jsontype_519262" + ], + "collection": { + "name": "acmeCo/test/jsontype_519262", + "read_schema_json": { + "type": "object", + "required": [ + "_meta", + "id" + ], + "properties": { + "_meta": { + "$schema": "http://json-schema.org/draft/2020-12/schema", + "$id": "https://github.com/estuary/connectors/source-bigquery-batch/document-metadata", + "properties": { + "polled": { + "type": "string", + "format": "date-time", + "title": "Polled Timestamp", + "description": "The time at which the update query which produced this document as executed." + }, + "index": { + "type": "integer", + "title": "Result Index", + "description": "The index of this document within the query execution which produced it." 
+ } + }, + "type": "object", + "required": [ + "polled", + "index" + ] + }, + "id": { + "type": "integer" + } + }, + "x-infer-schema": true + }, + "key": [ + "/id" + ], + "projections": null + }, + "state_key": "jsontype_519262" + } + diff --git a/source-bigquery-batch/.snapshots/TestNumericTypes-Capture b/source-bigquery-batch/.snapshots/TestNumericTypes-Capture new file mode 100644 index 0000000000..6ae3aeb965 --- /dev/null +++ b/source-bigquery-batch/.snapshots/TestNumericTypes-Capture @@ -0,0 +1,13 @@ +# ================================ +# Collection "acmeCo/test/numerictypes_559424": 5 Documents +# ================================ +{"_meta":{"polled":"","index":999},"bigdecimal_val":"99999999999999999999999999999999999999/1000000000","bignumeric_val":"99999999999999999999999999999999999999/1000000000","decimal_val":"99999999999999999999999999999999999999/1000000000","float64_val":1.7976931348623157e+308,"id":1,"numeric_val":"99999999999999999999999999999999999999/1000000000"} +{"_meta":{"polled":"","index":999},"bigdecimal_val":"-99999999999999999999999999999999999999/1000000000","bignumeric_val":"-99999999999999999999999999999999999999/1000000000","decimal_val":"-99999999999999999999999999999999999999/1000000000","float64_val":-1.7976931348623157e+308,"id":2,"numeric_val":"-99999999999999999999999999999999999999/1000000000"} +{"_meta":{"polled":"","index":999},"bigdecimal_val":"0","bignumeric_val":"0","decimal_val":"0","float64_val":0,"id":3,"numeric_val":"0"} +{"_meta":{"polled":"","index":999},"bigdecimal_val":"5632716/625","bignumeric_val":"6172839/5000","decimal_val":"197253/250","float64_val":3.14159,"id":4,"numeric_val":"15432/125"} +{"_meta":{"polled":"","index":999},"bigdecimal_val":"10000","bignumeric_val":"1000","decimal_val":"100","float64_val":12345,"id":5,"numeric_val":"42"} +# ================================ +# Final State Checkpoint +# ================================ 
+{"bindingStateV1":{"numerictypes_559424":{"CursorNames":["id"],"CursorValues":[5],"LastPolled":""}}} + diff --git a/source-bigquery-batch/.snapshots/TestNumericTypes-Discovery b/source-bigquery-batch/.snapshots/TestNumericTypes-Discovery new file mode 100644 index 0000000000..dcaa9b2d8b --- /dev/null +++ b/source-bigquery-batch/.snapshots/TestNumericTypes-Discovery @@ -0,0 +1,58 @@ +Binding 0: +{ + "resource_config_json": { + "name": "numerictypes_559424", + "schema": "testdata", + "table": "numerictypes_559424", + "cursor": [ + "id" + ] + }, + "resource_path": [ + "numerictypes_559424" + ], + "collection": { + "name": "acmeCo/test/numerictypes_559424", + "read_schema_json": { + "type": "object", + "required": [ + "_meta", + "id" + ], + "properties": { + "_meta": { + "$schema": "http://json-schema.org/draft/2020-12/schema", + "$id": "https://github.com/estuary/connectors/source-bigquery-batch/document-metadata", + "properties": { + "polled": { + "type": "string", + "format": "date-time", + "title": "Polled Timestamp", + "description": "The time at which the update query which produced this document as executed." + }, + "index": { + "type": "integer", + "title": "Result Index", + "description": "The index of this document within the query execution which produced it." 
+ } + }, + "type": "object", + "required": [ + "polled", + "index" + ] + }, + "id": { + "type": "integer" + } + }, + "x-infer-schema": true + }, + "key": [ + "/id" + ], + "projections": null + }, + "state_key": "numerictypes_559424" + } + diff --git a/source-bigquery-batch/.snapshots/TestQueryTemplateOverride-Capture b/source-bigquery-batch/.snapshots/TestQueryTemplateOverride-Capture new file mode 100644 index 0000000000..73972489f4 --- /dev/null +++ b/source-bigquery-batch/.snapshots/TestQueryTemplateOverride-Capture @@ -0,0 +1,18 @@ +# ================================ +# Collection "acmeCo/test/querytemplateoverride_638679": 10 Documents +# ================================ +{"_meta":{"polled":"","index":999},"data":"Value for row 0","id":0,"updated_at":"2025-02-13T12:00:00Z"} +{"_meta":{"polled":"","index":999},"data":"Value for row 1","id":1,"updated_at":"2025-02-13T12:01:00Z"} +{"_meta":{"polled":"","index":999},"data":"Value for row 2","id":2,"updated_at":"2025-02-13T12:02:00Z"} +{"_meta":{"polled":"","index":999},"data":"Value for row 3","id":3,"updated_at":"2025-02-13T12:03:00Z"} +{"_meta":{"polled":"","index":999},"data":"Value for row 4","id":4,"updated_at":"2025-02-13T12:04:00Z"} +{"_meta":{"polled":"","index":999},"data":"Value for row 5","id":5,"updated_at":"2025-02-13T12:10:00Z"} +{"_meta":{"polled":"","index":999},"data":"Value for row 6","id":6,"updated_at":"2025-02-13T12:11:00Z"} +{"_meta":{"polled":"","index":999},"data":"Value for row 7","id":7,"updated_at":"2025-02-13T12:12:00Z"} +{"_meta":{"polled":"","index":999},"data":"Value for row 8","id":8,"updated_at":"2025-02-13T12:13:00Z"} +{"_meta":{"polled":"","index":999},"data":"Value for row 9","id":9,"updated_at":"2025-02-13T12:14:00Z"} +# ================================ +# Final State Checkpoint +# ================================ +{"bindingStateV1":{"querytemplateoverride_638679":{"CursorNames":["updated_at"],"CursorValues":["2025-02-13T12:14:00Z"],"LastPolled":""}}} + diff --git 
a/source-bigquery-batch/.snapshots/TestQueryTemplateOverride-Discovery b/source-bigquery-batch/.snapshots/TestQueryTemplateOverride-Discovery new file mode 100644 index 0000000000..77e37c3f86 --- /dev/null +++ b/source-bigquery-batch/.snapshots/TestQueryTemplateOverride-Discovery @@ -0,0 +1,57 @@ +Binding 0: +{ + "resource_config_json": { + "name": "query_template_override", + "cursor": [ + "updated_at" + ], + "template": "SELECT * FROM testdata.querytemplateoverride_638679 {{if not .IsFirstQuery}} WHERE updated_at \u003e @p0 {{end}} ORDER BY updated_at" + }, + "resource_path": [ + "querytemplateoverride_638679" + ], + "collection": { + "name": "acmeCo/test/querytemplateoverride_638679", + "read_schema_json": { + "type": "object", + "required": [ + "_meta", + "id" + ], + "properties": { + "_meta": { + "$schema": "http://json-schema.org/draft/2020-12/schema", + "$id": "https://github.com/estuary/connectors/source-bigquery-batch/document-metadata", + "properties": { + "polled": { + "type": "string", + "format": "date-time", + "title": "Polled Timestamp", + "description": "The time at which the update query which produced this document as executed." + }, + "index": { + "type": "integer", + "title": "Result Index", + "description": "The index of this document within the query execution which produced it." 
+ } + }, + "type": "object", + "required": [ + "polled", + "index" + ] + }, + "id": { + "type": "integer" + } + }, + "x-infer-schema": true + }, + "key": [ + "/id" + ], + "projections": null + }, + "state_key": "querytemplateoverride_638679" + } + diff --git a/source-bigquery-batch/.snapshots/TestSimpleCapture-Capture b/source-bigquery-batch/.snapshots/TestSimpleCapture-Capture new file mode 100644 index 0000000000..9999ea6c69 --- /dev/null +++ b/source-bigquery-batch/.snapshots/TestSimpleCapture-Capture @@ -0,0 +1,16 @@ +# ================================ +# Collection "acmeCo/test/simplecapture_140272": 8 Documents +# ================================ +{"_meta":{"polled":"","index":999},"data":"Value for row 1","id":1} +{"_meta":{"polled":"","index":999},"data":"Value for row 2","id":2} +{"_meta":{"polled":"","index":999},"data":"Value for row 3","id":3} +{"_meta":{"polled":"","index":999},"data":"Value for row 4","id":4} +{"_meta":{"polled":"","index":999},"data":"Value for row 5","id":5} +{"_meta":{"polled":"","index":999},"data":"Value for row 6","id":6} +{"_meta":{"polled":"","index":999},"data":"Value for row 7","id":7} +{"_meta":{"polled":"","index":999},"data":"Value for row 8","id":8} +# ================================ +# Final State Checkpoint +# ================================ +{"bindingStateV1":{"simplecapture_140272":{"CursorNames":["id"],"CursorValues":[8],"LastPolled":""}}} + diff --git a/source-bigquery-batch/.snapshots/TestDatetimeCursor-Discovery b/source-bigquery-batch/.snapshots/TestSimpleCapture-Discovery similarity index 57% rename from source-bigquery-batch/.snapshots/TestDatetimeCursor-Discovery rename to source-bigquery-batch/.snapshots/TestSimpleCapture-Discovery index 52e05fa0cc..52a9c52966 100644 --- a/source-bigquery-batch/.snapshots/TestDatetimeCursor-Discovery +++ b/source-bigquery-batch/.snapshots/TestSimpleCapture-Discovery @@ -1,18 +1,23 @@ Binding 0: { "resource_config_json": { - "name": "datetime_cursor_132448", - "template": 
"{{/* Default query template which adapts to cursor field selection */}}\n{{- if not .CursorFields -}}\n SELECT * FROM `testdata`.`datetime_cursor_132448`;\n{{- else -}}\n SELECT * FROM `testdata`.`datetime_cursor_132448`\n {{- if not .IsFirstQuery -}}\n\t{{- range $i, $k := $.CursorFields -}}\n\t {{- if eq $i 0}} WHERE ({{else}}) OR ({{end -}}\n {{- range $j, $n := $.CursorFields -}}\n\t\t{{- if lt $j $i -}}\n\t\t {{$n}} = @p{{$j}} AND {{end -}}\n\t {{- end -}}\n\t {{$k}} \u003e @p{{$i}}\n\t{{- end -}})\n {{- end}}\n ORDER BY {{range $i, $k := $.CursorFields}}{{if gt $i 0}}, {{end}}{{$k}}{{end -}};\n{{- end}}" + "name": "simplecapture_140272", + "schema": "testdata", + "table": "simplecapture_140272", + "cursor": [ + "id" + ] }, "resource_path": [ - "datetime_cursor_132448" + "simplecapture_140272" ], "collection": { - "name": "acmeCo/test/datetime_cursor_132448", + "name": "acmeCo/test/simplecapture_140272", "read_schema_json": { "type": "object", "required": [ - "_meta" + "_meta", + "id" ], "properties": { "_meta": { @@ -36,16 +41,18 @@ Binding 0: "polled", "index" ] + }, + "id": { + "type": "integer" } }, "x-infer-schema": true }, "key": [ - "/_meta/polled", - "/_meta/index" + "/id" ], "projections": null }, - "state_key": "datetime_cursor_132448" + "state_key": "simplecapture_140272" } diff --git a/source-bigquery-batch/.snapshots/TestSpec b/source-bigquery-batch/.snapshots/TestSpec index 3b30751260..f7dbdb8616 100644 --- a/source-bigquery-batch/.snapshots/TestSpec +++ b/source-bigquery-batch/.snapshots/TestSpec @@ -56,12 +56,17 @@ "description": "The unique name of this resource.", "order": 0 }, - "template": { + "schema": { "type": "string", - "title": "Query Template", - "description": "The query template (pkg.go.dev/text/template) which will be rendered and then executed.", - "multiline": true, - "order": 3 + "title": "Schema Name", + "description": "The name of the schema in which the captured table lives. 
The query template must be overridden if this is unset.", + "order": 1 + }, + "table": { + "type": "string", + "title": "Table Name", + "description": "The name of the table to be captured. The query template must be overridden if this is unset.", + "order": 2 }, "cursor": { "items": { @@ -70,20 +75,26 @@ "type": "array", "title": "Cursor Columns", "description": "The names of columns which should be persisted between query executions as a cursor.", - "order": 2 + "order": 3 }, "poll": { "type": "string", "title": "Polling Schedule", "description": "When and how often to execute the fetch query (overrides the connector default setting). Accepts a Go duration string like '5m' or '6h' for frequency-based polling or a string like 'daily at 12:34Z' to poll at a specific time (specified in UTC) every day.", - "order": 1, + "order": 4, "pattern": "^([-+]?([0-9]+([.][0-9]+)?(h|m|s|ms))+|daily at [0-9][0-9]?:[0-9]{2}Z)$" + }, + "template": { + "type": "string", + "title": "Query Template Override", + "description": "Optionally overrides the query template which will be rendered and then executed. 
Consult documentation for examples.", + "multiline": true, + "order": 5 } }, "type": "object", "required": [ - "name", - "template" + "name" ], "title": "BigQuery Batch Resource Spec" }, diff --git a/source-bigquery-batch/.snapshots/TestStringTypes-Capture b/source-bigquery-batch/.snapshots/TestStringTypes-Capture new file mode 100644 index 0000000000..4d7d0aceda --- /dev/null +++ b/source-bigquery-batch/.snapshots/TestStringTypes-Capture @@ -0,0 +1,14 @@ +# ================================ +# Collection "acmeCo/test/stringtypes_339419": 6 Documents +# ================================ +{"_meta":{"polled":"","index":999},"id":1,"string_len_val":"","string_val":""} +{"_meta":{"polled":"","index":999},"id":2,"string_len_val":null,"string_val":null} +{"_meta":{"polled":"","index":999},"id":3,"string_len_val":"Unicode: ñ, é, ü","string_val":"Hello, 世界!"} +{"_meta":{"polled":"","index":999},"id":4,"string_len_val":"Path: C:\\Program Files\\","string_val":"He said \"Hello\""} +{"_meta":{"polled":"","index":999},"id":5,"string_len_val":"Col1\tCol2\tCol3","string_val":"Line 1\nLine 2"} +{"_meta":{"polled":"","index":999},"id":6,"string_len_val":"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb","string_val":"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"} +# ================================ +# Final State Checkpoint +# ================================ +{"bindingStateV1":{"stringtypes_339419":{"CursorNames":["id"],"CursorValues":[6],"LastPolled":""}}} + diff --git a/source-bigquery-batch/.snapshots/TestStringTypes-Discovery b/source-bigquery-batch/.snapshots/TestStringTypes-Discovery new file mode 100644 index 0000000000..c3e109de52 --- /dev/null +++ b/source-bigquery-batch/.snapshots/TestStringTypes-Discovery @@ -0,0 +1,58 @@ +Binding 0: +{ + "resource_config_json": { + "name": "stringtypes_339419", + "schema": "testdata", + "table": "stringtypes_339419", + "cursor": [ + "id" + ] + }, + "resource_path": [ + 
"stringtypes_339419" + ], + "collection": { + "name": "acmeCo/test/stringtypes_339419", + "read_schema_json": { + "type": "object", + "required": [ + "_meta", + "id" + ], + "properties": { + "_meta": { + "$schema": "http://json-schema.org/draft/2020-12/schema", + "$id": "https://github.com/estuary/connectors/source-bigquery-batch/document-metadata", + "properties": { + "polled": { + "type": "string", + "format": "date-time", + "title": "Polled Timestamp", + "description": "The time at which the update query which produced this document as executed." + }, + "index": { + "type": "integer", + "title": "Result Index", + "description": "The index of this document within the query execution which produced it." + } + }, + "type": "object", + "required": [ + "polled", + "index" + ] + }, + "id": { + "type": "integer" + } + }, + "x-infer-schema": true + }, + "key": [ + "/id" + ], + "projections": null + }, + "state_key": "stringtypes_339419" + } + diff --git a/source-bigquery-batch/.snapshots/TestTemporalTypes-Capture b/source-bigquery-batch/.snapshots/TestTemporalTypes-Capture new file mode 100644 index 0000000000..43d2a09cd3 --- /dev/null +++ b/source-bigquery-batch/.snapshots/TestTemporalTypes-Capture @@ -0,0 +1,13 @@ +# ================================ +# Collection "acmeCo/test/temporaltypes_137023": 5 Documents +# ================================ +{"_meta":{"polled":"","index":999},"date_val":"2025-02-18","datetime_val":"2025-02-18T15:04:05.999999","id":1,"time_val":"15:04:05.999999000","timestamp_val":"2025-02-18T15:04:05.999999Z"} +{"_meta":{"polled":"","index":999},"date_val":"0001-01-01","datetime_val":"0001-01-01T00:00:00.000000","id":2,"time_val":"00:00:00","timestamp_val":"0001-01-01T00:00:00Z"} +{"_meta":{"polled":"","index":999},"date_val":null,"datetime_val":null,"id":3,"time_val":null,"timestamp_val":null} 
+{"_meta":{"polled":"","index":999},"date_val":"9999-12-31","datetime_val":"9999-12-31T23:59:59.999999","id":4,"time_val":"23:59:59.999999000","timestamp_val":"9999-12-31T23:59:59.999999Z"} +{"_meta":{"polled":"","index":999},"date_val":"2025-02-18","datetime_val":"2025-02-18T15:04:05.000000","id":5,"time_val":"15:04:05","timestamp_val":"2025-02-18T23:04:05Z"} +# ================================ +# Final State Checkpoint +# ================================ +{"bindingStateV1":{"temporaltypes_137023":{"CursorNames":["id"],"CursorValues":[5],"LastPolled":""}}} + diff --git a/source-bigquery-batch/.snapshots/TestTemporalTypes-Discovery b/source-bigquery-batch/.snapshots/TestTemporalTypes-Discovery new file mode 100644 index 0000000000..7e67e0b464 --- /dev/null +++ b/source-bigquery-batch/.snapshots/TestTemporalTypes-Discovery @@ -0,0 +1,58 @@ +Binding 0: +{ + "resource_config_json": { + "name": "temporaltypes_137023", + "schema": "testdata", + "table": "temporaltypes_137023", + "cursor": [ + "id" + ] + }, + "resource_path": [ + "temporaltypes_137023" + ], + "collection": { + "name": "acmeCo/test/temporaltypes_137023", + "read_schema_json": { + "type": "object", + "required": [ + "_meta", + "id" + ], + "properties": { + "_meta": { + "$schema": "http://json-schema.org/draft/2020-12/schema", + "$id": "https://github.com/estuary/connectors/source-bigquery-batch/document-metadata", + "properties": { + "polled": { + "type": "string", + "format": "date-time", + "title": "Polled Timestamp", + "description": "The time at which the update query which produced this document as executed." + }, + "index": { + "type": "integer", + "title": "Result Index", + "description": "The index of this document within the query execution which produced it." 
+ } + }, + "type": "object", + "required": [ + "polled", + "index" + ] + }, + "id": { + "type": "integer" + } + }, + "x-infer-schema": true + }, + "key": [ + "/id" + ], + "projections": null + }, + "state_key": "temporaltypes_137023" + } + diff --git a/source-bigquery-batch/driver.go b/source-bigquery-batch/driver.go index ab7e5e5592..041b34a8cb 100644 --- a/source-bigquery-batch/driver.go +++ b/source-bigquery-batch/driver.go @@ -65,29 +65,36 @@ type BatchSQLDriver struct { Connect func(ctx context.Context, cfg *Config) (*bigquery.Client, error) GenerateResource func(resourceName, schemaName, tableName, tableType string) (*Resource, error) ExcludedSystemSchemas []string + SelectQueryTemplate func(res *Resource) (string, error) } // Resource represents the capture configuration of a single resource binding. type Resource struct { Name string `json:"name" jsonschema:"title=Name,description=The unique name of this resource." jsonschema_extras:"order=0"` - Template string `json:"template" jsonschema:"title=Query Template,description=The query template (pkg.go.dev/text/template) which will be rendered and then executed." jsonschema_extras:"multiline=true,order=3"` - Cursor []string `json:"cursor,omitempty" jsonschema:"title=Cursor Columns,description=The names of columns which should be persisted between query executions as a cursor." jsonschema_extras:"order=2"` - PollSchedule string `json:"poll,omitempty" jsonschema:"title=Polling Schedule,description=When and how often to execute the fetch query (overrides the connector default setting). Accepts a Go duration string like '5m' or '6h' for frequency-based polling or a string like 'daily at 12:34Z' to poll at a specific time (specified in UTC) every day." jsonschema_extras:"order=1,pattern=^([-+]?([0-9]+([.][0-9]+)?(h|m|s|ms))+|daily at [0-9][0-9]?:[0-9]{2}Z)$"` + SchemaName string `json:"schema,omitempty" jsonschema:"title=Schema Name,description=The name of the schema in which the captured table lives. 
The query template must be overridden if this is unset." jsonschema_extras:"order=1"` + TableName string `json:"table,omitempty" jsonschema:"title=Table Name,description=The name of the table to be captured. The query template must be overridden if this is unset." jsonschema_extras:"order=2"` + Cursor []string `json:"cursor,omitempty" jsonschema:"title=Cursor Columns,description=The names of columns which should be persisted between query executions as a cursor." jsonschema_extras:"order=3"` + PollSchedule string `json:"poll,omitempty" jsonschema:"title=Polling Schedule,description=When and how often to execute the fetch query (overrides the connector default setting). Accepts a Go duration string like '5m' or '6h' for frequency-based polling or a string like 'daily at 12:34Z' to poll at a specific time (specified in UTC) every day." jsonschema_extras:"order=4,pattern=^([-+]?([0-9]+([.][0-9]+)?(h|m|s|ms))+|daily at [0-9][0-9]?:[0-9]{2}Z)$"` + Template string `json:"template,omitempty" jsonschema:"title=Query Template Override,description=Optionally overrides the query template which will be rendered and then executed. Consult documentation for examples." jsonschema_extras:"multiline=true,order=5"` } // Validate checks that the resource spec possesses all required properties. 
func (r Resource) Validate() error { var requiredProperties = [][]string{ {"name", r.Name}, - {"template", r.Template}, } for _, req := range requiredProperties { if req[1] == "" { return fmt.Errorf("missing '%s'", req[0]) } } - if _, err := template.New("query").Parse(r.Template); err != nil { - return fmt.Errorf("error parsing template: %w", err) + if r.Template == "" && (r.SchemaName == "" || r.TableName == "") { + return fmt.Errorf("must specify schema+table name or else a template override") + } + if r.Template != "" { + if _, err := template.New("query").Funcs(templateFuncs).Parse(r.Template); err != nil { + return fmt.Errorf("error parsing template: %w", err) + } } if slices.Contains(r.Cursor, "") { return fmt.Errorf("cursor column names can't be empty (got %q)", r.Cursor) @@ -444,7 +451,7 @@ func (drv *BatchSQLDriver) Pull(open *pc.Request_Open, stream *boilerplate.PullO return fmt.Errorf("parsing resource config: %w", err) } bindings = append(bindings, bindingInfo{ - resource: res, + resource: &res, index: idx, stateKey: boilerplate.StateKey(binding.StateKey), }) @@ -467,6 +474,7 @@ func (drv *BatchSQLDriver) Pull(open *pc.Request_Open, stream *boilerplate.PullO } var capture = &capture{ + Driver: drv, Config: &cfg, State: &state, DB: db, @@ -501,6 +509,7 @@ func updateResourceStates(prevState captureState, bindings []bindingInfo) (captu } type capture struct { + Driver *BatchSQLDriver Config *Config State *captureState DB *bigquery.Client @@ -510,7 +519,7 @@ type capture struct { } type bindingInfo struct { - resource Resource + resource *Resource index int stateKey boilerplate.StateKey } @@ -550,7 +559,11 @@ func (c *capture) worker(ctx context.Context, binding *bindingInfo) error { "poll": res.PollSchedule, }).Info("starting worker") - var queryTemplate, err = template.New("query").Parse(res.Template) + templateString, err := c.Driver.SelectQueryTemplate(res) + if err != nil { + return fmt.Errorf("error selecting query template: %w", err) + } + 
queryTemplate, err := template.New("query").Funcs(templateFuncs).Parse(templateString) if err != nil { return fmt.Errorf("error parsing template: %w", err) } @@ -566,12 +579,6 @@ func (c *capture) worker(ctx context.Context, binding *bindingInfo) error { return ctx.Err() } -func quoteIdentifier(name string) string { - return "`" + strings.ReplaceAll(name, "`", "\\`") + "`" -} - -func quoteColumnName(name string) string { return quoteIdentifier(name) } - func (c *capture) poll(ctx context.Context, binding *bindingInfo, tmpl *template.Template) error { var res = binding.resource var stateKey = binding.stateKey @@ -584,11 +591,13 @@ func (c *capture) poll(ctx context.Context, binding *bindingInfo, tmpl *template var quotedCursorNames []string for _, cursorName := range cursorNames { - quotedCursorNames = append(quotedCursorNames, quoteColumnName(cursorName)) + quotedCursorNames = append(quotedCursorNames, quoteIdentifier(cursorName)) } var templateArg = map[string]any{ "IsFirstQuery": len(cursorValues) == 0, "CursorFields": quotedCursorNames, + "SchemaName": res.SchemaName, + "TableName": res.TableName, } // Polling schedule can be configured per binding. 
If unset, falls back to the diff --git a/source-bigquery-batch/main.go b/source-bigquery-batch/main.go index 333b1fcbaf..f7bc9c52df 100644 --- a/source-bigquery-batch/main.go +++ b/source-bigquery-batch/main.go @@ -6,6 +6,7 @@ import ( "fmt" "math" "strings" + "text/template" "time" "cloud.google.com/go/bigquery" @@ -114,11 +115,17 @@ func executeQuery(ctx context.Context, client *bigquery.Client, query string) er return nil } -const tableQueryTemplateTemplate = `{{/* Default query template which adapts to cursor field selection */}} -{{- if not .CursorFields -}} - SELECT * FROM %[1]s; +func selectQueryTemplate(res *Resource) (string, error) { + if res.Template != "" { + return res.Template, nil + } + return tableQueryTemplate, nil +} + +const tableQueryTemplate = `{{if not .CursorFields -}} + SELECT * FROM {{quoteTableName .SchemaName .TableName}}; {{- else -}} - SELECT * FROM %[1]s + SELECT * FROM {{quoteTableName .SchemaName .TableName}} {{- if not .IsFirstQuery -}} {{- range $i, $k := $.CursorFields -}} {{- if eq $i 0}} WHERE ({{else}}) OR ({{end -}} @@ -132,29 +139,37 @@ const tableQueryTemplateTemplate = `{{/* Default query template which adapts to ORDER BY {{range $i, $k := $.CursorFields}}{{if gt $i 0}}, {{end}}{{$k}}{{end -}}; {{- end}}` +var templateFuncs = template.FuncMap{ + "quoteTableName": quoteTableName, + "quoteIdentifier": quoteIdentifier, +} + func quoteTableName(schema, table string) string { return quoteIdentifier(schema) + "." 
+ quoteIdentifier(table) } +func quoteIdentifier(name string) string { + return "`" + strings.ReplaceAll(name, "`", "\\`") + "`" +} + func generateBigQueryResource(resourceName, schemaName, tableName, tableType string) (*Resource, error) { - var queryTemplate string - if strings.EqualFold(tableType, "BASE TABLE") { - queryTemplate = fmt.Sprintf(tableQueryTemplateTemplate, quoteTableName(schemaName, tableName)) - } else { + if !strings.EqualFold(tableType, "BASE TABLE") { return nil, fmt.Errorf("discovery will not autogenerate resource configs for entities of type %q, but you may add them manually", tableType) } return &Resource{ - Name: resourceName, - Template: queryTemplate, + Name: resourceName, + SchemaName: schemaName, + TableName: tableName, }, nil } var bigqueryDriver = &BatchSQLDriver{ - DocumentationURL: "https://go.estuary.dev/source-bigquery-batch", - ConfigSchema: generateConfigSchema(), - Connect: connectBigQuery, - GenerateResource: generateBigQueryResource, + DocumentationURL: "https://go.estuary.dev/source-bigquery-batch", + ConfigSchema: generateConfigSchema(), + Connect: connectBigQuery, + GenerateResource: generateBigQueryResource, + SelectQueryTemplate: selectQueryTemplate, ExcludedSystemSchemas: []string{ "information_schema", }, diff --git a/source-bigquery-batch/main_test.go b/source-bigquery-batch/main_test.go index 2f4b10e5bd..15dcf887f4 100644 --- a/source-bigquery-batch/main_test.go +++ b/source-bigquery-batch/main_test.go @@ -3,25 +3,29 @@ package main import ( "bytes" "context" + "crypto/sha256" + "encoding/binary" "encoding/json" "flag" "fmt" "io" + "math/big" "os" "regexp" "strings" - "sync/atomic" "testing" "text/template" "time" "cloud.google.com/go/bigquery" + "cloud.google.com/go/civil" "github.com/bradleyjkemp/cupaloy" st "github.com/estuary/connectors/source-boilerplate/testing" pc "github.com/estuary/flow/go/protocols/capture" pf "github.com/estuary/flow/go/protocols/flow" log "github.com/sirupsen/logrus" 
"github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" "google.golang.org/api/option" ) @@ -53,133 +57,6 @@ func TestMain(m *testing.M) { os.Exit(m.Run()) } -func TestSpec(t *testing.T) { - response, err := bigqueryDriver.Spec(context.Background(), &pc.Request_Spec{}) - require.NoError(t, err) - - formatted, err := json.MarshalIndent(response, "", " ") - require.NoError(t, err) - cupaloy.SnapshotT(t, string(formatted)) -} - -func TestQueryTemplate(t *testing.T) { - res, err := bigqueryDriver.GenerateResource("foobar", "testdata", "foobar", "BASE TABLE") - require.NoError(t, err) - - tmpl, err := template.New("query").Parse(res.Template) - require.NoError(t, err) - - for _, tc := range []struct { - Name string - IsFirst bool - Cursor []string - }{ - {Name: "FirstNoCursor", IsFirst: true, Cursor: nil}, - {Name: "SubsequentNoCursor", IsFirst: false, Cursor: nil}, - {Name: "FirstOneCursor", IsFirst: true, Cursor: []string{"`ka`"}}, - {Name: "SubsequentOneCursor", IsFirst: false, Cursor: []string{"`ka`"}}, - {Name: "FirstTwoCursor", IsFirst: true, Cursor: []string{"`ka`", "`kb`"}}, - {Name: "SubsequentTwoCursor", IsFirst: false, Cursor: []string{"`ka`", "`kb`"}}, - {Name: "FirstThreeCursor", IsFirst: true, Cursor: []string{"`ka`", "`kb`", "`kc`"}}, - {Name: "SubsequentThreeCursor", IsFirst: false, Cursor: []string{"`ka`", "`kb`", "`kc`"}}, - } { - t.Run(tc.Name, func(t *testing.T) { - var buf = new(strings.Builder) - require.NoError(t, tmpl.Execute(buf, map[string]any{ - "IsFirstQuery": tc.IsFirst, - "CursorFields": tc.Cursor, - })) - cupaloy.SnapshotT(t, buf.String()) - }) - } -} - -func TestBasicCapture(t *testing.T) { - var ctx, cs = context.Background(), testCaptureSpec(t) - var client = testBigQueryClient(ctx, t) - var uniqueID = "826935" - var tableName = fmt.Sprintf("testdata.basic_capture_%s", uniqueID) - - createTestTable(ctx, t, client, tableName, "(id INTEGER PRIMARY KEY NOT ENFORCED, data STRING)") - cs.Bindings = discoverStreams(ctx, t, cs, 
regexp.MustCompile(uniqueID)) - - t.Run("Discovery", func(t *testing.T) { cupaloy.SnapshotT(t, summarizeBindings(t, cs.Bindings)) }) - - setCursorColumns(t, cs.Bindings[0], "id") - - t.Run("Capture", func(t *testing.T) { - // Spawn a worker thread which will insert 10 rows of data as distinct inserts. - // BigQuery insert latency is highly variable on test time-scales, so we signal - // when the insertions are all done so the capturing code knows when to stop. - var insertsDone atomic.Bool - go func() { - for i := 0; i < 10; i++ { - executeSetupQuery(ctx, t, client, fmt.Sprintf("INSERT INTO %s VALUES (@p0, @p1)", tableName), i, fmt.Sprintf("Value for row %d", i)) - log.WithField("i", i).Debug("inserted row") - } - time.Sleep(5 * time.Second) - insertsDone.Store(true) - }() - - // Run the capture over and over for 5 seconds each time until all inserts have finished. - for !insertsDone.Load() { - var captureCtx, cancelCapture = context.WithCancel(ctx) - time.AfterFunc(5*time.Second, func() { - cancelCapture() - }) - cs.Capture(captureCtx, t, nil) - } - cupaloy.SnapshotT(t, cs.Summary()) - }) -} - -func TestDatetimeCursor(t *testing.T) { - var ctx, cs = context.Background(), testCaptureSpec(t) - var client = testBigQueryClient(ctx, t) - var uniqueID = "132448" - var tableName = fmt.Sprintf("testdata.datetime_cursor_%s", uniqueID) - - createTestTable(ctx, t, client, tableName, "(id DATETIME, data STRING)") - for _, x := range []string{"2023-08-10T07:54:54.123", "2024-10-23T03:22:31.456", "2024-10-23T03:23:00.789"} { - executeSetupQuery(ctx, t, client, fmt.Sprintf("INSERT INTO %s VALUES (@p0, @p1)", tableName), x, fmt.Sprintf("Value for row %q", x)) - } - - cs.Bindings = discoverStreams(ctx, t, cs, regexp.MustCompile(uniqueID)) - - t.Run("Discovery", func(t *testing.T) { cupaloy.SnapshotT(t, summarizeBindings(t, cs.Bindings)) }) - - setShutdownAfterQuery(t, true) - setCursorColumns(t, cs.Bindings[0], "id") - setQueryLimit(t, cs.Bindings[0], 1) - - t.Run("Capture", 
func(t *testing.T) { - var deadline = time.Now().Add(10 * time.Second) - for time.Now().Before(deadline) { - cs.Capture(ctx, t, nil) - } - cupaloy.SnapshotT(t, cs.Summary()) - }) -} - -func testBigQueryClient(ctx context.Context, t testing.TB) *bigquery.Client { - t.Helper() - if os.Getenv("TEST_DATABASE") != "yes" { - t.Skipf("skipping %q capture: ${TEST_DATABASE} != \"yes\"", t.Name()) - } - - var credentialsPath = strings.ReplaceAll(*testCredentialsPath, "~", os.Getenv("HOME")) - credentialsJSON, err := os.ReadFile(credentialsPath) - require.NoError(t, err) - - var clientOpts = []option.ClientOption{ - option.WithCredentialsJSON([]byte(credentialsJSON)), - } - client, err := bigquery.NewClient(ctx, *projectID, clientOpts...) - require.NoError(t, err) - t.Cleanup(func() { client.Close() }) - return client -} - func testCaptureSpec(t testing.TB) *st.CaptureSpec { t.Helper() if os.Getenv("TEST_DATABASE") != "yes" { @@ -213,7 +90,7 @@ func testCaptureSpec(t testing.TB) *st.CaptureSpec { } } -func discoverStreams(ctx context.Context, t testing.TB, cs *st.CaptureSpec, matchers ...*regexp.Regexp) []*pf.CaptureSpec_Binding { +func discoverBindings(ctx context.Context, t testing.TB, cs *st.CaptureSpec, matchers ...*regexp.Regexp) []*pf.CaptureSpec_Binding { t.Helper() var discovery = cs.Discover(ctx, t, matchers...) 
@@ -235,30 +112,50 @@ func discoverStreams(ctx context.Context, t testing.TB, cs *st.CaptureSpec, matc return bindings } -func executeSetupQuery(ctx context.Context, t testing.TB, client *bigquery.Client, query string, args ...interface{}) { +func testBigQueryClient(t testing.TB) *bigquery.Client { t.Helper() - log.WithFields(log.Fields{"query": query, "args": args}).Debug("executing setup query") - var q = client.Query(query) - var params []bigquery.QueryParameter - for idx, val := range args { - params = append(params, bigquery.QueryParameter{ - Name: fmt.Sprintf("p%d", idx), - Value: val, - }) + if os.Getenv("TEST_DATABASE") != "yes" { + t.Skipf("skipping %q capture: ${TEST_DATABASE} != \"yes\"", t.Name()) } - q.Parameters = params - var job, err = q.Run(ctx) + + var credentialsPath = strings.ReplaceAll(*testCredentialsPath, "~", os.Getenv("HOME")) + credentialsJSON, err := os.ReadFile(credentialsPath) require.NoError(t, err) - status, err := job.Wait(ctx) + + var clientOpts = []option.ClientOption{ + option.WithCredentialsJSON([]byte(credentialsJSON)), + } + client, err := bigquery.NewClient(context.Background(), *projectID, clientOpts...) 
require.NoError(t, err) - require.NoError(t, status.Err()) + t.Cleanup(func() { client.Close() }) + return client +} + +func testTableName(t *testing.T, uniqueID string) (name, id string) { + t.Helper() + const testSchemaName = "testdata" + var baseName = strings.ToLower(strings.TrimPrefix(t.Name(), "Test")) + for _, str := range []string{"/", "=", "(", ")"} { + baseName = strings.ReplaceAll(baseName, str, "_") + } + return fmt.Sprintf("%s.%s_%s", testSchemaName, baseName, uniqueID), uniqueID } func createTestTable(ctx context.Context, t testing.TB, client *bigquery.Client, tableName, tableDef string) { t.Helper() - executeSetupQuery(ctx, t, client, fmt.Sprintf("DROP TABLE IF EXISTS %s", tableName)) - t.Cleanup(func() { executeSetupQuery(ctx, t, client, fmt.Sprintf("DROP TABLE IF EXISTS %s", tableName)) }) - executeSetupQuery(ctx, t, client, fmt.Sprintf("CREATE TABLE %s%s", tableName, tableDef)) + require.NoError(t, executeSetupQuery(ctx, t, client, fmt.Sprintf("DROP TABLE IF EXISTS %s", tableName))) + require.NoError(t, executeSetupQuery(ctx, t, client, fmt.Sprintf("CREATE TABLE %s%s", tableName, tableDef))) + // A note on table cleanup: + // + // Typically in SQL database tests we drop tables at the end of each test. But we + // can't reliably do that in BigQuery. + // + // BigQuery has a rate limit of 5 table update operations per 10 seconds per table. + // DML operations (such as test data inserts) count against this limit but aren't + // subject to it (weird policy, but okay), which means that in general we can't + // reliably drop a table immediately after running a test. So we don't bother, + // because we _can_ reliably drop+recreate it at the start of a test, and that + // is sufficient for our purposes. 
} func summarizeBindings(t testing.TB, bindings []*pf.CaptureSpec_Binding) string { @@ -277,6 +174,47 @@ func summarizeBindings(t testing.TB, bindings []*pf.CaptureSpec_Binding) string return summary.String() } +func executeSetupQuery(ctx context.Context, t testing.TB, client *bigquery.Client, query string, args ...interface{}) error { + t.Helper() + log.WithFields(log.Fields{"query": query, "args": args}).Debug("executing setup query") + var q = client.Query(query) + var params []bigquery.QueryParameter + for idx, val := range args { + params = append(params, bigquery.QueryParameter{ + Name: fmt.Sprintf("p%d", idx), + Value: val, + }) + } + q.Parameters = params + var job, err = q.Run(ctx) + if err != nil { + return err + } + status, err := job.Wait(ctx) + if err != nil { + return err + } + return status.Err() +} + +func parallelSetupQueries(ctx context.Context, t testing.TB, client *bigquery.Client, query string, args [][]any) error { + t.Helper() + var eg, setupCtx = errgroup.WithContext(ctx) + for idx := range args { + var idx = idx + eg.Go(func() error { + return executeSetupQuery(setupCtx, t, client, query, args[idx]...) 
+ }) + } + return eg.Wait() +} + +func setShutdownAfterQuery(t testing.TB, setting bool) { + var oldSetting = TestShutdownAfterQuery + TestShutdownAfterQuery = setting + t.Cleanup(func() { TestShutdownAfterQuery = oldSetting }) +} + func setCursorColumns(t testing.TB, binding *pf.CaptureSpec_Binding, cursor ...string) { var res Resource require.NoError(t, json.Unmarshal(binding.ResourceConfigJson, &res)) @@ -295,8 +233,791 @@ func setQueryLimit(t testing.TB, binding *pf.CaptureSpec_Binding, limit int) { binding.ResourceConfigJson = resourceConfigBytes } -func setShutdownAfterQuery(t testing.TB, setting bool) { - var oldSetting = TestShutdownAfterQuery - TestShutdownAfterQuery = setting - t.Cleanup(func() { TestShutdownAfterQuery = oldSetting }) +func uniqueTableID(t testing.TB, extra ...string) string { + t.Helper() + var h = sha256.New() + h.Write([]byte(t.Name())) + for _, x := range extra { + h.Write([]byte{':'}) + h.Write([]byte(x)) + } + var x = binary.BigEndian.Uint32(h.Sum(nil)[0:4]) + return fmt.Sprintf("%d", (x%900000)+100000) +} + +// TestSpec verifies the connector's response to the Spec RPC against a snapshot. +func TestSpec(t *testing.T) { + response, err := bigqueryDriver.Spec(context.Background(), &pc.Request_Spec{}) + require.NoError(t, err) + + formatted, err := json.MarshalIndent(response, "", " ") + require.NoError(t, err) + cupaloy.SnapshotT(t, string(formatted)) +} + +// TestQueryTemplate is a unit test which verifies that the default query template produces +// the expected output for initial/subsequent polling queries with different cursors. 
+func TestQueryTemplate(t *testing.T) { + res, err := bigqueryDriver.GenerateResource("foobar", "testdata", "foobar", "BASE TABLE") + require.NoError(t, err) + + tmplString, err := bigqueryDriver.SelectQueryTemplate(res) + require.NoError(t, err) + + tmpl, err := template.New("query").Funcs(templateFuncs).Parse(tmplString) + require.NoError(t, err) + + for _, tc := range []struct { + Name string + IsFirst bool + Cursor []string + }{ + {Name: "FirstNoCursor", IsFirst: true, Cursor: nil}, + {Name: "SubsequentNoCursor", IsFirst: false, Cursor: nil}, + {Name: "FirstOneCursor", IsFirst: true, Cursor: []string{"`ka`"}}, + {Name: "SubsequentOneCursor", IsFirst: false, Cursor: []string{"`ka`"}}, + {Name: "FirstTwoCursor", IsFirst: true, Cursor: []string{"`ka`", "`kb`"}}, + {Name: "SubsequentTwoCursor", IsFirst: false, Cursor: []string{"`ka`", "`kb`"}}, + {Name: "FirstThreeCursor", IsFirst: true, Cursor: []string{"`ka`", "`kb`", "`kc`"}}, + {Name: "SubsequentThreeCursor", IsFirst: false, Cursor: []string{"`ka`", "`kb`", "`kc`"}}, + } { + t.Run(tc.Name, func(t *testing.T) { + var buf = new(strings.Builder) + require.NoError(t, tmpl.Execute(buf, map[string]any{ + "IsFirstQuery": tc.IsFirst, + "CursorFields": tc.Cursor, + "SchemaName": res.SchemaName, + "TableName": res.TableName, + })) + cupaloy.SnapshotT(t, buf.String()) + }) + } +} + +// TestSimpleCapture exercises the simplest use-case of a capture first doing an initial +// backfill and subsequently capturing new rows using ["id"] as the cursor. 
+func TestSimpleCapture(t *testing.T) { + var ctx, cs, control = context.Background(), testCaptureSpec(t), testBigQueryClient(t) + var tableName, uniqueID = testTableName(t, uniqueTableID(t)) + createTestTable(ctx, t, control, tableName, "(id INTEGER PRIMARY KEY NOT ENFORCED, data STRING)") + + cs.Bindings = discoverBindings(ctx, t, cs, regexp.MustCompile(uniqueID)) + setCursorColumns(t, cs.Bindings[0], "id") + t.Run("Discovery", func(t *testing.T) { cupaloy.SnapshotT(t, summarizeBindings(t, cs.Bindings)) }) + + t.Run("Capture", func(t *testing.T) { + setShutdownAfterQuery(t, true) + + require.NoError(t, parallelSetupQueries(ctx, t, control, fmt.Sprintf("INSERT INTO %s VALUES (@p0, @p1)", tableName), [][]any{ + {1, "Value for row 1"}, {2, "Value for row 2"}, + {3, "Value for row 3"}, {4, "Value for row 4"}, + })) + cs.Capture(ctx, t, nil) + + require.NoError(t, parallelSetupQueries(ctx, t, control, fmt.Sprintf("INSERT INTO %s VALUES (@p0, @p1)", tableName), [][]any{ + {5, "Value for row 5"}, {6, "Value for row 6"}, + {7, "Value for row 7"}, {8, "Value for row 8"}, + })) + cs.Capture(ctx, t, nil) + cupaloy.SnapshotT(t, cs.Summary()) + }) +} + +// TestIntegerTypes exercises discovery and capture of the integer types +// INT, SMALLINT, INTEGER, BIGINT, TINYINT, and BYTEINT. In BigQuery these +// types are all aliases for each other, but it's worth being thorough. 
+func TestIntegerTypes(t *testing.T) { + var ctx, cs, control = context.Background(), testCaptureSpec(t), testBigQueryClient(t) + var tableName, uniqueID = testTableName(t, uniqueTableID(t)) + createTestTable(ctx, t, control, tableName, `( + id INTEGER PRIMARY KEY NOT ENFORCED, + int_val INT, + smallint_val SMALLINT, + integer_val INTEGER, + bigint_val BIGINT, + tinyint_val TINYINT, + byteint_val BYTEINT + )`) + + cs.Bindings = discoverBindings(ctx, t, cs, regexp.MustCompile(uniqueID)) + setCursorColumns(t, cs.Bindings[0], "id") + t.Run("Discovery", func(t *testing.T) { cupaloy.SnapshotT(t, summarizeBindings(t, cs.Bindings)) }) + + t.Run("Capture", func(t *testing.T) { + setShutdownAfterQuery(t, true) + + require.NoError(t, parallelSetupQueries(ctx, t, control, + fmt.Sprintf("INSERT INTO %s VALUES (@p0, @p1, @p2, @p3, @p4, @p5, @p6)", tableName), + [][]any{ + // Test minimum values + {1, -9223372036854775808, -9223372036854775808, -9223372036854775808, + -9223372036854775808, -9223372036854775808, -9223372036854775808}, + // Test maximum values + {2, 9223372036854775807, 9223372036854775807, 9223372036854775807, + 9223372036854775807, 9223372036854775807, 9223372036854775807}, + // Test zero values + {3, 0, 0, 0, 0, 0, 0}, + // Test NULL values + {4, bigquery.NullInt64{}, bigquery.NullInt64{}, bigquery.NullInt64{}, bigquery.NullInt64{}, bigquery.NullInt64{}, bigquery.NullInt64{}}, + // Test some typical values + {5, 42, 123, -456, 789, -12, 34}, + })) + cs.Capture(ctx, t, nil) + cupaloy.SnapshotT(t, cs.Summary()) + }) +} + +// TestNumericTypes exercises discovery and capture of the numeric types +// NUMERIC, DECIMAL, BIGNUMERIC, BIGDECIMAL, and FLOAT64. 
+func TestNumericTypes(t *testing.T) { + var ctx, cs, control = context.Background(), testCaptureSpec(t), testBigQueryClient(t) + var tableName, uniqueID = testTableName(t, uniqueTableID(t)) + createTestTable(ctx, t, control, tableName, `( + id INTEGER PRIMARY KEY NOT ENFORCED, + numeric_val NUMERIC, + decimal_val DECIMAL, + bignumeric_val BIGNUMERIC, + bigdecimal_val BIGDECIMAL, + float64_val FLOAT64 + )`) + + cs.Bindings = discoverBindings(ctx, t, cs, regexp.MustCompile(uniqueID)) + setCursorColumns(t, cs.Bindings[0], "id") + t.Run("Discovery", func(t *testing.T) { cupaloy.SnapshotT(t, summarizeBindings(t, cs.Bindings)) }) + + t.Run("Capture", func(t *testing.T) { + setShutdownAfterQuery(t, true) + + // Helper function to create *big.Rat values + mustRat := func(s string) *big.Rat { + r, ok := new(big.Rat).SetString(s) + require.True(t, ok, "Failed to parse rational number: %s", s) + return r + } + + require.NoError(t, parallelSetupQueries(ctx, t, control, + fmt.Sprintf("INSERT INTO %s VALUES (@p0, @p1, @p2, @p3, @p4, @p5)", tableName), + [][]any{ + // Test maximum precision values for NUMERIC/DECIMAL (38 digits, 9 decimal places). + // There is no convenient way to represent the maximum possible BIGNUMERIC value via + // the particular way we're feeding in query parameters, so we just use a NUMERIC input. 
+ {1, mustRat("99999999999999999999999999999.999999999"), + mustRat("99999999999999999999999999999.999999999"), + mustRat("99999999999999999999999999999.999999999"), + mustRat("99999999999999999999999999999.999999999"), + 1.7976931348623157e+308}, + // Test minimum values + {2, mustRat("-99999999999999999999999999999.999999999"), + mustRat("-99999999999999999999999999999.999999999"), + mustRat("-99999999999999999999999999999.999999999"), + mustRat("-99999999999999999999999999999.999999999"), + -1.7976931348623157e+308}, + // Test zero values + {3, new(big.Rat), new(big.Rat), new(big.Rat), new(big.Rat), 0.0}, + // Test some typical values with decimals + {4, mustRat("123.456"), mustRat("789.012"), + mustRat("1234.5678"), mustRat("9012.3456"), + 3.14159}, + // Test some whole numbers + {5, mustRat("42"), mustRat("100"), + mustRat("1000"), mustRat("10000"), + 12345.0}, + })) + cs.Capture(ctx, t, nil) + cupaloy.SnapshotT(t, cs.Summary()) + }) +} + +// TestStringTypes exercises discovery and capture of the string types +// STRING and STRING(L). 
+func TestStringTypes(t *testing.T) { + var ctx, cs, control = context.Background(), testCaptureSpec(t), testBigQueryClient(t) + var tableName, uniqueID = testTableName(t, uniqueTableID(t)) + createTestTable(ctx, t, control, tableName, `( + id INTEGER PRIMARY KEY NOT ENFORCED, + string_val STRING, + string_len_val STRING(50) + )`) + + cs.Bindings = discoverBindings(ctx, t, cs, regexp.MustCompile(uniqueID)) + setCursorColumns(t, cs.Bindings[0], "id") + t.Run("Discovery", func(t *testing.T) { cupaloy.SnapshotT(t, summarizeBindings(t, cs.Bindings)) }) + + t.Run("Capture", func(t *testing.T) { + setShutdownAfterQuery(t, true) + + require.NoError(t, parallelSetupQueries(ctx, t, control, + fmt.Sprintf("INSERT INTO %s VALUES (@p0, @p1, @p2)", tableName), + [][]any{ + // Test empty strings + {1, "", ""}, + // Test NULL values + {2, bigquery.NullString{}, bigquery.NullString{}}, + // Test strings with special characters + {3, "Hello, 世界!", "Unicode: ñ, é, ü"}, + // Test strings with quotes and backslashes + {4, "He said \"Hello\"", "Path: C:\\Program Files\\"}, + // Test strings with newlines and tabs + {5, "Line 1\nLine 2", "Col1\tCol2\tCol3"}, + // Test long strings + {6, strings.Repeat("a", 100), strings.Repeat("b", 50)}, + })) + cs.Capture(ctx, t, nil) + cupaloy.SnapshotT(t, cs.Summary()) + }) +} + +// TestBinaryTypes exercises discovery and capture of the binary types +// BOOLEAN and BYTES. 
+func TestBinaryTypes(t *testing.T) { + var ctx, cs, control = context.Background(), testCaptureSpec(t), testBigQueryClient(t) + var tableName, uniqueID = testTableName(t, uniqueTableID(t)) + createTestTable(ctx, t, control, tableName, `( + id INTEGER PRIMARY KEY NOT ENFORCED, + bool_val BOOLEAN, + bytes_val BYTES + )`) + + cs.Bindings = discoverBindings(ctx, t, cs, regexp.MustCompile(uniqueID)) + setCursorColumns(t, cs.Bindings[0], "id") + t.Run("Discovery", func(t *testing.T) { cupaloy.SnapshotT(t, summarizeBindings(t, cs.Bindings)) }) + + t.Run("Capture", func(t *testing.T) { + setShutdownAfterQuery(t, true) + + require.NoError(t, parallelSetupQueries(ctx, t, control, + fmt.Sprintf("INSERT INTO %s VALUES (@p0, @p1, @p2)", tableName), + [][]any{ + // Test basic boolean values + {1, true, []byte("hello world")}, + {2, false, []byte{0x00, 0x01, 0x02, 0x03}}, + // Test NULL values + {3, bigquery.NullBool{}, []byte(nil)}, + // Test empty bytes + {4, true, []byte{}}, + // Test larger byte array + {5, false, bytes.Repeat([]byte{0xAA}, 100)}, + // Test UTF-8 encoded bytes + {6, true, []byte("Hello, 世界!")}, + })) + cs.Capture(ctx, t, nil) + cupaloy.SnapshotT(t, cs.Summary()) + }) +} + +// TestTemporalTypes exercises discovery and capture of the temporal types +// DATE, DATETIME, TIME, and TIMESTAMP. 
+func TestTemporalTypes(t *testing.T) { + var ctx, cs, control = context.Background(), testCaptureSpec(t), testBigQueryClient(t) + var tableName, uniqueID = testTableName(t, uniqueTableID(t)) + createTestTable(ctx, t, control, tableName, `( + id INTEGER PRIMARY KEY NOT ENFORCED, + date_val DATE, + datetime_val DATETIME, + time_val TIME, + timestamp_val TIMESTAMP + )`) + + cs.Bindings = discoverBindings(ctx, t, cs, regexp.MustCompile(uniqueID)) + setCursorColumns(t, cs.Bindings[0], "id") + t.Run("Discovery", func(t *testing.T) { cupaloy.SnapshotT(t, summarizeBindings(t, cs.Bindings)) }) + + t.Run("Capture", func(t *testing.T) { + setShutdownAfterQuery(t, true) + + require.NoError(t, parallelSetupQueries(ctx, t, control, + fmt.Sprintf("INSERT INTO %s VALUES (@p0, @p1, @p2, @p3, @p4)", tableName), + [][]any{ + // Test current time values + {1, civil.Date{Year: 2025, Month: 2, Day: 18}, + "2025-02-18 15:04:05.999999", + civil.TimeOf(time.Date(0, 0, 0, 15, 4, 5, 999999000, time.UTC)), + time.Date(2025, 2, 18, 15, 4, 5, 999999000, time.UTC)}, + // Test minimum values (year 1) + {2, civil.Date{Year: 1, Month: 1, Day: 1}, + "0001-01-01 00:00:00.000000", + civil.TimeOf(time.Date(0, 0, 0, 0, 0, 0, 0, time.UTC)), + time.Date(1, 1, 1, 0, 0, 0, 0, time.UTC)}, + // Test NULL values + {3, bigquery.NullDate{}, bigquery.NullDateTime{}, + bigquery.NullTime{}, bigquery.NullTimestamp{}}, + // Test maximum values (year 9999) + {4, civil.Date{Year: 9999, Month: 12, Day: 31}, + "9999-12-31 23:59:59.999999", + civil.TimeOf(time.Date(0, 0, 0, 23, 59, 59, 999999000, time.UTC)), + time.Date(9999, 12, 31, 23, 59, 59, 999999000, time.UTC)}, + // Test timezone handling + {5, civil.Date{Year: 2025, Month: 2, Day: 18}, + "2025-02-18 15:04:05.000000", + civil.TimeOf(time.Date(0, 0, 0, 15, 4, 5, 0, time.UTC)), + time.Date(2025, 2, 18, 15, 4, 5, 0, time.FixedZone("PST", -8*3600))}, + })) + cs.Capture(ctx, t, nil) + cupaloy.SnapshotT(t, cs.Summary()) + }) +} + +// TestCompositeTypes exercises 
discovery and capture of the composite types +// ARRAY and STRUCT. +func TestCompositeTypes(t *testing.T) { + var ctx, cs, control = context.Background(), testCaptureSpec(t), testBigQueryClient(t) + var tableName, uniqueID = testTableName(t, uniqueTableID(t)) + createTestTable(ctx, t, control, tableName, `( + id INTEGER PRIMARY KEY NOT ENFORCED, + string_array ARRAY<STRING>, + int_array ARRAY<INT64>, + struct_val STRUCT< + name STRING, + age INT64, + scores ARRAY<FLOAT64> + >, + array_struct ARRAY<STRUCT<id INT64, label STRING>> + )`) + + cs.Bindings = discoverBindings(ctx, t, cs, regexp.MustCompile(uniqueID)) + setCursorColumns(t, cs.Bindings[0], "id") + t.Run("Discovery", func(t *testing.T) { cupaloy.SnapshotT(t, summarizeBindings(t, cs.Bindings)) }) + + t.Run("Capture", func(t *testing.T) { + setShutdownAfterQuery(t, true) + + type person struct { + Name string + Age int64 + Scores []float64 + } + type label struct { + ID int64 + Label string + } + + require.NoError(t, parallelSetupQueries(ctx, t, control, + fmt.Sprintf("INSERT INTO %s VALUES (@p0, @p1, @p2, @p3, @p4)", tableName), + [][]any{ + // Test basic array and struct values + {1, []string{"a", "b", "c"}, + []int64{1, 2, 3}, + &person{"Alice", 25, []float64{90.5, 85.0, 92.3}}, + []label{{1, "first"}, {2, "second"}}, + }, + // Test empty arrays + {2, []string{}, + []int64{}, + &person{"Bob", 30, []float64{}}, + []label{}, + }, + // Test NULL values + {3, []string(nil), []int64(nil), (*person)(nil), []label(nil)}, + })) + cs.Capture(ctx, t, nil) + cupaloy.SnapshotT(t, cs.Summary()) + }) +} + +// TestJSONType exercises discovery and capture of the JSON column type. 
+func TestJSONType(t *testing.T) { + var ctx, cs, control = context.Background(), testCaptureSpec(t), testBigQueryClient(t) + var tableName, uniqueID = testTableName(t, uniqueTableID(t)) + createTestTable(ctx, t, control, tableName, `( + id INTEGER PRIMARY KEY NOT ENFORCED, + json_val JSON + )`) + + cs.Bindings = discoverBindings(ctx, t, cs, regexp.MustCompile(uniqueID)) + setCursorColumns(t, cs.Bindings[0], "id") + t.Run("Discovery", func(t *testing.T) { cupaloy.SnapshotT(t, summarizeBindings(t, cs.Bindings)) }) + + t.Run("Capture", func(t *testing.T) { + setShutdownAfterQuery(t, true) + + require.NoError(t, parallelSetupQueries(ctx, t, control, + fmt.Sprintf("INSERT INTO %s VALUES (@p0, @p1)", tableName), + [][]any{ + // Test simple JSON object + {1, bigquery.NullJSON{JSONVal: `{"name": "Alice", "age": 30}`, Valid: true}}, + // Test JSON array + {2, bigquery.NullJSON{JSONVal: `[1, 2, 3, "four", true, null]`, Valid: true}}, + // Test nested JSON + {3, bigquery.NullJSON{JSONVal: `{ + "user": {"name": "Bob", "email": "bob@example.com"}, + "orders": [ + {"id": 1, "items": ["apple", "banana"]}, + {"id": 2, "items": ["orange"]} + ] + }`, Valid: true}}, + // Test SQL NULL and JSON null + {4, bigquery.NullJSON{}}, + {5, bigquery.NullJSON{JSONVal: `null`, Valid: true}}, + // Test empty JSON object and array + {6, bigquery.NullJSON{JSONVal: `{}`, Valid: true}}, + {7, bigquery.NullJSON{JSONVal: `[]`, Valid: true}}, + // Test JSON with special characters + {8, bigquery.NullJSON{JSONVal: `{"message": "Hello, 世界!\nNew line\"Quotes\"\\Backslash"}`, Valid: true}}, + })) + cs.Capture(ctx, t, nil) + cupaloy.SnapshotT(t, cs.Summary()) + }) +} + +// TestFullRefresh exercises the scenario of a table without a configured cursor, +// which causes the capture to perform a full refresh every time it runs. 
+func TestFullRefresh(t *testing.T) { + var ctx, cs, control = context.Background(), testCaptureSpec(t), testBigQueryClient(t) + var tableName, uniqueID = testTableName(t, uniqueTableID(t)) + createTestTable(ctx, t, control, tableName, "(id INTEGER PRIMARY KEY NOT ENFORCED, data STRING)") + + cs.Bindings = discoverBindings(ctx, t, cs, regexp.MustCompile(uniqueID)) + t.Run("Discovery", func(t *testing.T) { cupaloy.SnapshotT(t, summarizeBindings(t, cs.Bindings)) }) + + setShutdownAfterQuery(t, true) + + // Sorting is required for test stability because full-refresh captures are unordered. + // The multiset of row-captures will be the same across all runs, so this works. + cs.Validator = &st.SortedCaptureValidator{} + + require.NoError(t, parallelSetupQueries(ctx, t, control, + fmt.Sprintf("INSERT INTO %s VALUES (@p0, @p1)", tableName), [][]any{ + {0, "Value for row 0"}, {1, "Value for row 1"}, + {2, "Value for row 2"}, {3, "Value for row 3"}, + })) + cs.Capture(ctx, t, nil) + t.Run("Capture1", func(t *testing.T) { cupaloy.SnapshotT(t, cs.Summary()); cs.Reset() }) + + require.NoError(t, parallelSetupQueries(ctx, t, control, + fmt.Sprintf("INSERT INTO %s VALUES (@p0, @p1)", tableName), [][]any{ + {4, "Value for row 4"}, {5, "Value for row 5"}, + {6, "Value for row 6"}, {7, "Value for row 7"}, + })) + cs.Capture(ctx, t, nil) + t.Run("Capture2", func(t *testing.T) { cupaloy.SnapshotT(t, cs.Summary()); cs.Reset() }) +} + +// TestCaptureWithUpdatedAtCursor exercises the use-case of a capture using a +// non-primary-key updated_at column (of type TIMESTAMP) as the cursor. 
+func TestCaptureWithUpdatedAtCursor(t *testing.T) { + var ctx, cs, control = context.Background(), testCaptureSpec(t), testBigQueryClient(t) + var tableName, uniqueID = testTableName(t, uniqueTableID(t)) + createTestTable(ctx, t, control, tableName, "(id INTEGER PRIMARY KEY NOT ENFORCED, data STRING, updated_at TIMESTAMP)") + + cs.Bindings = discoverBindings(ctx, t, cs, regexp.MustCompile(uniqueID)) + setCursorColumns(t, cs.Bindings[0], "updated_at") + t.Run("Discovery", func(t *testing.T) { cupaloy.SnapshotT(t, summarizeBindings(t, cs.Bindings)) }) + + t.Run("Capture", func(t *testing.T) { + setShutdownAfterQuery(t, true) + baseTime := time.Date(2025, 2, 13, 12, 0, 0, 0, time.UTC) + + // Initial set of rows with sequential timestamps + require.NoError(t, parallelSetupQueries(ctx, t, control, + fmt.Sprintf("INSERT INTO %s VALUES (@p0, @p1, @p2)", tableName), + [][]any{ + {0, "Value for row 0", baseTime.Add(0 * time.Minute)}, + {1, "Value for row 1", baseTime.Add(1 * time.Minute)}, + {2, "Value for row 2", baseTime.Add(2 * time.Minute)}, + {3, "Value for row 3", baseTime.Add(3 * time.Minute)}, + {4, "Value for row 4", baseTime.Add(4 * time.Minute)}, + })) + cs.Capture(ctx, t, nil) + + // Second set of rows with later timestamps + require.NoError(t, parallelSetupQueries(ctx, t, control, + fmt.Sprintf("INSERT INTO %s VALUES (@p0, @p1, @p2)", tableName), + [][]any{ + {5, "Value for row 5", baseTime.Add(10 * time.Minute)}, + {6, "Value for row 6", baseTime.Add(11 * time.Minute)}, + {7, "Value for row 7", baseTime.Add(12 * time.Minute)}, + {8, "Value for row 8", baseTime.Add(13 * time.Minute)}, + {9, "Value for row 9", baseTime.Add(14 * time.Minute)}, + })) + cs.Capture(ctx, t, nil) + + cupaloy.SnapshotT(t, cs.Summary()) + }) +} + +// TestCaptureWithDatetimeCursor exercises the use-case of a capture using a +// DATETIME updated_at column as the cursor. 
+func TestCaptureWithDatetimeCursor(t *testing.T) { + var ctx, cs, control = context.Background(), testCaptureSpec(t), testBigQueryClient(t) + var tableName, uniqueID = testTableName(t, uniqueTableID(t)) + createTestTable(ctx, t, control, tableName, "(id INTEGER PRIMARY KEY NOT ENFORCED, data STRING, updated_at DATETIME)") + + cs.Bindings = discoverBindings(ctx, t, cs, regexp.MustCompile(uniqueID)) + setCursorColumns(t, cs.Bindings[0], "updated_at") + t.Run("Discovery", func(t *testing.T) { cupaloy.SnapshotT(t, summarizeBindings(t, cs.Bindings)) }) + + t.Run("Capture", func(t *testing.T) { + setShutdownAfterQuery(t, true) + + // Initial set of rows with sequential timestamps + require.NoError(t, parallelSetupQueries(ctx, t, control, + fmt.Sprintf("INSERT INTO %s VALUES (@p0, @p1, @p2)", tableName), + [][]any{ + {0, "Value for row 0", "2025-02-13 12:00:00.000000"}, + {1, "Value for row 1", "2025-02-13 12:01:00.000000"}, + {2, "Value for row 2", "2025-02-13 12:02:00.000000"}, + {3, "Value for row 3", "2025-02-13 12:03:00.000000"}, + {4, "Value for row 4", "2025-02-13 12:04:00.000000"}, + })) + cs.Capture(ctx, t, nil) + + // Second set of rows with later timestamps + require.NoError(t, parallelSetupQueries(ctx, t, control, + fmt.Sprintf("INSERT INTO %s VALUES (@p0, @p1, @p2)", tableName), + [][]any{ + {5, "Value for row 5", "2025-02-13 12:10:00.000000"}, + {6, "Value for row 6", "2025-02-13 12:11:00.000000"}, + {7, "Value for row 7", "2025-02-13 12:12:00.000000"}, + {8, "Value for row 8", "2025-02-13 12:13:00.000000"}, + {9, "Value for row 9", "2025-02-13 12:14:00.000000"}, + })) + cs.Capture(ctx, t, nil) + + cupaloy.SnapshotT(t, cs.Summary()) + }) +} + +// TestCaptureWithTwoColumnCursor exercises the use-case of a capture using a +// multiple-column compound cursor. 
+func TestCaptureWithTwoColumnCursor(t *testing.T) { + var ctx, cs, control = context.Background(), testCaptureSpec(t), testBigQueryClient(t) + var tableName, uniqueID = testTableName(t, uniqueTableID(t)) + createTestTable(ctx, t, control, tableName, `( + id INTEGER PRIMARY KEY NOT ENFORCED, + col1 INTEGER, + col2 INTEGER, + data STRING + )`) + + cs.Bindings = discoverBindings(ctx, t, cs, regexp.MustCompile(uniqueID)) + setCursorColumns(t, cs.Bindings[0], "col1", "col2") + t.Run("Discovery", func(t *testing.T) { cupaloy.SnapshotT(t, summarizeBindings(t, cs.Bindings)) }) + + t.Run("Capture", func(t *testing.T) { + setShutdownAfterQuery(t, true) + + // First batch of rows with ascending values in both cursor columns + require.NoError(t, parallelSetupQueries(ctx, t, control, + fmt.Sprintf("INSERT INTO %s VALUES (@p0, @p1, @p2, @p3)", tableName), + [][]any{ + {0, 1, 1, "Value for row 0"}, + {1, 1, 2, "Value for row 1"}, + {2, 1, 3, "Value for row 2"}, + {3, 2, 1, "Value for row 3"}, + {4, 2, 2, "Value for row 4"}, + {5, 2, 3, "Value for row 5"}, + })) + cs.Capture(ctx, t, nil) + + // Second batch testing cursor behavior + require.NoError(t, parallelSetupQueries(ctx, t, control, + fmt.Sprintf("INSERT INTO %s VALUES (@p0, @p1, @p2, @p3)", tableName), + [][]any{ + {6, 0, 9, "Value ignored because col1 is too small"}, + {7, 2, 0, "Value ignored because col2 is too small"}, + {8, 3, 1, "Value for row 8"}, + {9, 3, 2, "Value for row 9"}, + {10, 3, 3, "Value for row 10"}, + {11, 4, 1, "Value for row 11"}, + {12, 4, 2, "Value for row 12"}, + {13, 4, 3, "Value for row 13"}, + })) + cs.Capture(ctx, t, nil) + + cupaloy.SnapshotT(t, cs.Summary()) + }) +} + +// TestCaptureWithModifications exercises the use-case of a capture using an updated_at +// cursor where some rows are modified and deleted between captures. 
+func TestCaptureWithModifications(t *testing.T) { + var ctx, cs, control = context.Background(), testCaptureSpec(t), testBigQueryClient(t) + var tableName, uniqueID = testTableName(t, uniqueTableID(t)) + createTestTable(ctx, t, control, tableName, "(id INTEGER PRIMARY KEY NOT ENFORCED, data STRING, updated_at TIMESTAMP)") + + cs.Bindings = discoverBindings(ctx, t, cs, regexp.MustCompile(uniqueID)) + setCursorColumns(t, cs.Bindings[0], "updated_at") + t.Run("Discovery", func(t *testing.T) { cupaloy.SnapshotT(t, summarizeBindings(t, cs.Bindings)) }) + + t.Run("Capture", func(t *testing.T) { + setShutdownAfterQuery(t, true) + baseTime := time.Date(2025, 2, 13, 12, 0, 0, 0, time.UTC) + + // Initial set of rows with sequential timestamps + require.NoError(t, parallelSetupQueries(ctx, t, control, + fmt.Sprintf("INSERT INTO %s VALUES (@p0, @p1, @p2)", tableName), + [][]any{ + {0, "Initial value for row 0", baseTime.Add(0 * time.Minute)}, + {1, "Initial value for row 1", baseTime.Add(1 * time.Minute)}, + {2, "Initial value for row 2", baseTime.Add(2 * time.Minute)}, + {3, "Initial value for row 3", baseTime.Add(3 * time.Minute)}, + {4, "Initial value for row 4", baseTime.Add(4 * time.Minute)}, + })) + cs.Capture(ctx, t, nil) + + // Update and delete some rows, as well as inserting new ones + require.NoError(t, executeSetupQuery(ctx, t, control, + fmt.Sprintf("UPDATE %s SET data = @p0, updated_at = @p1 WHERE id = @p2", tableName), + "Modified value for row 3", baseTime.Add(15*time.Minute), 3)) + require.NoError(t, executeSetupQuery(ctx, t, control, + fmt.Sprintf("UPDATE %s SET data = @p0, updated_at = @p1 WHERE id = @p2", tableName), + "Modified value for row 4", baseTime.Add(16*time.Minute), 4)) + require.NoError(t, executeSetupQuery(ctx, t, control, + fmt.Sprintf("DELETE FROM %s WHERE id IN (1, 2)", tableName))) + require.NoError(t, parallelSetupQueries(ctx, t, control, + fmt.Sprintf("INSERT INTO %s VALUES (@p0, @p1, @p2)", tableName), + [][]any{ + {5, "Value for row 5", 
baseTime.Add(20 * time.Minute)}, + {6, "Value for row 6", baseTime.Add(21 * time.Minute)}, + {7, "Value for row 7", baseTime.Add(22 * time.Minute)}, + })) + cs.Capture(ctx, t, nil) + + cupaloy.SnapshotT(t, cs.Summary()) + }) +} + +// TestCaptureWithEmptyPoll exercises the scenario where a polling interval finds no new rows. +func TestCaptureWithEmptyPoll(t *testing.T) { + var ctx, cs, control = context.Background(), testCaptureSpec(t), testBigQueryClient(t) + var tableName, uniqueID = testTableName(t, uniqueTableID(t)) + createTestTable(ctx, t, control, tableName, "(id INTEGER PRIMARY KEY NOT ENFORCED, data STRING, updated_at TIMESTAMP)") + + cs.Bindings = discoverBindings(ctx, t, cs, regexp.MustCompile(uniqueID)) + setCursorColumns(t, cs.Bindings[0], "updated_at") + t.Run("Discovery", func(t *testing.T) { cupaloy.SnapshotT(t, summarizeBindings(t, cs.Bindings)) }) + + t.Run("Capture", func(t *testing.T) { + setShutdownAfterQuery(t, true) + baseTime := time.Date(2025, 2, 13, 12, 0, 0, 0, time.UTC) + + // First batch of rows + require.NoError(t, parallelSetupQueries(ctx, t, control, + fmt.Sprintf("INSERT INTO %s VALUES (@p0, @p1, @p2)", tableName), + [][]any{ + {0, "Value for row 0", baseTime.Add(0 * time.Minute)}, + {1, "Value for row 1", baseTime.Add(1 * time.Minute)}, + {2, "Value for row 2", baseTime.Add(2 * time.Minute)}, + {3, "Value for row 3", baseTime.Add(3 * time.Minute)}, + {4, "Value for row 4", baseTime.Add(4 * time.Minute)}, + })) + cs.Capture(ctx, t, nil) + + // No changes + cs.Capture(ctx, t, nil) + + // Second batch of rows with later timestamps + require.NoError(t, parallelSetupQueries(ctx, t, control, + fmt.Sprintf("INSERT INTO %s VALUES (@p0, @p1, @p2)", tableName), + [][]any{ + {5, "Value for row 5", baseTime.Add(10 * time.Minute)}, + {6, "Value for row 6", baseTime.Add(11 * time.Minute)}, + {7, "Value for row 7", baseTime.Add(12 * time.Minute)}, + {8, "Value for row 8", baseTime.Add(13 * time.Minute)}, + {9, "Value for row 9", baseTime.Add(14 * 
time.Minute)}, + })) + cs.Capture(ctx, t, nil) + + cupaloy.SnapshotT(t, cs.Summary()) + }) +} + +// TestCaptureWithNullCursor exercises the handling of NULL values in cursor columns. +func TestCaptureWithNullCursor(t *testing.T) { + var ctx, cs, control = context.Background(), testCaptureSpec(t), testBigQueryClient(t) + var tableName, uniqueID = testTableName(t, uniqueTableID(t)) + createTestTable(ctx, t, control, tableName, "(id INTEGER PRIMARY KEY NOT ENFORCED, data STRING, sort_col INTEGER)") + + cs.Bindings = discoverBindings(ctx, t, cs, regexp.MustCompile(uniqueID)) + setCursorColumns(t, cs.Bindings[0], "sort_col") + t.Run("Discovery", func(t *testing.T) { cupaloy.SnapshotT(t, summarizeBindings(t, cs.Bindings)) }) + + setShutdownAfterQuery(t, true) + + // Sorting is required for test stability because null-cursored rows are unordered. + cs.Validator = &st.SortedCaptureValidator{} + + // First batch with mix of NULL and non-NULL cursor values + require.NoError(t, parallelSetupQueries(ctx, t, control, + fmt.Sprintf("INSERT INTO %s VALUES (@p0, @p1, @p2)", tableName), + [][]any{ + {0, "Value with NULL cursor", bigquery.NullInt64{}}, + {1, "Value with cursor 10", 10}, + {2, "Another NULL cursor", bigquery.NullInt64{}}, + {3, "Value with cursor 20", 20}, + {4, "Third NULL cursor", bigquery.NullInt64{}}, + })) + cs.Capture(ctx, t, nil) + t.Run("Capture1", func(t *testing.T) { cupaloy.SnapshotT(t, cs.Summary()); cs.Reset() }) + + // Second batch testing NULL handling after initial cursor + require.NoError(t, parallelSetupQueries(ctx, t, control, + fmt.Sprintf("INSERT INTO %s VALUES (@p0, @p1, @p2)", tableName), + [][]any{ + {5, "Late NULL cursor", bigquery.NullInt64{}}, // Will not be captured + {6, "Value with cursor 15", 15}, // Will not be captured (cursor 20 is already seen) + {7, "Value with cursor 25", 25}, + {8, "Another late NULL", bigquery.NullInt64{}}, // Will not be captured + {9, "Final value cursor 30", 30}, + })) + cs.Capture(ctx, t, nil) + 
t.Run("Capture2", func(t *testing.T) { cupaloy.SnapshotT(t, cs.Summary()); cs.Reset() }) +} + +// TestQueryTemplateOverride exercises a capture configured with an explicit query template +// in the resource spec rather than a blank template and specified table name/schema. +// +// This is the behavior of preexisting bindings which were created before the table/schema +// change in February 2025. +func TestQueryTemplateOverride(t *testing.T) { + var ctx, cs, control = context.Background(), testCaptureSpec(t), testBigQueryClient(t) + var tableName, uniqueID = testTableName(t, uniqueTableID(t)) + createTestTable(ctx, t, control, tableName, "(id INTEGER PRIMARY KEY NOT ENFORCED, data STRING, updated_at TIMESTAMP)") + + // Create a binding with a query template override instead of table/schema + var res = Resource{ + Name: "query_template_override", + Template: fmt.Sprintf(`SELECT * FROM %[1]s {{if not .IsFirstQuery}} WHERE updated_at > @p0 {{end}} ORDER BY updated_at`, tableName), + Cursor: []string{"updated_at"}, + } + bs, err := json.Marshal(res) + require.NoError(t, err) + cs.Bindings = discoverBindings(ctx, t, cs, regexp.MustCompile(uniqueID)) + cs.Bindings[0].ResourceConfigJson = bs + + t.Run("Discovery", func(t *testing.T) { cupaloy.SnapshotT(t, summarizeBindings(t, cs.Bindings)) }) + + t.Run("Capture", func(t *testing.T) { + setShutdownAfterQuery(t, true) + baseTime := time.Date(2025, 2, 13, 12, 0, 0, 0, time.UTC) + + // Initial rows + require.NoError(t, parallelSetupQueries(ctx, t, control, + fmt.Sprintf("INSERT INTO %s VALUES (@p0, @p1, @p2)", tableName), + [][]any{ + {0, "Value for row 0", baseTime.Add(0 * time.Minute)}, + {1, "Value for row 1", baseTime.Add(1 * time.Minute)}, + {2, "Value for row 2", baseTime.Add(2 * time.Minute)}, + {3, "Value for row 3", baseTime.Add(3 * time.Minute)}, + {4, "Value for row 4", baseTime.Add(4 * time.Minute)}, + })) + cs.Capture(ctx, t, nil) + + // More rows with later timestamps + require.NoError(t, 
parallelSetupQueries(ctx, t, control, + fmt.Sprintf("INSERT INTO %s VALUES (@p0, @p1, @p2)", tableName), + [][]any{ + {5, "Value for row 5", baseTime.Add(10 * time.Minute)}, + {6, "Value for row 6", baseTime.Add(11 * time.Minute)}, + {7, "Value for row 7", baseTime.Add(12 * time.Minute)}, + {8, "Value for row 8", baseTime.Add(13 * time.Minute)}, + {9, "Value for row 9", baseTime.Add(14 * time.Minute)}, + })) + cs.Capture(ctx, t, nil) + + cupaloy.SnapshotT(t, cs.Summary()) + }) }