Skip to content

Commit

Permalink
dekaf: Change send_stats sanity check to a crash
Browse files Browse the repository at this point in the history
  • Loading branch information
jshearer committed Feb 25, 2025
1 parent 7017855 commit 14bc520
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 51 deletions.
20 changes: 10 additions & 10 deletions crates/dekaf/src/log_appender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -492,23 +492,22 @@ impl<W: TaskWriter + 'static> TaskForwarder<W> {
}

pub fn send_stats(&self, collection_name: String, stats: ops::stats::Binding) {
if stats
let is_any_stats_invalid = stats
.left
.is_some_and(|s| (s.bytes_total == 0) != (s.docs_total == 0))
|| stats
.right
.is_some_and(|s| (s.bytes_total == 0) != (s.docs_total == 0))
|| stats
.out
.is_some_and(|s| (s.bytes_total == 0) != (s.docs_total == 0))
{
tracing::error!(
?stats,
"Invalid stats document emitted! Cannot emit 0 for just one of `bytes_total` or `docs_total`!"
);
} else {
self.send_message(TaskWriterMessage::Stats((collection_name, stats)))
}
.is_some_and(|s| (s.bytes_total == 0) != (s.docs_total == 0));

assert!(!is_any_stats_invalid,
"Invalid stats document emitted! Cannot emit 0 for just one of `bytes_total` or `docs_total`! {:?}",
stats
);

self.send_message(TaskWriterMessage::Stats((collection_name, stats)));
}

fn send_message(&self, msg: TaskWriterMessage) {
Expand Down Expand Up @@ -872,6 +871,7 @@ mod tests {
}

#[tokio::test]
#[should_panic]
async fn test_partial_stats() {
setup(|logs, stats| async move {
{
Expand Down

This file was deleted.

This file was deleted.

0 comments on commit 14bc520

Please sign in to comment.