diff --git a/crates/dekaf/src/logging.rs b/crates/dekaf/src/logging.rs index f40676fcb45..4330e0698a3 100644 --- a/crates/dekaf/src/logging.rs +++ b/crates/dekaf/src/logging.rs @@ -2,6 +2,7 @@ use crate::log_appender::{self, GazetteWriter, TaskForwarder}; use futures::Future; use lazy_static::lazy_static; use rand::Rng; +use std::sync::Arc; use tracing::{level_filters::LevelFilter, Instrument}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, Layer}; @@ -10,12 +11,12 @@ use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, Layer}; // from the point at which you call `forward_logs()` downwards will get forwarded to the same journal. tokio::task_local! { static TASK_FORWARDER: TaskForwarder; - static LOG_LEVEL: std::cell::Cell; + static LOG_LEVEL: Arc> ; } pub fn install() { // Build a tracing_subscriber::Filter which uses our dynamic log level. - let log_filter = tracing_subscriber::filter::DynFilterFn::new(move |metadata, _cx| { + let log_filter = tracing_subscriber::filter::DynFilterFn::new(move |metadata, ctx| { if metadata .fields() .iter() @@ -24,20 +25,14 @@ pub fn install() { return false; } - let cur_level = match metadata.level().as_str() { - "TRACE" => ops::LogLevel::Trace as i32, - "DEBUG" => ops::LogLevel::Debug as i32, - "INFO" => ops::LogLevel::Info as i32, - "WARN" => ops::LogLevel::Warn as i32, - "ERROR" => ops::LogLevel::Error as i32, - _ => ops::LogLevel::UndefinedLevel as i32, - }; - - cur_level - <= LOG_LEVEL - .try_with(|log_level| log_level.get()) - .unwrap_or(ops::LogLevel::Info) - .into() + LOG_LEVEL + .try_with(|filter| { + filter + .read() + .map(move |filter| filter.enabled(&metadata, ctx.to_owned())) + .expect("Logging lock poisoned, this should never happen") + }) + .unwrap_or_else(|_| metadata.level() <= &tracing::metadata::Level::INFO) }); // We want to be able to control Dekaf's own logging output via the RUST_LOG environment variable like usual. @@ -75,6 +70,18 @@ lazy_static! { }; } +fn build_log_filter(level: ops::LogLevel) -> tracing_subscriber::filter::Targets { + let filter = match level { + ops::LogLevel::Error => "error", + ops::LogLevel::Warn => "warn", + ops::LogLevel::Info | ops::LogLevel::UndefinedLevel => "warn,dekaf=info", + ops::LogLevel::Debug => "debug,simple_crypt=warn,aws_config=warn,h2=warn", + ops::LogLevel::Trace => "trace,simple_crypt=warn,aws_config=warn,h2=warn", + }; + + filter.parse().expect("Filters should be correct") +} + /// Capture all log messages emitted by the passed future and all of its descendants, and writes them out /// based on the behavior of the provided writer. Initially, log messages will get buffered in a circular /// queue until such time as the forwarder is informed of the name of the journal to emit them into. Then, @@ -90,7 +97,9 @@ where let forwarder = TaskForwarder::new(PRODUCER.to_owned(), writer); LOG_LEVEL.scope( - ops::LogLevel::Info.into(), + Arc::new(std::sync::RwLock::new(build_log_filter( + ops::LogLevel::Info, + ))), TASK_FORWARDER.scope( forwarder, fut.instrument(tracing::info_span!( @@ -111,7 +120,7 @@ pub fn propagate_task_forwarder(fut: F) -> impl Future where F: Future, { - let current_level = LOG_LEVEL.get(); + let current_level = LOG_LEVEL.with(|l| l.clone()); let current_forwarder = TASK_FORWARDER.get(); LOG_LEVEL.scope( @@ -125,5 +134,8 @@ pub fn get_log_forwarder() -> TaskForwarder { } pub fn set_log_level(level: ops::LogLevel) { - LOG_LEVEL.with(|cell| cell.set(level)) + LOG_LEVEL.with(|cell| { + let mut guard = cell.write().expect("should not happen"); + *guard = build_log_filter(level); + }) }