source-bigquery-batch: Require row_id only when keyless_row_id
willdonnelly committed Feb 26, 2025
1 parent eb7a1fc commit 24d37d0
Showing 12 changed files with 23 additions and 34 deletions.
3 changes: 1 addition & 2 deletions source-bigquery-batch/.snapshots/TestBinaryTypes-Discovery
@@ -55,8 +55,7 @@ Binding 0:
   "type": "object",
   "required": [
     "polled",
-    "index",
-    "row_id"
+    "index"
   ]
 },
 "id": {
@@ -55,8 +55,7 @@ Binding 0:
   "type": "object",
   "required": [
     "polled",
-    "index",
-    "row_id"
+    "index"
   ]
 },
 "id": {
@@ -55,8 +55,7 @@ Binding 0:
   "type": "object",
   "required": [
     "polled",
-    "index",
-    "row_id"
+    "index"
   ]
 },
 "id": {
@@ -55,8 +55,7 @@ Binding 0:
   "type": "object",
   "required": [
     "polled",
-    "index",
-    "row_id"
+    "index"
  ]
 },
 "id": {
3 changes: 1 addition & 2 deletions source-bigquery-batch/.snapshots/TestFullRefresh-Discovery
@@ -52,8 +52,7 @@ Binding 0:
   "type": "object",
   "required": [
     "polled",
-    "index",
-    "row_id"
+    "index"
   ]
 },
 "id": {
3 changes: 1 addition & 2 deletions source-bigquery-batch/.snapshots/TestIntegerTypes-Discovery
@@ -55,8 +55,7 @@ Binding 0:
   "type": "object",
   "required": [
     "polled",
-    "index",
-    "row_id"
+    "index"
   ]
 },
 "id": {
3 changes: 1 addition & 2 deletions source-bigquery-batch/.snapshots/TestJSONType-Discovery
@@ -55,8 +55,7 @@ Binding 0:
   "type": "object",
   "required": [
     "polled",
-    "index",
-    "row_id"
+    "index"
   ]
 },
 "id": {
3 changes: 1 addition & 2 deletions source-bigquery-batch/.snapshots/TestNumericTypes-Discovery
@@ -55,8 +55,7 @@ Binding 0:
   "type": "object",
   "required": [
     "polled",
-    "index",
-    "row_id"
+    "index"
   ]
 },
 "id": {
3 changes: 1 addition & 2 deletions source-bigquery-batch/.snapshots/TestSimpleCapture-Discovery
@@ -55,8 +55,7 @@ Binding 0:
   "type": "object",
   "required": [
     "polled",
-    "index",
-    "row_id"
+    "index"
   ]
 },
 "id": {
3 changes: 1 addition & 2 deletions source-bigquery-batch/.snapshots/TestStringTypes-Discovery
@@ -55,8 +55,7 @@ Binding 0:
   "type": "object",
   "required": [
     "polled",
-    "index",
-    "row_id"
+    "index"
   ]
 },
 "id": {
3 changes: 1 addition & 2 deletions source-bigquery-batch/.snapshots/TestTemporalTypes-Discovery
@@ -55,8 +55,7 @@ Binding 0:
   "type": "object",
   "required": [
     "polled",
-    "index",
-    "row_id"
+    "index"
   ]
 },
 "id": {
24 changes: 12 additions & 12 deletions source-bigquery-batch/driver.go
@@ -125,13 +125,16 @@ var (
 	fallbackKeyOld = []string{"/_meta/polled", "/_meta/index"}
 )
 
-func generateCollectionSchema(keyColumns []string, columnTypes map[string]*jsonschema.Schema) (json.RawMessage, error) {
+func generateCollectionSchema(cfg *Config, keyColumns []string, columnTypes map[string]*jsonschema.Schema) (json.RawMessage, error) {
 	// Generate schema for the metadata via reflection
 	var reflector = jsonschema.Reflector{
 		ExpandedStruct: true,
 		DoNotReference: true,
 	}
 	var metadataSchema = reflector.ReflectFromType(reflect.TypeOf(documentMetadata{}))
+	if !cfg.Advanced.parsedFeatureFlags["keyless_row_id"] { // Don't include row_id as required on old captures with keyless_row_id off
+		metadataSchema.Required = slices.DeleteFunc(metadataSchema.Required, func(s string) bool { return s == "row_id" })
+	}
 	metadataSchema.Definitions = nil
 	metadataSchema.AdditionalProperties = nil
 
@@ -167,13 +170,6 @@ func generateCollectionSchema(keyColumns []string, columnTypes map[string]*jsons
 	return json.RawMessage(bs), nil
 }
 
-var minimalSchema = func() json.RawMessage {
-	var schema, err = generateCollectionSchema(nil, nil)
-	if err != nil {
-		panic(err)
-	}
-	return schema
-}()
 
 // Spec returns metadata about the capture connector.
 func (drv *BatchSQLDriver) Spec(ctx context.Context, req *pc.Request_Spec) (*pc.Response_Spec, error) {
@@ -252,15 +248,19 @@ func (drv *BatchSQLDriver) Discover(ctx context.Context, req *pc.Request_Discove
 			return nil, fmt.Errorf("error serializing resource spec: %w", err)
 		}
 
-		// Try to generate a useful collection schema, but on error fall back to the
-		// minimal schema with a fallback collection key which is always present.
-		var collectionSchema = minimalSchema
+		// Start with a minimal schema and a fallback collection key, which will be
+		// replaced with more useful versions if we have sufficient information.
+		collectionSchema, err := generateCollectionSchema(&cfg, nil, nil)
+		if err != nil {
+			return nil, fmt.Errorf("error generating minimal collection schema: %w", err)
+		}
 		var collectionKey = fallbackKey
 		if !cfg.Advanced.parsedFeatureFlags["keyless_row_id"] {
 			collectionKey = fallbackKeyOld
 		}
 
 		if tableKey, ok := keysByTable[tableID]; ok {
-			if generatedSchema, err := generateCollectionSchema(tableKey.Columns, tableKey.ColumnTypes); err == nil {
+			if generatedSchema, err := generateCollectionSchema(&cfg, tableKey.Columns, tableKey.ColumnTypes); err == nil {
 				collectionSchema = generatedSchema
 				collectionKey = nil
 				for _, colName := range tableKey.Columns {
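For readers skimming the diff, here is a minimal, self-contained sketch of the gating behavior introduced above (not the connector's actual code; the function name and the plain flag map are illustrative assumptions): row_id is listed as a required metadata field only when the keyless_row_id feature flag is enabled, mirroring the slices.DeleteFunc call in generateCollectionSchema.

package main

import (
	"fmt"
	"slices"
)

// requiredMetadataFields mirrors the gate in generateCollectionSchema:
// when the keyless_row_id feature flag is off (legacy captures), row_id
// is dropped from the required list, leaving only polled and index.
func requiredMetadataFields(featureFlags map[string]bool) []string {
	required := []string{"polled", "index", "row_id"}
	if !featureFlags["keyless_row_id"] {
		required = slices.DeleteFunc(required, func(s string) bool { return s == "row_id" })
	}
	return required
}

func main() {
	fmt.Println(requiredMetadataFields(map[string]bool{"keyless_row_id": true})) // [polled index row_id]
	fmt.Println(requiredMetadataFields(nil))                                     // [polled index]
}

This matches the snapshot changes above: discovered collections still require polled and index, but require row_id only for captures with keyless_row_id enabled.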
