Skip to content

Commit

Permalink
dekaf: Improve log filtering
Browse files Browse the repository at this point in the history
Since we're now piping `tracing` logs into the task logs which is user facing, we'd like to make the logs we send as meaningful as possible. In order to do that, we need to be able to more narrowly specify which logs should get sent. In order to do that, I've swapped the simple log level filter for a slightly more advanced `tracing_subscriber::filter::Targets` which allows us to specify log filters such as `debug,simple_crypt=warn,aws_config=warn,h2=warn`.
  • Loading branch information
jshearer committed Feb 19, 2025
1 parent b575a43 commit 79e114a
Showing 1 changed file with 31 additions and 19 deletions.
50 changes: 31 additions & 19 deletions crates/dekaf/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::log_appender::{self, GazetteWriter, TaskForwarder};
use futures::Future;
use lazy_static::lazy_static;
use rand::Rng;
use std::sync::Arc;
use tracing::{level_filters::LevelFilter, Instrument};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, Layer};

Expand All @@ -10,12 +11,12 @@ use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, Layer};
// from the point at which you call `forward_logs()` downwards will get forwarded to the same journal.
tokio::task_local! {
static TASK_FORWARDER: TaskForwarder<GazetteWriter>;
static LOG_LEVEL: std::cell::Cell<ops::LogLevel>;
static LOG_LEVEL: Arc<std::sync::RwLock<tracing_subscriber::filter::Targets>> ;
}

pub fn install() {
// Build a tracing_subscriber::Filter which uses our dynamic log level.
let log_filter = tracing_subscriber::filter::DynFilterFn::new(move |metadata, _cx| {
let log_filter = tracing_subscriber::filter::DynFilterFn::new(move |metadata, ctx| {
if metadata
.fields()
.iter()
Expand All @@ -24,20 +25,14 @@ pub fn install() {
return false;
}

let cur_level = match metadata.level().as_str() {
"TRACE" => ops::LogLevel::Trace as i32,
"DEBUG" => ops::LogLevel::Debug as i32,
"INFO" => ops::LogLevel::Info as i32,
"WARN" => ops::LogLevel::Warn as i32,
"ERROR" => ops::LogLevel::Error as i32,
_ => ops::LogLevel::UndefinedLevel as i32,
};

cur_level
<= LOG_LEVEL
.try_with(|log_level| log_level.get())
.unwrap_or(ops::LogLevel::Info)
.into()
LOG_LEVEL
.try_with(|filter| {
filter
.read()
.map(move |filter| filter.enabled(&metadata, ctx.to_owned()))
.expect("Logging lock poisoned, this should never happen")
})
.unwrap_or_else(|_| metadata.level() <= &tracing::metadata::Level::INFO)
});

// We want to be able to control Dekaf's own logging output via the RUST_LOG environment variable like usual.
Expand Down Expand Up @@ -75,6 +70,18 @@ lazy_static! {
};
}

fn build_log_filter(level: ops::LogLevel) -> tracing_subscriber::filter::Targets {
let filter = match level {
ops::LogLevel::Error => "error",
ops::LogLevel::Warn => "warn",
ops::LogLevel::Info | ops::LogLevel::UndefinedLevel => "warn,dekaf=info",
ops::LogLevel::Debug => "debug,simple_crypt=warn,aws_config=warn,h2=warn",
ops::LogLevel::Trace => "trace,simple_crypt=warn,aws_config=warn,h2=warn",
};

filter.parse().expect("Filters should be correct")
}

/// Capture all log messages emitted by the passed future and all of its descendants, and writes them out
/// based on the behavior of the provided writer. Initially, log messages will get buffered in a circular
/// queue until such time as the forwarder is informed of the name of the journal to emit them into. Then,
Expand All @@ -90,7 +97,9 @@ where
let forwarder = TaskForwarder::new(PRODUCER.to_owned(), writer);

LOG_LEVEL.scope(
ops::LogLevel::Info.into(),
Arc::new(std::sync::RwLock::new(build_log_filter(
ops::LogLevel::Info,
))),
TASK_FORWARDER.scope(
forwarder,
fut.instrument(tracing::info_span!(
Expand All @@ -111,7 +120,7 @@ pub fn propagate_task_forwarder<F, O>(fut: F) -> impl Future<Output = O>
where
F: Future<Output = O>,
{
let current_level = LOG_LEVEL.get();
let current_level = LOG_LEVEL.with(|l| l.clone());
let current_forwarder = TASK_FORWARDER.get();

LOG_LEVEL.scope(
Expand All @@ -125,5 +134,8 @@ pub fn get_log_forwarder() -> TaskForwarder<GazetteWriter> {
}

pub fn set_log_level(level: ops::LogLevel) {
LOG_LEVEL.with(|cell| cell.set(level))
LOG_LEVEL.with(|cell| {
let mut guard = cell.write().expect("should not happen");
*guard = build_log_filter(level);
})
}

0 comments on commit 79e114a

Please sign in to comment.