Skip to content

Commit

Permalink
dekaf: Fix missing support for _meta/is_deleted for old-style conne…
Browse files Browse the repository at this point in the history
…ctions

This got lost in the shuffle of introducing field selection, so this commit temporarily adds it back
until everyone is using new-style tasks and we can get rid of this logic for good.

---

Also fix CDC deletions mode for new-style connections

The existing behavior would override `_meta` with an object containing only `{"is_deleted": 0}`.
Instead, let's not try to retain backwards compatibility here and just add a new `_is_deleted` field.
  • Loading branch information
jshearer committed Feb 24, 2025
1 parent c1e8f1c commit e18dd24
Show file tree
Hide file tree
Showing 10 changed files with 507 additions and 74 deletions.
5 changes: 3 additions & 2 deletions crates/dekaf/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,11 +188,12 @@ pub fn constraint_for_projection(
r#type: constraint::Type::FieldOptional.into(),
reason: "The root document may be materialized".to_string(),
}
} else if projection.field == "_meta" && matches!(endpoint_config.deletions, DeletionMode::CDC)
} else if projection.field == "_is_deleted"
&& matches!(endpoint_config.deletions, DeletionMode::CDC)
{
materialize::response::validated::Constraint {
r#type: constraint::Type::FieldForbidden.into(),
reason: "Cannot materialize to '_meta' when using CDC deletions mode".to_string(),
reason: "Cannot materialize input data to '_is_deleted' when using CDC deletions mode as it will be generated by Dekaf".to_string(),
}
} else if projection.field == "flow_published_at"
|| !projection.ptr.strip_prefix("/").unwrap().contains("/")
Expand Down
27 changes: 1 addition & 26 deletions crates/dekaf/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,31 +85,6 @@ impl Read {
30,
);

let extractors = match auth {
SessionAuthentication::User(_) => {
if collection.extractors.len() != 1 {
anyhow::bail!("Expecting only one extractor!");
}
vec![(
collection.value_schema.clone(),
collection
.extractors
.first()
.expect("Just checked above")
.to_owned(),
)]
}
SessionAuthentication::Task(_) => {
let avro::Schema::Record(root_schema) = &collection.value_schema else {
anyhow::bail!("Invalid schema");
};
let field_schemas = root_schema.fields.iter().cloned().map(|f| f.schema);
field_schemas
.zip(collection.extractors.clone().into_iter())
.collect_vec()
}
};

Ok(Self {
offset,
last_write_head: offset,
Expand All @@ -122,7 +97,7 @@ impl Read {
stream,
uuid_ptr: collection.uuid_ptr.clone(),
value_schema_id,
extractors,
extractors: collection.extractors.clone(),

journal_name: partition.spec.name.clone(),
collection_name: collection.name.to_owned(),
Expand Down
14 changes: 8 additions & 6 deletions crates/dekaf/src/topology.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
use crate::{
connector, dekaf_shard_template_id, utils, App, SessionAuthentication, TaskAuth, UserAuth,
connector::{self, DeletionMode},
dekaf_shard_template_id,
utils::{self, CustomizableExtractor},
App, SessionAuthentication, TaskAuth, UserAuth,
};
use anyhow::{anyhow, bail, Context};
use avro::shape_to_avro;
use futures::{StreamExt, TryStreamExt};
use gazette::{
broker::{self, journal_spec},
journal, uuid,
};
use itertools::Itertools;
use models::RawValue;
use proto_flow::flow;

Expand Down Expand Up @@ -88,7 +93,7 @@ pub struct Collection {
pub spec: flow::CollectionSpec,
pub uuid_ptr: doc::Pointer,
pub value_schema: avro::Schema,
pub extractors: Vec<utils::CustomizableExtractor>,
pub extractors: Vec<(avro::Schema, utils::CustomizableExtractor)>,
}

/// Partition is a collection journal which is mapped into a stable Kafka partition order.
Expand Down Expand Up @@ -204,10 +209,7 @@ impl Collection {
auth.deletions(),
)?
} else {
(
avro::shape_to_avro(collection_schema_shape.clone()),
vec![doc::Extractor::new(doc::Pointer::empty(), &doc::SerPolicy::noop()).into()],
)
utils::build_LEGACY_field_extractors(collection_schema_shape.clone(), auth.deletions())?
};

let key_schema = avro::key_to_avro(&key_ptr, collection_schema_shape);
Expand Down
131 changes: 99 additions & 32 deletions crates/dekaf/src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
use crate::connector::DeletionMode;
use avro::{located_shape_to_avro, shape_to_avro};
use doc::shape::location;
use itertools::Itertools;
use lazy_static::lazy_static;
use proto_flow::flow;
use std::{borrow::Cow, iter};

// Lazily-initialized JSON pointers shared by the customizable extractors.
lazy_static! {
// Points at `/_meta/op`; extraction queries it and treats the string value "d" as a deletion.
static ref META_OP_PTR: doc::Pointer = doc::Pointer::from_str("/_meta/op");
// Points at `/_meta/is_deleted`; used to create that location in a copy of the
// root document so the computed deletion flag can be written into it.
static ref META_IS_DELETED_PTR: doc::Pointer = doc::Pointer::from_str("/_meta/is_deleted");
}

/// The ways a value can be pulled out of a document for emission.
#[derive(Debug, Clone)]
pub enum CustomizableExtractor {
/// Wraps a plain `doc::Extractor`.
Extractor(doc::Extractor),
/// Yields the entire root document with `/_meta/is_deleted` set to `1` when
/// `/_meta/op` is the string "d" and `0` otherwise. If that location cannot be
/// created, the original document is yielded unchanged.
RootExtractorWithIsDeleted,
/// Yields only the deletion flag itself, as a bare JSON integer.
IsDeleted,
}

Expand All @@ -33,7 +36,26 @@ impl CustomizableExtractor {
None => 0,
};

Err(Cow::Owned(serde_json::json!({"is_deleted": deletion})))
Err(Cow::Owned(serde_json::json!(deletion)))
}
CustomizableExtractor::RootExtractorWithIsDeleted => {
let deletion = match META_OP_PTR.query(doc) {
Some(n) => match n.as_node() {
doc::Node::String(s) if s == "d" => 1,
_ => 0,
},
None => 0,
};

let mut full_doc = doc.to_debug_json_value();

if let Some(meta_is_deleted) = META_IS_DELETED_PTR.create_value(&mut full_doc) {
*meta_is_deleted = serde_json::json!(deletion);

Err(Cow::Owned(full_doc))
} else {
Ok(doc)
}
}
}
}
Expand All @@ -50,10 +72,10 @@ pub fn build_field_extractors(
fields: flow::FieldSelection,
projections: Vec<flow::Projection>,
deletions: DeletionMode,
) -> anyhow::Result<(avro::Schema, Vec<CustomizableExtractor>)> {
) -> anyhow::Result<(avro::Schema, Vec<(avro::Schema, CustomizableExtractor)>)> {
let policy = doc::SerPolicy::noop();

let (mut fields, mut extractors) = fields
let mut extractor_schemas = fields
.keys
.into_iter()
.chain(fields.values.into_iter())
Expand Down Expand Up @@ -98,57 +120,102 @@ pub fn build_field_extractors(
);
}
})
.collect::<Result<Vec<_>, _>>()?
.into_iter()
.unzip::<_, _, Vec<_>, Vec<_>>();
.collect::<Result<Vec<_>, _>>()?;

