Skip to content

Commit

Permalink
source-redshift-batch: Require row_id only when keyless_row_id
Browse files Browse the repository at this point in the history
  • Loading branch information
willdonnelly committed Feb 26, 2025
1 parent 18f018b commit 73281a8
Show file tree
Hide file tree
Showing 12 changed files with 16 additions and 24 deletions.
3 changes: 1 addition & 2 deletions source-redshift-batch/.snapshots/TestBasicCapture-Discovery
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ Binding 0:
"type": "object",
"required": [
"polled",
"index",
"row_id"
"index"
]
},
"data": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ Binding 0:
"type": "object",
"required": [
"polled",
"index",
"row_id"
"index"
]
},
"a_bool": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ Binding 0:
"type": "object",
"required": [
"polled",
"index",
"row_id"
"index"
]
},
"data": {
Expand Down
3 changes: 1 addition & 2 deletions source-redshift-batch/.snapshots/TestFloatNaNs-Discovery
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ Binding 0:
"type": "object",
"required": [
"polled",
"index",
"row_id"
"index"
]
},
"a_double": {
Expand Down
3 changes: 1 addition & 2 deletions source-redshift-batch/.snapshots/TestKeyDiscovery
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ Binding 0:
"type": "object",
"required": [
"polled",
"index",
"row_id"
"index"
]
},
"data": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ Binding 0:
"type": "object",
"required": [
"polled",
"index",
"row_id"
"index"
]
},
"data": {
Expand Down
3 changes: 1 addition & 2 deletions source-redshift-batch/.snapshots/TestKeylessDiscovery
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ Binding 0:
"type": "object",
"required": [
"polled",
"index",
"row_id"
"index"
]
},
"v_bigint": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ Binding 0:
"type": "object",
"required": [
"polled",
"index",
"row_id"
"index"
]
},
"data": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ Binding 0:
"type": "object",
"required": [
"polled",
"index",
"row_id"
"index"
]
},
"data": {
Expand Down
3 changes: 1 addition & 2 deletions source-redshift-batch/.snapshots/TestSchemaFilter-FilteredIn
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ Binding 0:
"type": "object",
"required": [
"polled",
"index",
"row_id"
"index"
]
},
"data": {
Expand Down
3 changes: 1 addition & 2 deletions source-redshift-batch/.snapshots/TestSchemaFilter-Unfiltered
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ Binding 0:
"type": "object",
"required": [
"polled",
"index",
"row_id"
"index"
]
},
"data": {
Expand Down
7 changes: 5 additions & 2 deletions source-redshift-batch/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,16 @@ var (
fallbackKeyOld = []string{}
)

func generateCollectionSchema(keyColumns []string, columnTypes map[string]columnType, useSchemaInference bool) (json.RawMessage, error) {
func generateCollectionSchema(cfg *Config, keyColumns []string, columnTypes map[string]columnType, useSchemaInference bool) (json.RawMessage, error) {
// Generate schema for the metadata via reflection
var reflector = jsonschema.Reflector{
ExpandedStruct: true,
DoNotReference: true,
}
var metadataSchema = reflector.ReflectFromType(reflect.TypeOf(documentMetadata{}))
if !cfg.Advanced.parsedFeatureFlags["keyless_row_id"] { // Don't include row_id as required on old captures with keyless_row_id off
metadataSchema.Required = slices.DeleteFunc(metadataSchema.Required, func(s string) bool { return s == "row_id" })
}
metadataSchema.Definitions = nil
metadataSchema.AdditionalProperties = nil

Expand Down Expand Up @@ -285,7 +288,7 @@ func (drv *BatchSQLDriver) Discover(ctx context.Context, req *pc.Request_Discove
columnTypes[column.Name] = column.DataType
}

generatedSchema, err := generateCollectionSchema(keyColumns, columnTypes, cfg.Advanced.parsedFeatureFlags["use_schema_inference"])
generatedSchema, err := generateCollectionSchema(&cfg, keyColumns, columnTypes, cfg.Advanced.parsedFeatureFlags["use_schema_inference"])
if err != nil {
log.WithFields(log.Fields{"table": tableID, "err": err}).Warn("unable to generate collection schema")
continue
Expand Down

0 comments on commit 73281a8

Please sign in to comment.