Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add SourcedSchema to capture protocol and support in the runtime #1946

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/doc/src/validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub use json::schema::build::build_schema;
pub fn build_bundle(bundle: &str) -> Result<Schema, json::schema::build::Error> {
let mut schema = build_schema(
url::Url::parse("schema://bundle").unwrap(),
&serde_json::from_str(bundle).unwrap(),
&serde_json::from_str(bundle).map_err(json::schema::build::Error::FormatErr)?,
)?;

// Tweak scope to remove a synthetic resource pointer that was previously
Expand Down
35 changes: 34 additions & 1 deletion crates/proto-flow/src/capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ pub mod request {
pub checkpoints: u32,
}
}
/// Spec responds to Request.Spec.
/// Next tag: 9
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Response {
Expand All @@ -177,6 +179,8 @@ pub struct Response {
pub opened: ::core::option::Option<response::Opened>,
#[prost(message, optional, tag = "6")]
pub captured: ::core::option::Option<response::Captured>,
#[prost(message, optional, tag = "8")]
pub sourced_schema: ::core::option::Option<response::SourcedSchema>,
#[prost(message, optional, tag = "7")]
pub checkpoint: ::core::option::Option<response::Checkpoint>,
/// Reserved for internal use.
Expand All @@ -185,7 +189,6 @@ pub struct Response {
}
/// Nested message and enum types in `Response`.
pub mod response {
/// Spec responds to Request.Spec.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Spec {
Expand Down Expand Up @@ -313,6 +316,36 @@ pub mod response {
#[prost(string, tag = "2")]
pub doc_json: ::prost::alloc::string::String,
}
/// SourcedSchema notifies the runtime of a source-defined schema of the
/// indicated binding. It's not required that the connector know that the
/// schema has actually changed since the last SourcedSchema.
/// It's encouraged for connectors to emit SourcedSchema liberally,
/// such as on startup, or periodically, or upon encountering a previously
/// unseen column.
///
/// SourcedSchema may be a partial schema: it may schematize some
/// specific field(s) and not others that are in active use.
///
/// SourcedSchema should be maximally restrictive. It should disallow
/// `types` and `additionalProperties` which are not explicitly being
/// schematized. The platform will union a SourcedSchema with all other
/// SourcedSchema messages of the binding, as well as additional inference
/// updates required to fit Captured documents.
///
/// SourcedSchema is transactional. It may be interleaved with zero or more
/// Captured documents, and multiple SourcedSchema messages may be emitted
/// for a single binding, but an emitted SourcedSchema has no effect until
/// it's followed by a Checkpoint.
// NOTE(review): the clippy allow below is presumably emitted by the prost
// code generator for every message; `Eq` is not derived here.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SourcedSchema {
/// Index of the Open binding for which the schema applies.
/// Encoded as protobuf field 1 (uint32).
#[prost(uint32, tag = "1")]
pub binding: u32,
/// JSON schema of documents produced by this binding, held as a string.
/// Encoded as protobuf field 2 (string). Note that the serde
/// implementation of this message writes the field under the JSON key
/// `documentSchema`, and reads it from `documentSchema` or `schema_json`.
#[prost(string, tag = "2")]
pub schema_json: ::prost::alloc::string::String,
}
/// Checkpoint all preceding documents of this invocation since the last checkpoint.
/// The Flow runtime may begin to commit documents in a transaction.
/// Note that the runtime may include more than one checkpoint in a single transaction.
Expand Down
129 changes: 129 additions & 0 deletions crates/proto-flow/src/capture.serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1136,6 +1136,9 @@ impl serde::Serialize for Response {
if self.captured.is_some() {
len += 1;
}
if self.sourced_schema.is_some() {
len += 1;
}
if self.checkpoint.is_some() {
len += 1;
}
Expand All @@ -1161,6 +1164,9 @@ impl serde::Serialize for Response {
if let Some(v) = self.captured.as_ref() {
struct_ser.serialize_field("captured", v)?;
}
if let Some(v) = self.sourced_schema.as_ref() {
struct_ser.serialize_field("sourcedSchema", v)?;
}
if let Some(v) = self.checkpoint.as_ref() {
struct_ser.serialize_field("checkpoint", v)?;
}
Expand All @@ -1185,6 +1191,8 @@ impl<'de> serde::Deserialize<'de> for Response {
"applied",
"opened",
"captured",
"sourced_schema",
"sourcedSchema",
"checkpoint",
"internal",
"$internal",
Expand All @@ -1198,6 +1206,7 @@ impl<'de> serde::Deserialize<'de> for Response {
Applied,
Opened,
Captured,
SourcedSchema,
Checkpoint,
Internal,
}
Expand Down Expand Up @@ -1227,6 +1236,7 @@ impl<'de> serde::Deserialize<'de> for Response {
"applied" => Ok(GeneratedField::Applied),
"opened" => Ok(GeneratedField::Opened),
"captured" => Ok(GeneratedField::Captured),
"sourcedSchema" | "sourced_schema" => Ok(GeneratedField::SourcedSchema),
"checkpoint" => Ok(GeneratedField::Checkpoint),
"$internal" | "internal" => Ok(GeneratedField::Internal),
_ => Err(serde::de::Error::unknown_field(value, FIELDS)),
Expand Down Expand Up @@ -1254,6 +1264,7 @@ impl<'de> serde::Deserialize<'de> for Response {
let mut applied__ = None;
let mut opened__ = None;
let mut captured__ = None;
let mut sourced_schema__ = None;
let mut checkpoint__ = None;
let mut internal__ = None;
while let Some(k) = map_.next_key()? {
Expand Down Expand Up @@ -1294,6 +1305,12 @@ impl<'de> serde::Deserialize<'de> for Response {
}
captured__ = map_.next_value()?;
}
GeneratedField::SourcedSchema => {
if sourced_schema__.is_some() {
return Err(serde::de::Error::duplicate_field("sourcedSchema"));
}
sourced_schema__ = map_.next_value()?;
}
GeneratedField::Checkpoint => {
if checkpoint__.is_some() {
return Err(serde::de::Error::duplicate_field("checkpoint"));
Expand All @@ -1317,6 +1334,7 @@ impl<'de> serde::Deserialize<'de> for Response {
applied: applied__,
opened: opened__,
captured: captured__,
sourced_schema: sourced_schema__,
checkpoint: checkpoint__,
internal: internal__.unwrap_or_default(),
})
Expand Down Expand Up @@ -1982,6 +2000,117 @@ impl<'de> serde::Deserialize<'de> for response::Opened {
deserializer.deserialize_struct("capture.Response.Opened", FIELDS, GeneratedVisitor)
}
}
impl serde::Serialize for response::SourcedSchema {
    /// Serializes this message as the struct `capture.Response.SourcedSchema`.
    ///
    /// Fields holding their zero value are skipped: `binding` is written only
    /// when non-zero, and the schema only when `schema_json` is non-empty.
    /// The schema is written under the key `documentSchema` after being passed
    /// through `crate::as_raw_json`, whose error (if any) is propagated.
    // NOTE(review): nothing deprecated is referenced in this body; presumably
    // the code generator emits this attribute uniformly.
    #[allow(deprecated)]
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        use serde::ser::SerializeStruct;

        // Decide up front which fields will be emitted, so that the declared
        // field count and the fields actually written cannot drift apart.
        let has_binding = self.binding != 0;
        let has_schema = !self.schema_json.is_empty();
        let field_count = usize::from(has_binding) + usize::from(has_schema);

        let mut out =
            serializer.serialize_struct("capture.Response.SourcedSchema", field_count)?;
        if has_binding {
            out.serialize_field("binding", &self.binding)?;
        }
        if has_schema {
            out.serialize_field("documentSchema", crate::as_raw_json(&self.schema_json)?)?;
        }
        out.end()
    }
}
impl<'de> serde::Deserialize<'de> for response::SourcedSchema {
#[allow(deprecated)]
fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
const FIELDS: &[&str] = &[
"binding",
"schema_json",
"documentSchema",
];

#[allow(clippy::enum_variant_names)]
enum GeneratedField {
Binding,
SchemaJson,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
where
D: serde::Deserializer<'de>,
{
struct GeneratedVisitor;

impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
type Value = GeneratedField;

fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(formatter, "expected one of: {:?}", &FIELDS)
}

#[allow(unused_variables)]
fn visit_str<E>(self, value: &str) -> std::result::Result<GeneratedField, E>
where
E: serde::de::Error,
{
match value {
"binding" => Ok(GeneratedField::Binding),
"documentSchema" | "schema_json" => Ok(GeneratedField::SchemaJson),
_ => Err(serde::de::Error::unknown_field(value, FIELDS)),
}
}
}
deserializer.deserialize_identifier(GeneratedVisitor)
}
}
struct GeneratedVisitor;
impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
type Value = response::SourcedSchema;

fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
formatter.write_str("struct capture.Response.SourcedSchema")
}

fn visit_map<V>(self, mut map_: V) -> std::result::Result<response::SourcedSchema, V::Error>
where
V: serde::de::MapAccess<'de>,
{
let mut binding__ = None;
let mut schema_json__ : Option<Box<serde_json::value::RawValue>> = None;
while let Some(k) = map_.next_key()? {
match k {
GeneratedField::Binding => {
if binding__.is_some() {
return Err(serde::de::Error::duplicate_field("binding"));
}
binding__ =
Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0)
;
}
GeneratedField::SchemaJson => {
if schema_json__.is_some() {
return Err(serde::de::Error::duplicate_field("documentSchema"));
}
schema_json__ = Some(map_.next_value()?);
}
}
}
Ok(response::SourcedSchema {
binding: binding__.unwrap_or_default(),
schema_json: schema_json__.map(|r| Box::<str>::from(r).into()).unwrap_or_default(),
})
}
}
deserializer.deserialize_struct("capture.Response.SourcedSchema", FIELDS, GeneratedVisitor)
}
}
impl serde::Serialize for response::Spec {
#[allow(deprecated)]
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
Expand Down
6 changes: 0 additions & 6 deletions crates/proto-flow/src/flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,6 @@ pub struct Projection {
/// Inference of this projection.
#[prost(message, optional, tag = "6")]
pub inference: ::core::option::Option<Inference>,
/// Inference of this projection from the collection's write schema.
/// This is set only when:
/// 1) The collection has separate read and write schemas, and
/// 2) The write schema defines constraints over this projection.
#[prost(message, optional, tag = "7")]
pub write_inference: ::core::option::Option<Inference>,
}
/// Inference details type information which is statically known
/// about a given document location.
Expand Down
18 changes: 0 additions & 18 deletions crates/proto-flow/src/flow.serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4738,9 +4738,6 @@ impl serde::Serialize for Projection {
if self.inference.is_some() {
len += 1;
}
if self.write_inference.is_some() {
len += 1;
}
let mut struct_ser = serializer.serialize_struct("flow.Projection", len)?;
if !self.ptr.is_empty() {
struct_ser.serialize_field("ptr", &self.ptr)?;
Expand All @@ -4760,9 +4757,6 @@ impl serde::Serialize for Projection {
if let Some(v) = self.inference.as_ref() {
struct_ser.serialize_field("inference", v)?;
}
if let Some(v) = self.write_inference.as_ref() {
struct_ser.serialize_field("writeInference", v)?;
}
struct_ser.end()
}
}
Expand All @@ -4781,8 +4775,6 @@ impl<'de> serde::Deserialize<'de> for Projection {
"is_primary_key",
"isPrimaryKey",
"inference",
"write_inference",
"writeInference",
];

#[allow(clippy::enum_variant_names)]
Expand All @@ -4793,7 +4785,6 @@ impl<'de> serde::Deserialize<'de> for Projection {
IsPartitionKey,
IsPrimaryKey,
Inference,
WriteInference,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
Expand Down Expand Up @@ -4821,7 +4812,6 @@ impl<'de> serde::Deserialize<'de> for Projection {
"isPartitionKey" | "is_partition_key" => Ok(GeneratedField::IsPartitionKey),
"isPrimaryKey" | "is_primary_key" => Ok(GeneratedField::IsPrimaryKey),
"inference" => Ok(GeneratedField::Inference),
"writeInference" | "write_inference" => Ok(GeneratedField::WriteInference),
_ => Err(serde::de::Error::unknown_field(value, FIELDS)),
}
}
Expand All @@ -4847,7 +4837,6 @@ impl<'de> serde::Deserialize<'de> for Projection {
let mut is_partition_key__ = None;
let mut is_primary_key__ = None;
let mut inference__ = None;
let mut write_inference__ = None;
while let Some(k) = map_.next_key()? {
match k {
GeneratedField::Ptr => {
Expand Down Expand Up @@ -4886,12 +4875,6 @@ impl<'de> serde::Deserialize<'de> for Projection {
}
inference__ = map_.next_value()?;
}
GeneratedField::WriteInference => {
if write_inference__.is_some() {
return Err(serde::de::Error::duplicate_field("writeInference"));
}
write_inference__ = map_.next_value()?;
}
}
}
Ok(Projection {
Expand All @@ -4901,7 +4884,6 @@ impl<'de> serde::Deserialize<'de> for Projection {
is_partition_key: is_partition_key__.unwrap_or_default(),
is_primary_key: is_primary_key__.unwrap_or_default(),
inference: inference__,
write_inference: write_inference__,
})
}
}
Expand Down
5 changes: 4 additions & 1 deletion crates/proto-flow/tests/regression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ fn ex_projections() -> Vec<flow::Projection> {
item_types: vec!["null".to_string(), "integer".to_string()],
}),
}),
write_inference: None,
}]
}

