Dekaf: Fixes for release #1965

Merged 4 commits on Feb 25, 2025
5 changes: 3 additions & 2 deletions crates/dekaf/src/connector.rs
@@ -188,11 +188,12 @@ fn constraint_for_projection(
r#type: constraint::Type::FieldOptional.into(),
reason: "The root document may be materialized".to_string(),
}
} else if projection.field == "_meta" && matches!(endpoint_config.deletions, DeletionMode::CDC)
} else if projection.field == "_is_deleted"
&& matches!(endpoint_config.deletions, DeletionMode::CDC)
{
materialize::response::validated::Constraint {
r#type: constraint::Type::FieldForbidden.into(),
reason: "Cannot materialize to '_meta' when using CDC deletions mode".to_string(),
reason: "Cannot materialize input data to '_is_deleted' when using CDC deletions mode as it will be generated by Dekaf".to_string(),
}
} else if projection.field == "flow_published_at"
|| !projection.ptr.strip_prefix("/").unwrap().contains("/")
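For readers outside the codebase, here is a minimal, self-contained sketch of the rule this hunk encodes. The `DeletionMode`, `Constraint`, and `constraint_for_field` names below are simplified stand-ins for the real connector and materialize-protocol types, not Dekaf's actual API:

```rust
// Simplified, hypothetical stand-ins for the real connector/materialize types.
#[derive(Clone, Copy)]
enum DeletionMode {
    Default,
    CDC,
}

enum Constraint {
    FieldOptional(&'static str),
    FieldForbidden(&'static str),
}

// Sketch of the branch added in this PR: when CDC deletions are enabled,
// user data must not be materialized into `_is_deleted`, because Dekaf
// generates that field itself.
fn constraint_for_field(field: &str, deletions: DeletionMode) -> Constraint {
    if field == "_is_deleted" && matches!(deletions, DeletionMode::CDC) {
        Constraint::FieldForbidden(
            "Cannot materialize input data to '_is_deleted' when using CDC deletions mode",
        )
    } else {
        Constraint::FieldOptional("This field may be materialized")
    }
}

fn main() {
    assert!(matches!(
        constraint_for_field("_is_deleted", DeletionMode::CDC),
        Constraint::FieldForbidden(_)
    ));
    assert!(matches!(
        constraint_for_field("_is_deleted", DeletionMode::Default),
        Constraint::FieldOptional(_)
    ));
}
```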
30 changes: 15 additions & 15 deletions crates/dekaf/src/log_appender.rs
@@ -54,10 +54,10 @@ impl StatsAggregator {
pub fn take(&mut self) -> Option<BTreeMap<String, ops::stats::Binding>> {
if self.0.iter().any(|(_, v)| {
v.left
.is_some_and(|s| s.bytes_total > 0 || s.docs_total > 0)
.is_some_and(|s| s.bytes_total > 0 && s.docs_total > 0)
|| v.right
.is_some_and(|s| s.bytes_total > 0 || s.docs_total > 0)
|| v.out.is_some_and(|s| s.bytes_total > 0 || s.docs_total > 0)
.is_some_and(|s| s.bytes_total > 0 && s.docs_total > 0)
|| v.out.is_some_and(|s| s.bytes_total > 0 && s.docs_total > 0)
}) {
Some(std::mem::take(&mut self.0))
} else {
@@ -492,23 +492,22 @@ impl<W: TaskWriter + 'static> TaskForwarder<W> {
}

pub fn send_stats(&self, collection_name: String, stats: ops::stats::Binding) {
if stats
let is_any_stats_invalid = stats
.left
.is_some_and(|s| s.bytes_total == 0 || s.docs_total == 0)
@jshearer (Contributor, Author) commented on Feb 24, 2025:

I realized that this was too restrictive and was complaining too much -- it's fine to call send_stats() with stats where both bytes_total and docs_total are 0, because of the filter above in StatsAggregator::take.
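To illustrate the point above: a self-contained sketch of the aggregator's gating behavior, using a hypothetical `Counters` stand-in for `ops::stats::Binding` and collapsing the left/right/out sides into a single counter pair. All-zero bindings may be accumulated; the aggregator simply holds them back until something non-zero arrives.

```rust
use std::collections::BTreeMap;

// Hypothetical, simplified stand-in for ops::stats::Binding's counters.
#[derive(Default, Clone, Copy)]
struct Counters {
    bytes_total: u64,
    docs_total: u64,
}

#[derive(Default)]
struct StatsAggregator(BTreeMap<String, Counters>);

impl StatsAggregator {
    // Only emit a stats document once at least one binding has real data,
    // mirroring the take() filter in the hunk above.
    fn take(&mut self) -> Option<BTreeMap<String, Counters>> {
        if self.0.values().any(|c| c.bytes_total > 0 && c.docs_total > 0) {
            Some(std::mem::take(&mut self.0))
        } else {
            None
        }
    }
}

fn main() {
    let mut agg = StatsAggregator::default();
    agg.0.insert("binding".to_string(), Counters::default());
    assert!(agg.take().is_none()); // all-zero stats are held back, not treated as an error

    agg.0.insert(
        "binding".to_string(),
        Counters { bytes_total: 10, docs_total: 1 },
    );
    assert!(agg.take().is_some());
}
```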

@jgraettinger (Member) commented on Feb 25, 2025:

Why is this filtering required in the first place? How does it come to be that bytes_total != 0 but docs_total == 0, or vice versa? I don't see a satisfying explanation here of how we observed such documents in production in the first place, or of how this change ensures they can never happen again. Okay, I now see that this PR addressed the root cause.

Can you instead make this an assert()-ion rather than a tracing::error?

.is_some_and(|s| (s.bytes_total == 0) != (s.docs_total == 0))
|| stats
.right
.is_some_and(|s| s.bytes_total == 0 || s.docs_total == 0)
.is_some_and(|s| (s.bytes_total == 0) != (s.docs_total == 0))
|| stats
.out
.is_some_and(|s| s.bytes_total == 0 || s.docs_total == 0)
{
tracing::error!(
?stats,
"Invalid stats document emitted! Cannot emit 0 for `bytes_total` or `docs_total`!"
);
} else {
self.send_message(TaskWriterMessage::Stats((collection_name, stats)))
}
.is_some_and(|s| (s.bytes_total == 0) != (s.docs_total == 0));

assert!(!is_any_stats_invalid,
"Invalid stats document emitted! Cannot emit 0 for just one of `bytes_total` or `docs_total`! {:?}",
stats
);

self.send_message(TaskWriterMessage::Stats((collection_name, stats)));
}
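The new validity check treats a binding as invalid only when exactly one of its two counters is zero; both-zero (nothing read yet) and both-nonzero are fine. A quick standalone sketch of that predicate:

```rust
// True only when exactly one of the counters is zero -- the inconsistent case
// that the assert! above is meant to catch. Both-zero and both-nonzero are fine.
fn is_inconsistent(bytes_total: u64, docs_total: u64) -> bool {
    (bytes_total == 0) != (docs_total == 0)
}

fn main() {
    assert!(!is_inconsistent(0, 0));  // nothing read: consistent
    assert!(!is_inconsistent(10, 2)); // data read: consistent
    assert!(is_inconsistent(10, 0));  // bytes without docs: invalid
    assert!(is_inconsistent(0, 2));   // docs without bytes: invalid
}
```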

fn send_message(&self, msg: TaskWriterMessage) {
@@ -872,6 +871,7 @@ mod tests {
}

#[tokio::test]
#[should_panic]
async fn test_partial_stats() {
setup(|logs, stats| async move {
{
53 changes: 31 additions & 22 deletions crates/dekaf/src/logging.rs
@@ -2,20 +2,22 @@ use crate::log_appender::{self, GazetteWriter, TaskForwarder};
use futures::Future;
use lazy_static::lazy_static;
use rand::Rng;
use tracing::{level_filters::LevelFilter, Instrument};
use tracing::Instrument;
use tracing::{level_filters::LevelFilter, Level};
use tracing_subscriber::filter::Targets;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, Layer};

// These are accessible anywhere inside the call stack of a future wrapped with [`forward_logs()`].
// The relationship between LogForwarder and log journal is one-to-one. That means that all logs
// from the point at which you call `forward_logs()` downwards will get forwarded to the same journal.
tokio::task_local! {
static TASK_FORWARDER: TaskForwarder<GazetteWriter>;
static LOG_LEVEL: std::cell::Cell<ops::LogLevel>;
static LOG_LEVEL: std::cell::Cell<&'static tracing_subscriber::filter::Targets>;
}

pub fn install() {
// Build a tracing_subscriber::Filter which uses our dynamic log level.
let log_filter = tracing_subscriber::filter::DynFilterFn::new(move |metadata, _cx| {
let log_filter = tracing_subscriber::filter::DynFilterFn::new(move |metadata, ctx| {
if metadata
.fields()
.iter()
@@ -24,20 +26,9 @@ pub fn install() {
return false;
}

let cur_level = match metadata.level().as_str() {
"TRACE" => ops::LogLevel::Trace as i32,
"DEBUG" => ops::LogLevel::Debug as i32,
"INFO" => ops::LogLevel::Info as i32,
"WARN" => ops::LogLevel::Warn as i32,
"ERROR" => ops::LogLevel::Error as i32,
_ => ops::LogLevel::UndefinedLevel as i32,
};

cur_level
<= LOG_LEVEL
.try_with(|log_level| log_level.get())
.unwrap_or(ops::LogLevel::Info)
.into()
LOG_LEVEL
.try_with(|filter| filter.get().enabled(&metadata, ctx.to_owned()))
.unwrap_or_else(|_| metadata.level() <= &tracing::metadata::Level::INFO)
});

// We want to be able to control Dekaf's own logging output via the RUST_LOG environment variable like usual.
@@ -51,6 +42,7 @@

let registry = tracing_subscriber::registry()
.with(tracing_record_hierarchical::HierarchicalRecord::default())
.with(fmt_layer)
.with(
ops::tracing::Layer::new(
|log| {
Expand All @@ -59,8 +51,7 @@ pub fn install() {
std::time::SystemTime::now,
)
.with_filter(log_filter),
)
.with(fmt_layer);
);

registry.init();
}
@@ -73,6 +64,22 @@ lazy_static! {
producer_id[0] |= 0x01;
gazette::uuid::Producer::from_bytes(producer_id)
};
static ref ERROR_FILTER: Targets = "error".parse().unwrap();
static ref WARN_FILTER: Targets = "warn".parse().unwrap();
static ref INFO_FILTER: Targets = "warn,dekaf=info".parse().unwrap();
static ref DEBUG_FILTER: Targets = "debug,simple_crypt=warn,aws_configure=warn,h2=warn".parse().unwrap();
static ref TRACE_FILTER: Targets = "trace,simple_crypt=warn,aws_configure=warn,h2=warn".parse().unwrap();

}

fn build_log_filter(level: ops::LogLevel) -> &'static tracing_subscriber::filter::Targets {
match level {
ops::LogLevel::Error => &ERROR_FILTER,
ops::LogLevel::Warn => &WARN_FILTER,
ops::LogLevel::Info | ops::LogLevel::UndefinedLevel => &INFO_FILTER,
ops::LogLevel::Debug => &DEBUG_FILTER,
ops::LogLevel::Trace => &TRACE_FILTER,
}
}
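A small sketch of how per-level `Targets` filters of this shape behave. The directive strings below mirror the ones defined above, and `would_enable` is tracing-subscriber's query API for a `Targets` filter (shown here standalone, outside Dekaf's task-local plumbing):

```rust
use tracing::Level;
use tracing_subscriber::filter::Targets;

fn main() {
    // Mirrors INFO_FILTER above: everything at WARN and up, plus dekaf at INFO and up.
    let info_filter: Targets = "warn,dekaf=info".parse().unwrap();

    assert!(info_filter.would_enable("dekaf::logging", &Level::INFO));
    assert!(!info_filter.would_enable("h2::client", &Level::INFO));
    assert!(info_filter.would_enable("h2::client", &Level::WARN));

    // Mirrors DEBUG_FILTER above: chatty dependencies stay capped at WARN.
    let debug_filter: Targets = "debug,h2=warn".parse().unwrap();
    assert!(debug_filter.would_enable("dekaf::read", &Level::DEBUG));
    assert!(!debug_filter.would_enable("h2::client", &Level::DEBUG));
}
```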

/// Capture all log messages emitted by the passed future and all of its descendants, and writes them out
@@ -90,7 +97,7 @@ where
let forwarder = TaskForwarder::new(PRODUCER.to_owned(), writer);

LOG_LEVEL.scope(
ops::LogLevel::Info.into(),
std::cell::Cell::new(build_log_filter(ops::LogLevel::Info)),
TASK_FORWARDER.scope(
forwarder,
fut.instrument(tracing::info_span!(
@@ -111,7 +118,7 @@ pub fn propagate_task_forwarder<F, O>(fut: F) -> impl Future<Output = O>
where
F: Future<Output = O>,
{
let current_level = LOG_LEVEL.get();
let current_level = LOG_LEVEL.with(|l| l.clone());
let current_forwarder = TASK_FORWARDER.get();

LOG_LEVEL.scope(
@@ -125,5 +132,7 @@
}

pub fn set_log_level(level: ops::LogLevel) {
LOG_LEVEL.with(|cell| cell.set(level))
LOG_LEVEL.with(|current_level| {
current_level.set(build_log_filter(level));
})
}
27 changes: 1 addition & 26 deletions crates/dekaf/src/read.rs
@@ -85,31 +85,6 @@ impl Read {
30,
);

let extractors = match auth {
SessionAuthentication::User(_) => {
if collection.extractors.len() != 1 {
anyhow::bail!("Expecting only one extractor!");
}
vec![(
collection.value_schema.clone(),
collection
.extractors
.first()
.expect("Just checked above")
.to_owned(),
)]
}
SessionAuthentication::Task(_) => {
let avro::Schema::Record(root_schema) = &collection.value_schema else {
anyhow::bail!("Invalid schema");
};
let field_schemas = root_schema.fields.iter().cloned().map(|f| f.schema);
field_schemas
.zip(collection.extractors.clone().into_iter())
.collect_vec()
}
};

Ok(Self {
offset,
last_write_head: offset,
@@ -122,7 +97,7 @@
stream,
uuid_ptr: collection.uuid_ptr.clone(),
value_schema_id,
extractors,
extractors: collection.extractors.clone(),

journal_name: partition.spec.name.clone(),
collection_name: collection.name.to_owned(),

This file was deleted.

This file was deleted.

14 changes: 8 additions & 6 deletions crates/dekaf/src/topology.rs
@@ -1,12 +1,17 @@
use crate::{
connector, dekaf_shard_template_id, utils, App, SessionAuthentication, TaskAuth, UserAuth,
connector::{self, DeletionMode},
dekaf_shard_template_id,
utils::{self, CustomizableExtractor},
App, SessionAuthentication, TaskAuth, UserAuth,
};
use anyhow::{anyhow, bail, Context};
use avro::shape_to_avro;
use futures::{StreamExt, TryStreamExt};
use gazette::{
broker::{self, journal_spec},
journal, uuid,
};
use itertools::Itertools;
use models::RawValue;
use proto_flow::flow;

Expand Down Expand Up @@ -88,7 +93,7 @@ pub struct Collection {
pub spec: flow::CollectionSpec,
pub uuid_ptr: doc::Pointer,
pub value_schema: avro::Schema,
pub extractors: Vec<utils::CustomizableExtractor>,
pub extractors: Vec<(avro::Schema, utils::CustomizableExtractor)>,
}
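A hedged sketch of why the extractor type now carries a schema alongside it: each extractor travels with the Avro schema its extracted value should be encoded against, so `Read` (see the read.rs hunk above) no longer rebuilds that mapping per session. The types below are hypothetical simplifications, not the real `avro::Schema` or `utils::CustomizableExtractor` API:

```rust
// Hypothetical stand-ins for avro::Schema and utils::CustomizableExtractor.
#[derive(Debug)]
struct FieldSchema(&'static str);
#[derive(Debug)]
struct Extractor(&'static str); // a JSON-pointer-like locator

// Each selected field pairs the pointer used to extract it with the Avro
// schema its value will be encoded against.
type FieldExtractors = Vec<(FieldSchema, Extractor)>;

fn describe(extractors: &FieldExtractors) {
    for (schema, extractor) in extractors {
        // The real code extracts a value from a document and Avro-encodes it;
        // here we only show that schema and extractor stay paired.
        println!("field at {:?} encodes as {:?}", extractor, schema);
    }
}

fn main() {
    let extractors: FieldExtractors = vec![
        (FieldSchema("long"), Extractor("/id")),
        (FieldSchema("string"), Extractor("/name")),
    ];
    describe(&extractors);
}
```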

/// Partition is a collection journal which is mapped into a stable Kafka partition order.
@@ -204,10 +209,7 @@ impl Collection {
auth.deletions(),
)?
} else {
(
avro::shape_to_avro(collection_schema_shape.clone()),
vec![doc::Extractor::new(doc::Pointer::empty(), &doc::SerPolicy::noop()).into()],
)
utils::build_LEGACY_field_extractors(collection_schema_shape.clone(), auth.deletions())?
};

let key_schema = avro::key_to_avro(&key_ptr, collection_schema_shape);