Skip to content

Commit

Permalink
source-mysql-batch: Require row_id only when keyless_row_id
Browse files Browse the repository at this point in the history
  • Loading branch information
willdonnelly committed Feb 26, 2025
1 parent e88e311 commit 90ebe2d
Show file tree
Hide file tree
Showing 23 changed files with 35 additions and 58 deletions.
3 changes: 1 addition & 2 deletions source-mysql-batch/.snapshots/TestAsyncCapture-Discovery
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ Binding 0:
"type": "object",
"required": [
"polled",
"index",
"row_id"
"index"
]
},
"id": {
Expand Down
3 changes: 1 addition & 2 deletions source-mysql-batch/.snapshots/TestBinaryTypes-Discovery
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ Binding 0:
"type": "object",
"required": [
"polled",
"index",
"row_id"
"index"
]
},
"id": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ Binding 0:
"type": "object",
"required": [
"polled",
"index",
"row_id"
"index"
]
},
"id": {
Expand Down Expand Up @@ -122,8 +121,7 @@ Binding 1:
"type": "object",
"required": [
"polled",
"index",
"row_id"
"index"
]
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ Binding 0:
"type": "object",
"required": [
"polled",
"index",
"row_id"
"index"
]
},
"id": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ Binding 0:
"type": "object",
"required": [
"polled",
"index",
"row_id"
"index"
]
},
"id": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ Binding 0:
"type": "object",
"required": [
"polled",
"index",
"row_id"
"index"
]
},
"id": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ Binding 0:
"type": "object",
"required": [
"polled",
"index",
"row_id"
"index"
]
},
"id": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ Binding 0:
"type": "object",
"required": [
"polled",
"index",
"row_id"
"index"
]
},
"id": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ Binding 0:
"type": "object",
"required": [
"polled",
"index",
"row_id"
"index"
]
},
"id": {
Expand Down
3 changes: 1 addition & 2 deletions source-mysql-batch/.snapshots/TestDateAndTimeTypes-Discovery
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ Binding 0:
"type": "object",
"required": [
"polled",
"index",
"row_id"
"index"
]
},
"id": {
Expand Down
3 changes: 1 addition & 2 deletions source-mysql-batch/.snapshots/TestEnumAndSetTypes-Discovery
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ Binding 0:
"type": "object",
"required": [
"polled",
"index",
"row_id"
"index"
]
},
"id": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ Binding 0:
"type": "object",
"required": [
"polled",
"index",
"row_id"
"index"
]
}
},
Expand Down
3 changes: 1 addition & 2 deletions source-mysql-batch/.snapshots/TestFullRefresh-Discovery
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ Binding 0:
"type": "object",
"required": [
"polled",
"index",
"row_id"
"index"
]
},
"id": {
Expand Down
3 changes: 1 addition & 2 deletions source-mysql-batch/.snapshots/TestIntegerTypes-Discovery
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ Binding 0:
"type": "object",
"required": [
"polled",
"index",
"row_id"
"index"
]
},
"id": {
Expand Down
3 changes: 1 addition & 2 deletions source-mysql-batch/.snapshots/TestJSONType-Discovery
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ Binding 0:
"type": "object",
"required": [
"polled",
"index",
"row_id"
"index"
]
},
"id": {
Expand Down
3 changes: 1 addition & 2 deletions source-mysql-batch/.snapshots/TestKeylessCapture-Discovery
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ Binding 0:
"type": "object",
"required": [
"polled",
"index",
"row_id"
"index"
]
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ Binding 0:
"type": "object",
"required": [
"polled",
"index",
"row_id"
"index"
]
}
},
Expand Down
3 changes: 1 addition & 2 deletions source-mysql-batch/.snapshots/TestNumericTypes-Discovery
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ Binding 0:
"type": "object",
"required": [
"polled",
"index",
"row_id"
"index"
]
},
"id": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ Binding 0:
"type": "object",
"required": [
"polled",
"index",
"row_id"
"index"
]
},
"id": {
Expand Down
3 changes: 1 addition & 2 deletions source-mysql-batch/.snapshots/TestSimpleCapture-Discovery
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ Binding 0:
"type": "object",
"required": [
"polled",
"index",
"row_id"
"index"
]
},
"id": {
Expand Down
3 changes: 1 addition & 2 deletions source-mysql-batch/.snapshots/TestSpatialTypes-Discovery
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ Binding 0:
"type": "object",
"required": [
"polled",
"index",
"row_id"
"index"
]
},
"id": {
Expand Down
3 changes: 1 addition & 2 deletions source-mysql-batch/.snapshots/TestStringTypes-Discovery
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ Binding 0:
"type": "object",
"required": [
"polled",
"index",
"row_id"
"index"
]
},
"id": {
Expand Down
24 changes: 12 additions & 12 deletions source-mysql-batch/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,16 @@ var (
fallbackKeyOld = []string{"/_meta/polled", "/_meta/index"}
)

func generateCollectionSchema(keyColumns []string, columnTypes map[string]*jsonschema.Schema) (json.RawMessage, error) {
func generateCollectionSchema(cfg *Config, keyColumns []string, columnTypes map[string]*jsonschema.Schema) (json.RawMessage, error) {
// Generate schema for the metadata via reflection
var reflector = jsonschema.Reflector{
ExpandedStruct: true,
DoNotReference: true,
}
var metadataSchema = reflector.ReflectFromType(reflect.TypeOf(documentMetadata{}))
if !cfg.Advanced.parsedFeatureFlags["keyless_row_id"] { // Don't include row_id as required on old captures with keyless_row_id off
metadataSchema.Required = slices.DeleteFunc(metadataSchema.Required, func(s string) bool { return s == "row_id" })
}
metadataSchema.Definitions = nil
metadataSchema.AdditionalProperties = nil

Expand Down Expand Up @@ -168,13 +171,6 @@ func generateCollectionSchema(keyColumns []string, columnTypes map[string]*jsons
return json.RawMessage(bs), nil
}

var minimalSchema = func() json.RawMessage {
var schema, err = generateCollectionSchema(nil, nil)
if err != nil {
panic(err)
}
return schema
}()

// Spec returns metadata about the capture connector.
func (drv *BatchSQLDriver) Spec(ctx context.Context, req *pc.Request_Spec) (*pc.Response_Spec, error) {
Expand Down Expand Up @@ -253,15 +249,19 @@ func (drv *BatchSQLDriver) Discover(ctx context.Context, req *pc.Request_Discove
return nil, fmt.Errorf("error serializing resource spec: %w", err)
}

// Try to generate a useful collection schema, but on error fall back to the
// minimal schema with a fallback collection key which is always present.
var collectionSchema = minimalSchema
// Start with a minimal schema and a fallback collection key, which will be
// replaced with more useful versions if we have sufficient information.
collectionSchema, err := generateCollectionSchema(&cfg, nil, nil)
if err != nil {
return nil, fmt.Errorf("error generating minimal collection schema: %w", err)
}
var collectionKey = fallbackKey
if !cfg.Advanced.parsedFeatureFlags["keyless_row_id"] {
collectionKey = fallbackKeyOld
}

if tableKey, ok := keysByTable[tableID]; ok {
if generatedSchema, err := generateCollectionSchema(tableKey.Columns, tableKey.ColumnTypes); err == nil {
if generatedSchema, err := generateCollectionSchema(&cfg, tableKey.Columns, tableKey.ColumnTypes); err == nil {
collectionSchema = generatedSchema
collectionKey = nil
for _, colName := range tableKey.Columns {
Expand Down

0 comments on commit 90ebe2d

Please sign in to comment.