Skip to content

Commit

Permalink
source-mysql-batch: Advanced option 'Discover Views'
Browse files Browse the repository at this point in the history
Implements an advanced option for returning views from discovery.
  • Loading branch information
willdonnelly committed Feb 25, 2025
1 parent 602a74b commit cb71cbd
Show file tree
Hide file tree
Showing 6 changed files with 337 additions and 12 deletions.
45 changes: 45 additions & 0 deletions source-mysql-batch/.snapshots/TestCaptureFromView-Capture
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# ================================
# Collection "acmeCo/test/test_capturefromview_194890": 23 Documents
# ================================
{"_meta":{"polled":"<TIMESTAMP>","index":0,"row_id":0},"id":0,"name":"Row 0","updated_at":"2025-02-13 12:00:00","visible":1}
{"_meta":{"polled":"<TIMESTAMP>","index":1,"row_id":1},"id":1,"name":"Row 1","updated_at":"2025-02-13 12:01:00","visible":0}
{"_meta":{"polled":"<TIMESTAMP>","index":2,"row_id":2},"id":2,"name":"Row 2","updated_at":"2025-02-13 12:02:00","visible":1}
{"_meta":{"polled":"<TIMESTAMP>","index":3,"row_id":3},"id":3,"name":"Row 3","updated_at":"2025-02-13 12:03:00","visible":0}
{"_meta":{"polled":"<TIMESTAMP>","index":4,"row_id":4},"id":4,"name":"Row 4","updated_at":"2025-02-13 12:04:00","visible":1}
{"_meta":{"polled":"<TIMESTAMP>","index":5,"row_id":5},"id":5,"name":"Row 5","updated_at":"2025-02-13 12:05:00","visible":0}
{"_meta":{"polled":"<TIMESTAMP>","index":6,"row_id":6},"id":6,"name":"Row 6","updated_at":"2025-02-13 12:06:00","visible":1}
{"_meta":{"polled":"<TIMESTAMP>","index":7,"row_id":7},"id":7,"name":"Row 7","updated_at":"2025-02-13 12:07:00","visible":0}
{"_meta":{"polled":"<TIMESTAMP>","index":8,"row_id":8},"id":8,"name":"Row 8","updated_at":"2025-02-13 12:08:00","visible":1}
{"_meta":{"polled":"<TIMESTAMP>","index":9,"row_id":9},"id":9,"name":"Row 9","updated_at":"2025-02-13 12:09:00","visible":0}
{"_meta":{"polled":"<TIMESTAMP>","index":0,"row_id":10},"id":10,"name":"Row 10","updated_at":"2025-02-13 12:10:00","visible":1}
{"_meta":{"polled":"<TIMESTAMP>","index":1,"row_id":11},"id":11,"name":"Row 11","updated_at":"2025-02-13 12:11:00","visible":0}
{"_meta":{"polled":"<TIMESTAMP>","index":2,"row_id":12},"id":12,"name":"Row 12","updated_at":"2025-02-13 12:12:00","visible":1}
{"_meta":{"polled":"<TIMESTAMP>","index":3,"row_id":13},"id":13,"name":"Row 13","updated_at":"2025-02-13 12:13:00","visible":0}
{"_meta":{"polled":"<TIMESTAMP>","index":4,"row_id":14},"id":14,"name":"Row 14","updated_at":"2025-02-13 12:14:00","visible":1}
{"_meta":{"polled":"<TIMESTAMP>","index":5,"row_id":15},"id":15,"name":"Row 15","updated_at":"2025-02-13 12:15:00","visible":0}
{"_meta":{"polled":"<TIMESTAMP>","index":6,"row_id":16},"id":16,"name":"Row 16","updated_at":"2025-02-13 12:16:00","visible":1}
{"_meta":{"polled":"<TIMESTAMP>","index":7,"row_id":17},"id":17,"name":"Row 17","updated_at":"2025-02-13 12:17:00","visible":0}
{"_meta":{"polled":"<TIMESTAMP>","index":8,"row_id":18},"id":18,"name":"Row 18","updated_at":"2025-02-13 12:18:00","visible":1}
{"_meta":{"polled":"<TIMESTAMP>","index":9,"row_id":19},"id":19,"name":"Row 19","updated_at":"2025-02-13 12:19:00","visible":0}
{"_meta":{"polled":"<TIMESTAMP>","index":0,"row_id":20},"id":2,"name":"Row 2","updated_at":"2025-02-13 12:20:00","visible":0}
{"_meta":{"polled":"<TIMESTAMP>","index":1,"row_id":21},"id":3,"name":"Row 3","updated_at":"2025-02-13 12:20:00","visible":1}
{"_meta":{"polled":"<TIMESTAMP>","index":2,"row_id":22},"id":4,"name":"Row 4","updated_at":"2025-02-13 12:20:00","visible":0}
# ================================
# Collection "acmeCo/test/test_capturefromview_227836": 11 Documents
# ================================
{"_meta":{"polled":"<TIMESTAMP>","index":0,"row_id":0},"id":0,"name":"Row 0","updated_at":"2025-02-13 12:00:00"}
{"_meta":{"polled":"<TIMESTAMP>","index":1,"row_id":1},"id":2,"name":"Row 2","updated_at":"2025-02-13 12:02:00"}
{"_meta":{"polled":"<TIMESTAMP>","index":2,"row_id":2},"id":4,"name":"Row 4","updated_at":"2025-02-13 12:04:00"}
{"_meta":{"polled":"<TIMESTAMP>","index":3,"row_id":3},"id":6,"name":"Row 6","updated_at":"2025-02-13 12:06:00"}
{"_meta":{"polled":"<TIMESTAMP>","index":4,"row_id":4},"id":8,"name":"Row 8","updated_at":"2025-02-13 12:08:00"}
{"_meta":{"polled":"<TIMESTAMP>","index":0,"row_id":5},"id":10,"name":"Row 10","updated_at":"2025-02-13 12:10:00"}
{"_meta":{"polled":"<TIMESTAMP>","index":1,"row_id":6},"id":12,"name":"Row 12","updated_at":"2025-02-13 12:12:00"}
{"_meta":{"polled":"<TIMESTAMP>","index":2,"row_id":7},"id":14,"name":"Row 14","updated_at":"2025-02-13 12:14:00"}
{"_meta":{"polled":"<TIMESTAMP>","index":3,"row_id":8},"id":16,"name":"Row 16","updated_at":"2025-02-13 12:16:00"}
{"_meta":{"polled":"<TIMESTAMP>","index":4,"row_id":9},"id":18,"name":"Row 18","updated_at":"2025-02-13 12:18:00"}
{"_meta":{"polled":"<TIMESTAMP>","index":0,"row_id":10},"id":3,"name":"Row 3","updated_at":"2025-02-13 12:20:00"}
# ================================
# Final State Checkpoint
# ================================
{"bindingStateV1":{"test_capturefromview_194890":{"CursorNames":["updated_at"],"CursorValues":["2025-02-13 12:20:00"],"DocumentCount":23,"LastPolled":"<TIMESTAMP>"},"test_capturefromview_227836":{"CursorNames":["updated_at"],"CursorValues":["2025-02-13 12:20:00"],"DocumentCount":11,"LastPolled":"<TIMESTAMP>"}}}

