diff --git a/source-bigquery-batch/.snapshots/TestBinaryTypes-Discovery b/source-bigquery-batch/.snapshots/TestBinaryTypes-Discovery index 9eb2219bf..51c2c75a7 100644 --- a/source-bigquery-batch/.snapshots/TestBinaryTypes-Discovery +++ b/source-bigquery-batch/.snapshots/TestBinaryTypes-Discovery @@ -55,8 +55,7 @@ Binding 0: "type": "object", "required": [ "polled", - "index", - "row_id" + "index" ] }, "id": { diff --git a/source-bigquery-batch/.snapshots/TestCaptureWithDatetimeCursor-Discovery b/source-bigquery-batch/.snapshots/TestCaptureWithDatetimeCursor-Discovery index 27fd4ce4c..934873c25 100644 --- a/source-bigquery-batch/.snapshots/TestCaptureWithDatetimeCursor-Discovery +++ b/source-bigquery-batch/.snapshots/TestCaptureWithDatetimeCursor-Discovery @@ -55,8 +55,7 @@ Binding 0: "type": "object", "required": [ "polled", - "index", - "row_id" + "index" ] }, "id": { diff --git a/source-bigquery-batch/.snapshots/TestCaptureWithUpdatedAtCursor-Discovery b/source-bigquery-batch/.snapshots/TestCaptureWithUpdatedAtCursor-Discovery index 84c269948..5d7192104 100644 --- a/source-bigquery-batch/.snapshots/TestCaptureWithUpdatedAtCursor-Discovery +++ b/source-bigquery-batch/.snapshots/TestCaptureWithUpdatedAtCursor-Discovery @@ -55,8 +55,7 @@ Binding 0: "type": "object", "required": [ "polled", - "index", - "row_id" + "index" ] }, "id": { diff --git a/source-bigquery-batch/.snapshots/TestCompositeTypes-Discovery b/source-bigquery-batch/.snapshots/TestCompositeTypes-Discovery index 68c6a0cee..ceba593a6 100644 --- a/source-bigquery-batch/.snapshots/TestCompositeTypes-Discovery +++ b/source-bigquery-batch/.snapshots/TestCompositeTypes-Discovery @@ -55,8 +55,7 @@ Binding 0: "type": "object", "required": [ "polled", - "index", - "row_id" + "index" ] }, "id": { diff --git a/source-bigquery-batch/.snapshots/TestFullRefresh-Discovery b/source-bigquery-batch/.snapshots/TestFullRefresh-Discovery index aeeeea119..7091ad1cb 100644 --- a/source-bigquery-batch/.snapshots/TestFullRefresh-Discovery +++ b/source-bigquery-batch/.snapshots/TestFullRefresh-Discovery @@ -52,8 +52,7 @@ Binding 0: "type": "object", "required": [ "polled", - "index", - "row_id" + "index" ] }, "id": { diff --git a/source-bigquery-batch/.snapshots/TestIntegerTypes-Discovery b/source-bigquery-batch/.snapshots/TestIntegerTypes-Discovery index 541d173ac..17db01e69 100644 --- a/source-bigquery-batch/.snapshots/TestIntegerTypes-Discovery +++ b/source-bigquery-batch/.snapshots/TestIntegerTypes-Discovery @@ -55,8 +55,7 @@ Binding 0: "type": "object", "required": [ "polled", - "index", - "row_id" + "index" ] }, "id": { diff --git a/source-bigquery-batch/.snapshots/TestJSONType-Discovery b/source-bigquery-batch/.snapshots/TestJSONType-Discovery index b58b294da..939897b75 100644 --- a/source-bigquery-batch/.snapshots/TestJSONType-Discovery +++ b/source-bigquery-batch/.snapshots/TestJSONType-Discovery @@ -55,8 +55,7 @@ Binding 0: "type": "object", "required": [ "polled", - "index", - "row_id" + "index" ] }, "id": { diff --git a/source-bigquery-batch/.snapshots/TestNumericTypes-Discovery b/source-bigquery-batch/.snapshots/TestNumericTypes-Discovery index 29a335b33..77e692a05 100644 --- a/source-bigquery-batch/.snapshots/TestNumericTypes-Discovery +++ b/source-bigquery-batch/.snapshots/TestNumericTypes-Discovery @@ -55,8 +55,7 @@ Binding 0: "type": "object", "required": [ "polled", - "index", - "row_id" + "index" ] }, "id": { diff --git a/source-bigquery-batch/.snapshots/TestSimpleCapture-Discovery b/source-bigquery-batch/.snapshots/TestSimpleCapture-Discovery index 031fda8d3..aa87744be 100644 --- a/source-bigquery-batch/.snapshots/TestSimpleCapture-Discovery +++ b/source-bigquery-batch/.snapshots/TestSimpleCapture-Discovery @@ -55,8 +55,7 @@ Binding 0: "type": "object", "required": [ "polled", - "index", - "row_id" + "index" ] }, "id": { diff --git a/source-bigquery-batch/.snapshots/TestStringTypes-Discovery b/source-bigquery-batch/.snapshots/TestStringTypes-Discovery index feebe01d7..8218205ba 100644 --- a/source-bigquery-batch/.snapshots/TestStringTypes-Discovery +++ b/source-bigquery-batch/.snapshots/TestStringTypes-Discovery @@ -55,8 +55,7 @@ Binding 0: "type": "object", "required": [ "polled", - "index", - "row_id" + "index" ] }, "id": { diff --git a/source-bigquery-batch/.snapshots/TestTemporalTypes-Discovery b/source-bigquery-batch/.snapshots/TestTemporalTypes-Discovery index 64a562d56..69d5b8832 100644 --- a/source-bigquery-batch/.snapshots/TestTemporalTypes-Discovery +++ b/source-bigquery-batch/.snapshots/TestTemporalTypes-Discovery @@ -55,8 +55,7 @@ Binding 0: "type": "object", "required": [ "polled", - "index", - "row_id" + "index" ] }, "id": { diff --git a/source-bigquery-batch/driver.go b/source-bigquery-batch/driver.go index 220d3434a..6ffc38a02 100644 --- a/source-bigquery-batch/driver.go +++ b/source-bigquery-batch/driver.go @@ -125,13 +125,16 @@ var ( fallbackKeyOld = []string{"/_meta/polled", "/_meta/index"} ) -func generateCollectionSchema(keyColumns []string, columnTypes map[string]*jsonschema.Schema) (json.RawMessage, error) { +func generateCollectionSchema(cfg *Config, keyColumns []string, columnTypes map[string]*jsonschema.Schema) (json.RawMessage, error) { // Generate schema for the metadata via reflection var reflector = jsonschema.Reflector{ ExpandedStruct: true, DoNotReference: true, } var metadataSchema = reflector.ReflectFromType(reflect.TypeOf(documentMetadata{})) + if !cfg.Advanced.parsedFeatureFlags["keyless_row_id"] { // Don't include row_id as required on old captures with keyless_row_id off + metadataSchema.Required = slices.DeleteFunc(metadataSchema.Required, func(s string) bool { return s == "row_id" }) + } metadataSchema.Definitions = nil metadataSchema.AdditionalProperties = nil @@ -167,13 +170,6 @@ func generateCollectionSchema(keyColumns []string, columnTypes map[string]*jsons return json.RawMessage(bs), nil } -var minimalSchema = func() json.RawMessage { - var schema, err = generateCollectionSchema(nil, nil) - if err != nil { - panic(err) - } - return schema -}() // Spec returns metadata about the capture connector. func (drv *BatchSQLDriver) Spec(ctx context.Context, req *pc.Request_Spec) (*pc.Response_Spec, error) { @@ -252,15 +248,19 @@ func (drv *BatchSQLDriver) Discover(ctx context.Context, req *pc.Request_Discove return nil, fmt.Errorf("error serializing resource spec: %w", err) } - // Try to generate a useful collection schema, but on error fall back to the - // minimal schema with a fallback collection key which is always present. - var collectionSchema = minimalSchema + // Start with a minimal schema and a fallback collection key, which will be + // replaced with more useful versions if we have sufficient information. + collectionSchema, err := generateCollectionSchema(&cfg, nil, nil) + if err != nil { + return nil, fmt.Errorf("error generating minimal collection schema: %w", err) + } var collectionKey = fallbackKey if !cfg.Advanced.parsedFeatureFlags["keyless_row_id"] { collectionKey = fallbackKeyOld } + if tableKey, ok := keysByTable[tableID]; ok { - if generatedSchema, err := generateCollectionSchema(tableKey.Columns, tableKey.ColumnTypes); err == nil { + if generatedSchema, err := generateCollectionSchema(&cfg, tableKey.Columns, tableKey.ColumnTypes); err == nil { collectionSchema = generatedSchema collectionKey = nil for _, colName := range tableKey.Columns {