if matches!(deletions, DeletionMode::CDC) {
let mut shape = doc::Shape::nothing();
shape.type_ = json::schema::types::INTEGER;

// In order to maintain backwards compatibility, when CDC deletions mode is
// enabled we should emit {"_meta": {"is_deleted": 1}} instead of a root-level field
let avro_field = avro::RecordField {
schema: shape_to_avro(shape),
name: "is_deleted".to_string(),
name: "_is_deleted".to_string(),
doc: None,
aliases: None,
default: None,
order: apache_avro::schema::RecordFieldOrder::Ascending,
position: 0,
position: extractor_schemas.len(),
custom_attributes: Default::default(),
};

let meta_field = avro::RecordField {
name: "_meta".to_string(),
schema: avro::Schema::Record(avro::RecordSchema {
name: "root._meta.is_deleted".into(),
aliases: None,
doc: None,
fields: vec![avro_field],
lookup: Default::default(),
attributes: Default::default(),
}),
doc: None,
aliases: None,
default: None,
order: apache_avro::schema::RecordFieldOrder::Ascending,
position: fields.len(),
custom_attributes: Default::default(),
};

fields.push(meta_field);
extractors.push(CustomizableExtractor::IsDeleted);
extractor_schemas.push((avro_field, CustomizableExtractor::IsDeleted));
}

let schema = avro::Schema::Record(avro::RecordSchema {
name: "root".into(),
aliases: None,
doc: None,
fields: fields,
fields: extractor_schemas
.iter()
.map(|(field, _)| field.clone())
.collect_vec(),
lookup: Default::default(),
attributes: Default::default(),
});

Ok((schema, extractors))
Ok((
schema,
extractor_schemas
.into_iter()
.map(|(field, extractor)| (field.schema, extractor))
.collect_vec(),
))
}

pub fn build_LEGACY_field_extractors(
mut schema: doc::Shape,
deletions: DeletionMode,
) -> anyhow::Result<(avro::Schema, Vec<(avro::Schema, CustomizableExtractor)>)> {
if matches!(deletions, DeletionMode::CDC) {
if let Some(meta) = schema
.object
.properties
.iter_mut()
.find(|prop| prop.name.to_string() == "_meta".to_string())
{
if let Err(idx) = meta
.shape
.object
.properties
.binary_search_by(|prop| prop.name.to_string().cmp(&"is_deleted".to_string()))
{
meta.shape.object.properties.insert(
idx,
doc::shape::ObjProperty {
name: "is_deleted".into(),
is_required: true,
shape: doc::Shape {
type_: json::schema::types::INTEGER,
..doc::Shape::nothing()
},
},
);
} else {
tracing::warn!(
"This collection's schema already has a /_meta/is_deleted location!"
);
}
} else {
return Err(anyhow::anyhow!("Schema missing /_meta"));
}

let schema = avro::shape_to_avro(schema.clone());

Ok((
schema.clone(),
vec![(schema, CustomizableExtractor::RootExtractorWithIsDeleted)],
))
} else {
let schema = avro::shape_to_avro(schema.clone());

Ok((
schema.clone(),
vec![(
schema,
CustomizableExtractor::Extractor(doc::Extractor::new(
doc::Pointer::empty(),
&doc::SerPolicy::noop(),
)),
)],
))
}
}
Loading

0 comments on commit e18dd24

Please sign in to comment.