140 changes: 140 additions & 0 deletions source-mysql-batch/.snapshots/TestCaptureFromView-DiscoveryWithViews
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
Binding 0:
{
"resource_config_json": {
"name": "test_capturefromview_194890",
"schema": "test",
"table": "capturefromview_194890"
},
"resource_path": [
"test_capturefromview_194890"
],
"collection": {
"name": "acmeCo/test/test_capturefromview_194890",
"read_schema_json": {
"type": "object",
"required": [
"_meta",
"id"
],
"properties": {
"_meta": {
"$schema": "https://json-schema.org/draft/2020-12/schema",
"$id": "https://github.com/estuary/connectors/source-mysql-batch/document-metadata",
"properties": {
"polled": {
"type": "string",
"format": "date-time",
"title": "Polled Timestamp",
"description": "The time at which the update query which produced this document as executed."
},
"index": {
"type": "integer",
"title": "Result Index",
"description": "The index of this document within the query execution which produced it."
},
"row_id": {
"type": "integer",
"title": "Row ID",
"description": "Row ID of the Document"
},
"op": {
"type": "string",
"enum": [
"c",
"u",
"d"
],
"title": "Change Operation",
"description": "Operation type (c: Create / u: Update / d: Delete)",
"default": "u"
}
},
"type": "object",
"required": [
"polled",
"index",
"row_id"
]
},
"id": {
"type": "integer"
}
},
"x-infer-schema": true
},
"key": [
"/id"
],
"projections": null
},
"state_key": "test_capturefromview_194890"
}
Binding 1:
{
"resource_config_json": {
"name": "test_capturefromview_227836",
"schema": "test",
"table": "capturefromview_227836"
},
"resource_path": [
"test_capturefromview_227836"
],
"collection": {
"name": "acmeCo/test/test_capturefromview_227836",
"read_schema_json": {
"type": "object",
"required": [
"_meta"
],
"properties": {
"_meta": {
"$schema": "https://json-schema.org/draft/2020-12/schema",
"$id": "https://github.com/estuary/connectors/source-mysql-batch/document-metadata",
"properties": {
"polled": {
"type": "string",
"format": "date-time",
"title": "Polled Timestamp",
"description": "The time at which the update query which produced this document as executed."
},
"index": {
"type": "integer",
"title": "Result Index",
"description": "The index of this document within the query execution which produced it."
},
"row_id": {
"type": "integer",
"title": "Row ID",
"description": "Row ID of the Document"
},
"op": {
"type": "string",
"enum": [
"c",
"u",
"d"
],
"title": "Change Operation",
"description": "Operation type (c: Create / u: Update / d: Delete)",
"default": "u"
}
},
"type": "object",
"required": [
"polled",
"index",
"row_id"
]
}
},
"x-infer-schema": true
},
"key": [
"/_meta/polled",
"/_meta/index"
],
"projections": null
},
"state_key": "test_capturefromview_227836"
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
Binding 0:
{
"resource_config_json": {
"name": "test_capturefromview_194890",
"schema": "test",
"table": "capturefromview_194890"
},
"resource_path": [
"test_capturefromview_194890"
],
"collection": {
"name": "acmeCo/test/test_capturefromview_194890",
"read_schema_json": {
"type": "object",
"required": [
"_meta",
"id"
],
"properties": {
"_meta": {
"$schema": "https://json-schema.org/draft/2020-12/schema",
"$id": "https://github.com/estuary/connectors/source-mysql-batch/document-metadata",
"properties": {
"polled": {
"type": "string",
"format": "date-time",
"title": "Polled Timestamp",
"description": "The time at which the update query which produced this document as executed."
},
"index": {
"type": "integer",
"title": "Result Index",
"description": "The index of this document within the query execution which produced it."
},
"row_id": {
"type": "integer",
"title": "Row ID",
"description": "Row ID of the Document"
},
"op": {
"type": "string",
"enum": [
"c",
"u",
"d"
],
"title": "Change Operation",
"description": "Operation type (c: Create / u: Update / d: Delete)",
"default": "u"
}
},
"type": "object",
"required": [
"polled",
"index",
"row_id"
]
},
"id": {
"type": "integer"
}
},
"x-infer-schema": true
},
"key": [
"/id"
],
"projections": null
},
"state_key": "test_capturefromview_194890"
}

4 changes: 2 additions & 2 deletions source-mysql-batch/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type BatchSQLDriver struct {
ConfigSchema json.RawMessage

Connect func(ctx context.Context, cfg *Config) (*client.Conn, error)
GenerateResource func(resourceName, schemaName, tableName, tableType string) (*Resource, error)
GenerateResource func(cfg *Config, resourceName, schemaName, tableName, tableType string) (*Resource, error)
ExcludedSystemSchemas []string
SelectQueryTemplate func(res *Resource) (string, error)
}
Expand Down Expand Up @@ -239,7 +239,7 @@ func (drv *BatchSQLDriver) Discover(ctx context.Context, req *pc.Request_Discove
var tableID = table.Schema + "." + table.Name

var recommendedName = recommendedCatalogName(table.Schema, table.Name)
var res, err = drv.GenerateResource(recommendedName, table.Schema, table.Name, table.Type)
var res, err = drv.GenerateResource(&cfg, recommendedName, table.Schema, table.Name, table.Type)
if err != nil {
log.WithFields(log.Fields{
"reason": err,
Expand Down
17 changes: 9 additions & 8 deletions source-mysql-batch/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type Config struct {
}

type advancedConfig struct {
DiscoverViews bool `json:"discover_views,omitempty" jsonschema:"title=Discover Views,description=When set views will be automatically discovered as resources. If unset only tables will be discovered."`
PollSchedule string `json:"poll,omitempty" jsonschema:"title=Default Polling Schedule,description=When and how often to execute fetch queries. Accepts a Go duration string like '5m' or '6h' for frequency-based polling or a string like 'daily at 12:34Z' to poll at a specific time (specified in UTC) every day. Defaults to '24h' if unset." jsonschema_extras:"pattern=^([-+]?([0-9]+([.][0-9]+)?(h|m|s|ms))+|daily at [0-9][0-9]?:[0-9]{2}Z)$"`
DiscoverSchemas []string `json:"discover_schemas,omitempty" jsonschema:"title=Discovery Schema Selection,description=If this is specified only tables in the selected schema(s) will be automatically discovered. Omit all entries to discover tables from all schemas."`
DBName string `json:"dbname,omitempty" jsonschema:"title=Database Name,description=The name of database to connect to. In general this shouldn't matter. The connector can discover and capture from all databases it's authorized to access."`
Expand Down Expand Up @@ -192,16 +193,16 @@ func quoteIdentifier(name string) string {
return "`" + strings.ReplaceAll(name, "`", "``") + "`"
}

func generateMySQLResource(resourceName, schemaName, tableName, tableType string) (*Resource, error) {
if !strings.EqualFold(tableType, "BASE TABLE") {
return nil, fmt.Errorf("discovery will not autogenerate resource configs for entities of type %q, but you may add them manually", tableType)
func generateMySQLResource(cfg *Config, resourceName, schemaName, tableName, tableType string) (*Resource, error) {
if strings.EqualFold(tableType, "BASE TABLE") || (strings.EqualFold(tableType, "VIEW") && cfg.Advanced.DiscoverViews) {
return &Resource{
Name: resourceName,
SchemaName: schemaName,
TableName: tableName,
}, nil
}
return nil, fmt.Errorf("discovery will not autogenerate resource configs for entities of type %q, but you may add them manually", tableType)

return &Resource{
Name: resourceName,
SchemaName: schemaName,
TableName: tableName,
}, nil
}

var mysqlDriver = &BatchSQLDriver{
Expand Down
71 changes: 69 additions & 2 deletions source-mysql-batch/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,11 @@ func TestSpec(t *testing.T) {
// TestQueryTemplate is a unit test which verifies that the default query template produces
// the expected output for initial/subsequent polling queries with different cursors.
func TestQueryTemplate(t *testing.T) {
res, err := mysqlDriver.GenerateResource("test_foobar", "test", "foobar", "BASE TABLE")
require.NoError(t, err)
var res = &Resource{
Name: "test_foobar",
SchemaName: "test",
TableName: "foobar",
}

tmpl, err := template.New("query").Funcs(templateFuncs).Parse(tableQueryTemplate)
require.NoError(t, err)
Expand Down Expand Up @@ -953,3 +956,67 @@ func TestFeatureFlagKeylessRowID(t *testing.T) {
})
}
}

// TestCaptureFromView exercises discovery and capture from a view with an updated_at cursor.
//
// The test creates a base table plus a view over it which selects only the rows where
// `visible = true`, and then checks three things, each recorded as a snapshot:
//
//   - DiscoveryWithoutViews: discovery output with the default configuration.
//   - DiscoveryWithViews: discovery output after enabling Advanced.DiscoverViews.
//   - Capture: the documents captured from both bindings across three polls, with
//     rows inserted or updated in the base table between polls.
func TestCaptureFromView(t *testing.T) {
	var ctx, cs, control = context.Background(), testCaptureSpec(t), testMySQLClient(t)
	var baseTableName, tableID = testTableName(t, uniqueTableID(t))
	var viewName, viewID = testTableName(t, uniqueTableID(t, "view"))

	// Create base table and view
	createTestTable(t, control, baseTableName, `(
		id INTEGER PRIMARY KEY,
		name TEXT,
		visible BOOLEAN,
		updated_at TIMESTAMP
	)`)
	executeControlQuery(t, control, fmt.Sprintf(`
		CREATE VIEW %s AS
		SELECT id, name, updated_at
		FROM %s
		WHERE visible = true`, viewName, baseTableName))
	t.Cleanup(func() { executeControlQuery(t, control, fmt.Sprintf("DROP VIEW IF EXISTS %s", viewName)) })

	// By default views should not be discovered.
	cs.Bindings = discoverBindings(ctx, t, cs, regexp.MustCompile(tableID), regexp.MustCompile(viewID))
	t.Run("DiscoveryWithoutViews", func(t *testing.T) { cupaloy.SnapshotT(t, summarizeBindings(t, cs.Bindings)) })

	// Enable view discovery and re-discover bindings.
	cs.EndpointSpec.(*Config).Advanced.DiscoverViews = true
	cs.Bindings = discoverBindings(ctx, t, cs, regexp.MustCompile(tableID), regexp.MustCompile(viewID))
	t.Run("DiscoveryWithViews", func(t *testing.T) { cupaloy.SnapshotT(t, summarizeBindings(t, cs.Bindings)) })

	// The rest of the test indexes both bindings, so fail with a clear message here
	// rather than panicking with an index-out-of-range if the view was not discovered.
	require.Len(t, cs.Bindings, 2, "expected bindings for both the base table and the view")

	// Capture both the base table and the view incrementally using the updated_at column.
	setResourceCursor(t, cs.Bindings[0], "updated_at")
	setResourceCursor(t, cs.Bindings[1], "updated_at")

	t.Run("Capture", func(t *testing.T) {
		// Layout used to render the timestamps passed as query parameters.
		const timestampLayout = "2006-01-02 15:04:05"

		setShutdownAfterQuery(t, true)
		baseTime := time.Date(2025, 2, 13, 12, 0, 0, 0, time.UTC)

		// insertRows adds rows with IDs in [start, end) to the base table. Row i is
		// stamped baseTime + i minutes, and only even-numbered rows are visible.
		insertRows := func(start, end int) {
			for i := start; i < end; i++ {
				executeControlQuery(t, control, fmt.Sprintf("INSERT INTO %s (id, name, visible, updated_at) VALUES (?, ?, ?, ?)", baseTableName),
					i, fmt.Sprintf("Row %d", i), i%2 == 0, // Even numbered rows are visible
					baseTime.Add(time.Duration(i)*time.Minute).Format(timestampLayout))
			}
		}

		// First batch: Insert rows into base table, some visible in view
		insertRows(0, 10)
		cs.Capture(ctx, t, nil)

		// Second batch: More rows with later timestamps
		insertRows(10, 20)
		cs.Capture(ctx, t, nil)

		// Update some rows to change their visibility. Each updated row also gets a
		// new updated_at value later than any previously inserted row.
		executeControlQuery(t, control, fmt.Sprintf(`
			UPDATE %s SET visible = NOT visible, updated_at = ?
			WHERE id IN (2, 3, 4)`, baseTableName),
			baseTime.Add(20*time.Minute).Format(timestampLayout))
		cs.Capture(ctx, t, nil)

		cupaloy.SnapshotT(t, cs.Summary())
	})
}

0 comments on commit cb71cbd

Please sign in to comment.