add SourcedSchema to capture protocol and support in the runtime #1946

Draft: wants to merge 3 commits into master


@jgraettinger jgraettinger commented Feb 19, 2025

Description:

This PR adds SourcedSchema to the capture protocol, and uses it within the runtime to widen the inferred schema logs it produces. See individual commits.

Testing:

I've been testing this manually, by modifying an estuary/flow checkout:

diff --git a/tests/test_capture_hello_world.flow.yaml b/tests/test_capture_hello_world.flow.yaml
index 960c4b9410..6e8161250f 100644
--- a/tests/test_capture_hello_world.flow.yaml
+++ b/tests/test_capture_hello_world.flow.yaml
@@ -2,10 +2,12 @@
 captures:
   acmeCo/hello-world:
     endpoint:
-      connector:
-        image: "ghcr.io/estuary/source-hello-world:dev"
+      local:
+        command:
+          - /home/johnny/estuary/connectors/source-hello-world/source-hello-world
         config:
           rate: 2
+        protobuf: true
     bindings:
       - resource:

And by modifying source-hello-world in estuary/connectors (rebuilt with go build .):

diff --git a/source-hello-world/main.go b/source-hello-world/main.go
index f550195a..cb8c5187 100644
--- a/source-hello-world/main.go
+++ b/source-hello-world/main.go
@@ -175,7 +175,9 @@ func (c *capture) Run() error {
                return err
        }
 
+       var count = 0
        var interval = time.Duration(float64(time.Second) / float64(c.Config.Rate))
+
        for {
                select {
                case <-c.Stream.Context().Done():
@@ -183,6 +185,7 @@ func (c *capture) Run() error {
                        return nil
                default:
                }
+               count += 1
 
                for idx, binding := range c.Bindings {
                        var cursor = c.State.Cursors[binding.stateKey]
@@ -198,6 +201,41 @@ func (c *capture) Run() error {
                                return err
                        }
                        c.State.Cursors[binding.stateKey]++
+
+                       if count%5 == 0 {
+                               c.Stream.Send(&pc.Response{
+                                       SourcedSchema: &pc.Response_SourcedSchema{
+                                               Binding: uint32(idx),
+                                               SchemaJson: json.RawMessage(`{
+                                                       "additionalProperties": false,
+                                                       "type": "object",
+                                                       "properties": {
+                                                               "foobar": {
+                                                                       "type": "string",
+                                                                       "format": "date-time",
+                                                                       "title": "My column",
+                                                                       "minLength": 5,
+                                                                       "maxLength": 5
+                                                               }
+                                                       }
+                                               }`),
+                                       },
+                               })
+                               c.Stream.Send(&pc.Response{
+                                       SourcedSchema: &pc.Response_SourcedSchema{
+                                               Binding: uint32(idx),
+                                               SchemaJson: json.RawMessage(`{
+                                                       "additionalProperties": false,
+                                                       "type": "object",
+                                                       "properties": {
+                                                               "ts": {
+                                                                       "type": ["string", "integer"]
+                                                               }
+                                                       }
+                                               }`),
+                                       },
+                               })
+                       }
                }

And then running:

RUST_LOG=info cargo run -p flowctl preview --source tests/test_capture_hello_world.flow.yaml

I see the expected outcome, with logs like:

2025-02-19T00:00:53.716044Z  INFO runtime::capture::protocol: inferred schema updated schema={
  "$schema": "https://json-schema.org/draft/2019-09/schema",
  "additionalProperties": false,
  "properties": {
    "_meta": {
      "additionalProperties": false,
      "properties": {
        "uuid": {
          "maxLength": 64,
          "minLength": 32,
          "type": "string"
        }
      },
      "required": [
        "uuid"
      ],
      "type": "object"
    },
    "foobar": {
      "format": "date-time",
      "maxLength": 5,
      "minLength": 5,
      "title": "My column",
      "type": "string"
    },
    "message": {
      "type": "string"
    },
    "ts": {
      "format": "date-time",
      "maxLength": 134,
      "minLength": 32,
      "type": "string"
    }
  },
  "required": [
    "message",
    "ts"
  ],
  "type": "object"
} collection_name=acmeCo/events binding=0

End-to-end testing will require a connector to robustly support the protocol.

Workflow steps:

(How does one use this feature, and how has it changed)

Documentation links affected:

(list any documentation links that you created, or existing ones that you've identified as needing updates, along with a brief description)

Notes for reviewers:

(anything that might help someone review this PR)



…e it yet)"

This reverts commit f37f0a3.

We've switched to a design which doesn't require `write_inference`,
and are removing it. It has never been populated or used.

SourcedSchema allows capture connectors to tell the runtime of a
possible update to source-defined schema, drawn from the source
system's understanding of the binding schema. The runtime will apply
sourced schemas to the inference it generates for each binding.

Within a transaction, SourcedSchema messages are accumulated for each
binding using doc::Shape::union().

At the close of the transaction, SourcedSchemas are then intersected
with the write schema, and the result is used to pre-widen the inferred
schema of each binding.

If a binding had a SourcedSchema in a particular transaction, we always
emit an "inferred schema updated" log at transaction close, even if the
inferred schema did not actually change.
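
To illustrate the union-then-intersect flow described above, here is a minimal sketch in Go. It is not the runtime's implementation: `Shape`, `TypeSet`, `Union`, and `Intersect` are hypothetical, heavily simplified stand-ins for doc::Shape that track only property types and whether additional properties are allowed. The write schema in `main` is also an assumption (`ts` and `message` as strings, additional properties permitted), chosen to mirror the manual test.

```go
package main

import (
	"fmt"
	"sort"
)

// TypeSet is a bitmask of allowed JSON types (illustrative only).
type TypeSet uint8

const (
	String TypeSet = 1 << iota
	Integer
	Object
	Any = String | Integer | Object
)

// Shape is a hypothetical, simplified stand-in for doc::Shape.
type Shape struct {
	Props map[string]TypeSet // declared properties and their allowed types
	Open  bool               // true if undeclared properties are allowed
}

// typesOf returns the types allowed for a property: its declared set,
// Any if it is undeclared but the shape is open, or 0 if disallowed.
func (s Shape) typesOf(name string) TypeSet {
	if t, ok := s.Props[name]; ok {
		return t
	}
	if s.Open {
		return Any
	}
	return 0
}

// keys returns the sorted union of property names declared by a or b.
func keys(a, b Shape) []string {
	seen := map[string]bool{}
	for k := range a.Props {
		seen[k] = true
	}
	for k := range b.Props {
		seen[k] = true
	}
	out := make([]string, 0, len(seen))
	for k := range seen {
		out = append(out, k)
	}
	sort.Strings(out)
	return out
}

// Union widens: the result allows whatever either input allows.
func Union(a, b Shape) Shape {
	out := Shape{Props: map[string]TypeSet{}, Open: a.Open || b.Open}
	for _, k := range keys(a, b) {
		out.Props[k] = a.typesOf(k) | b.typesOf(k)
	}
	return out
}

// Intersect narrows: the result allows only what both inputs allow.
func Intersect(a, b Shape) Shape {
	out := Shape{Props: map[string]TypeSet{}, Open: a.Open && b.Open}
	for _, k := range keys(a, b) {
		// In a closed result, an impossible property is simply omitted.
		if t := a.typesOf(k) & b.typesOf(k); t != 0 || out.Open {
			out.Props[k] = t
		}
	}
	return out
}

func main() {
	// The two SourcedSchema messages sent by the modified connector.
	first := Shape{Props: map[string]TypeSet{"foobar": String}}
	second := Shape{Props: map[string]TypeSet{"ts": String | Integer}}

	// Within a transaction: accumulate per binding via union.
	sourced := Union(first, second)

	// At transaction close: intersect with the (assumed) write schema.
	write := Shape{
		Props: map[string]TypeSet{"ts": String, "message": String},
		Open:  true,
	}
	widen := Intersect(sourced, write)

	fmt.Println("accumulated:", sourced.Props)
	fmt.Println("pre-widen with:", widen.Props)
}
```

Under these assumptions, the intersection narrows `ts` from string-or-integer to string and keeps `foobar`, which is consistent with the logged schema above. In that log, `message` would come from inference over the captured documents rather than from a sourced schema.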