Expand Down Expand Up @@ -457,6 +456,10 @@ fn ex_capture_response() -> capture::Response {
binding: 2,
doc_json: json!({"captured":"doc"}).to_string(),
}),
sourced_schema: Some(capture::response::SourcedSchema {
binding: 3,
schema_json: json!({"type": "string", "format": "date-time"}).to_string(),
}),
checkpoint: Some(capture::response::Checkpoint {
state: Some(ex_connector_state()),
}),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ expression: json_test(msg)
"binding": 2,
"doc": {"captured":"doc"}
},
"sourcedSchema": {
"binding": 3,
"documentSchema": {"format":"date-time","type":"string"}
},
"checkpoint": {
"state": {
"updated": {"state":"update"},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,8 @@ expression: proto_test(msg)
|01321608 0212127b 22636170 74757265| .2.....{"capture 000001b0
|64223a22 646f6322 7d3a180a 160a127b| d":"doc"}:.....{ 000001c0
|22737461 7465223a 22757064 61746522| "state":"update" 000001d0
|7d1001a2 06061202 48691801| }.......Hi.. 000001e0
000001ec
|7d100142 2a080312 267b2266 6f726d61| }..B*...&{"forma 000001e0
|74223a22 64617465 2d74696d 65222c22| t":"date-time"," 000001f0
|74797065 223a2273 7472696e 67227da2| type":"string"}. 00000200
|06061202 48691801| ....Hi.. 00000210
00000218
Loading
Loading