diff --git a/Cargo.lock b/Cargo.lock index 4d5d481d1d2fe..270265690b5b9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1904,8 +1904,6 @@ version = "45.0.0" dependencies = [ "log", "tokio", - "tracing", - "tracing-futures", ] [[package]] @@ -6337,16 +6335,6 @@ dependencies = [ "valuable", ] -[[package]] -name = "tracing-futures" -version = "0.2.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2" -dependencies = [ - "pin-project", - "tracing", -] - [[package]] name = "tracing-log" version = "0.2.0" diff --git a/README.md b/README.md index c41de5175a5c1..158033d40599f 100644 --- a/README.md +++ b/README.md @@ -129,7 +129,6 @@ Optional features: - `backtrace`: include backtrace information in error messages - `pyarrow`: conversions between PyArrow and DataFusion types - `serde`: enable arrow-schema's `serde` feature -- `tracing`: propagates the current span across thread boundaries [apache avro]: https://avro.apache.org/ [apache parquet]: https://parquet.apache.org/ diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml index 56113ad0791a5..73cc3c1a61b94 100644 --- a/datafusion-examples/Cargo.toml +++ b/datafusion-examples/Cargo.toml @@ -61,7 +61,7 @@ async-trait = { workspace = true } bytes = { workspace = true } dashmap = { workspace = true } # note only use main datafusion crate for examples -datafusion = { workspace = true, default-features = true, features = ["avro", "tracing"] } +datafusion = { workspace = true, default-features = true, features = ["avro"] } datafusion-proto = { workspace = true } env_logger = { workspace = true } futures = { workspace = true } diff --git a/datafusion-examples/examples/tracing.rs b/datafusion-examples/examples/tracing.rs index 855bbe2f15a11..614dac47a10ca 100644 --- a/datafusion-examples/examples/tracing.rs +++ b/datafusion-examples/examples/tracing.rs @@ -15,113 +15,113 @@ // specific language governing permissions and limitations // under the License. -//! This example demonstrates the trace feature in DataFusion’s runtime. -//! When the `tracing` feature is enabled, spawned tasks in DataFusion (such as those -//! created during repartitioning or when reading Parquet files) are instrumented -//! with the current tracing span, allowing to propagate any existing tracing context. +//! This example demonstrates the tracing injection feature for the DataFusion runtime. +//! Tasks spawned on new threads behave differently depending on whether a tracer is injected. +//! The log output clearly distinguishes the two cases. //! -//! In this example we create a session configured to use multiple partitions, -//! register a Parquet table (based on the `alltypes_tiny_pages_plain.parquet` file), -//! and run a query that should trigger parallel execution on multiple threads. -//! We wrap the entire query execution within a custom span and log messages. -//! By inspecting the tracing output, we should see that the tasks spawned -//! internally inherit the span context. - -use arrow::util::pretty::pretty_format_batches; -use datafusion::arrow::record_batch::RecordBatch; +//! # Expected Log Output +//! +//! When **no tracer** is injected, logs from tasks running on `tokio-runtime-worker` threads +//! will _not_ include the `run_instrumented_query` span: +//! +//! ```text +//! 10:29:40.714 INFO main ThreadId(01) tracing: ***** RUNNING WITHOUT INJECTED TRACER ***** +//! 10:29:40.714 INFO main ThreadId(01) run_instrumented_query: tracing: Starting query execution +//! 10:29:40.728 INFO main ThreadId(01) run_instrumented_query: tracing: Executing SQL query sql="SELECT COUNT(*), string_col FROM alltypes GROUP BY string_col" +//! 10:29:40.743 DEBUG main ThreadId(01) run_instrumented_query: datafusion_optimizer::optimizer: Optimizer took 6 ms +//! 10:29:40.759 DEBUG tokio-runtime-worker ThreadId(03) datafusion_physical_plan::aggregates::row_hash: Creating GroupedHashAggregateStream +//! 10:29:40.758 DEBUG tokio-runtime-worker ThreadId(04) datafusion_physical_plan::aggregates::row_hash: Creating GroupedHashAggregateStream +//! 10:29:40.771 INFO main ThreadId(01) run_instrumented_query: tracing: Query complete: 6 batches returned +//! 10:29:40.772 INFO main ThreadId(01) tracing: ***** WITHOUT tracer: Non-main tasks did NOT inherit the `run_instrumented_query` span ***** +//! ``` +//! +//! When a tracer **is** injected, tasks spawned on non‑main threads _do_ inherit the span: +//! +//! ```text +//! 10:29:40.772 INFO main ThreadId(01) tracing: Injecting custom tracer... +//! 10:29:40.772 INFO main ThreadId(01) tracing: ***** RUNNING WITH INJECTED TRACER ***** +//! 10:29:40.772 INFO main ThreadId(01) run_instrumented_query: tracing: Starting query execution +//! 10:29:40.775 INFO main ThreadId(01) run_instrumented_query: tracing: Executing SQL query sql="SELECT COUNT(*), string_col FROM alltypes GROUP BY string_col" +//! 10:29:40.784 DEBUG main ThreadId(01) run_instrumented_query: datafusion_optimizer::optimizer: Optimizer took 7 ms +//! 10:29:40.801 DEBUG tokio-runtime-worker ThreadId(03) run_instrumented_query: datafusion_physical_plan::aggregates::row_hash: Creating GroupedHashAggregateStream +//! 10:29:40.801 DEBUG tokio-runtime-worker ThreadId(04) run_instrumented_query: datafusion_physical_plan::aggregates::row_hash: Creating GroupedHashAggregateStream +//! 10:29:40.809 INFO main ThreadId(01) run_instrumented_query: tracing: Query complete: 6 batches returned +//! 10:29:40.809 INFO main ThreadId(01) tracing: ***** WITH tracer: Non-main tasks DID inherit the `run_instrumented_query` span ***** +//! ``` + +use datafusion::common::runtime::set_join_set_tracer; use datafusion::datasource::file_format::parquet::ParquetFormat; use datafusion::datasource::listing::ListingOptions; use datafusion::error::Result; use datafusion::prelude::*; use datafusion::test_util::parquet_test_data; +use futures::future::BoxFuture; +use futures::FutureExt; +use std::any::Any; use std::sync::Arc; -use tracing::{info, instrument, Level}; +use tracing::{info, instrument, Instrument, Level, Span}; #[tokio::main] async fn main() -> Result<()> { - // Initialize a tracing subscriber that prints to stdout. + // Initialize tracing subscriber with thread info. tracing_subscriber::fmt() .with_thread_ids(true) .with_thread_names(true) .with_max_level(Level::DEBUG) .init(); - log::info!("Starting example, this log is not captured by tracing"); + // Run query WITHOUT tracer injection. + info!("***** RUNNING WITHOUT INJECTED TRACER *****"); + run_instrumented_query().await?; + info!("***** WITHOUT tracer: `tokio-runtime-worker` tasks did NOT inherit the `run_instrumented_query` span *****"); - // execute the query within a tracing span - let result = run_instrumented_query().await; + // Inject custom tracer so tasks run in the current span. + info!("Injecting custom tracer..."); + set_join_set_tracer(instrument_future, instrument_block) + .expect("Failed to set tracer"); + + // Run query WITH tracer injection. + info!("***** RUNNING WITH INJECTED TRACER *****"); + run_instrumented_query().await?; + info!("***** WITH tracer: `tokio-runtime-worker` tasks DID inherit the `run_instrumented_query` span *****"); + + Ok(()) +} - info!( - "Finished example. Check the logs above for tracing span details showing \ -that tasks were spawned within the 'run_instrumented_query' span on different threads." - ); +/// Instruments a boxed future to run in the current span. +fn instrument_future( + fut: BoxFuture<'static, Box>, +) -> BoxFuture<'static, Box> { + fut.in_current_span().boxed() +} - result +/// Instruments a boxed blocking closure to execute within the current span. +fn instrument_block( + f: Box Box + Send>, +) -> Box Box + Send> { + let span = Span::current(); + Box::new(move || span.in_scope(|| f())) } #[instrument(level = "info")] async fn run_instrumented_query() -> Result<()> { - info!("Starting query execution within the custom tracing span"); + info!("Starting query execution"); - // The default session will set the number of partitions to `std::thread::available_parallelism()`. let ctx = SessionContext::new(); - - // Get the path to the test parquet data. let test_data = parquet_test_data(); - // Build listing options that pick up only the "alltypes_tiny_pages_plain.parquet" file. let file_format = ParquetFormat::default().with_enable_pruning(true); let listing_options = ListingOptions::new(Arc::new(file_format)) .with_file_extension("alltypes_tiny_pages_plain.parquet"); - info!("Registering Parquet table 'alltypes' from {test_data} in {listing_options:?}"); - - // Register a listing table using an absolute URL. let table_path = format!("file://{test_data}/"); - ctx.register_listing_table( - "alltypes", - &table_path, - listing_options.clone(), - None, - None, - ) - .await - .expect("register_listing_table failed"); - - info!("Registered Parquet table 'alltypes' from {table_path}"); - - // Run a query that will trigger parallel execution on multiple threads. - let sql = "SELECT COUNT(*), bool_col, date_string_col, string_col - FROM ( - SELECT bool_col, date_string_col, string_col FROM alltypes - UNION ALL - SELECT bool_col, date_string_col, string_col FROM alltypes - ) AS t - GROUP BY bool_col, date_string_col, string_col - ORDER BY 1,2,3,4 DESC - LIMIT 5;"; - info!(%sql, "Executing SQL query"); - let df = ctx.sql(sql).await?; - - let results: Vec = df.collect().await?; - info!("Query execution complete"); - - // Print out the results and tracing output. - datafusion::common::assert_batches_eq!( - [ - "+----------+----------+-----------------+------------+", - "| count(*) | bool_col | date_string_col | string_col |", - "+----------+----------+-----------------+------------+", - "| 2 | false | 01/01/09 | 9 |", - "| 2 | false | 01/01/09 | 7 |", - "| 2 | false | 01/01/09 | 5 |", - "| 2 | false | 01/01/09 | 3 |", - "| 2 | false | 01/01/09 | 1 |", - "+----------+----------+-----------------+------------+", - ], - &results - ); - - info!("Query results:\n{}", pretty_format_batches(&results)?); - + info!("Registering table 'alltypes' from {}", table_path); + ctx.register_listing_table("alltypes", &table_path, listing_options, None, None) + .await + .expect("Failed to register table"); + + let sql = "SELECT COUNT(*), string_col FROM alltypes GROUP BY string_col"; + info!(sql, "Executing SQL query"); + let result = ctx.sql(sql).await?.collect().await?; + info!("Query complete: {} batches returned", result.len()); Ok(()) } diff --git a/datafusion/common-runtime/Cargo.toml b/datafusion/common-runtime/Cargo.toml index faf6b280da419..6fd9b7ac8fe8d 100644 --- a/datafusion/common-runtime/Cargo.toml +++ b/datafusion/common-runtime/Cargo.toml @@ -34,17 +34,12 @@ all-features = true [lints] workspace = true -[features] -tracing = ["dep:tracing", "dep:tracing-futures"] - [lib] name = "datafusion_common_runtime" [dependencies] log = { workspace = true } tokio = { workspace = true } -tracing = { version = "0.1", optional = true } -tracing-futures = { version = "0.2", optional = true } [dev-dependencies] tokio = { version = "1.36", features = ["rt", "rt-multi-thread", "time"] } diff --git a/datafusion/common-runtime/src/join_set.rs b/datafusion/common-runtime/src/join_set.rs index 1852302d5a639..359f194bb9b2f 100644 --- a/datafusion/common-runtime/src/join_set.rs +++ b/datafusion/common-runtime/src/join_set.rs @@ -15,40 +15,145 @@ // specific language governing permissions and limitations // under the License. +use std::any::Any; use std::future::Future; use std::task::{Context, Poll}; use tokio::runtime::Handle; -use tokio::task::JoinSet as TokioJoinSet; -use tokio::task::{AbortHandle, Id, JoinError, LocalSet}; +use tokio::task::LocalSet; +use tokio::task::{AbortHandle, Id, JoinError, JoinSet as TokioJoinSet}; -#[cfg(feature = "tracing")] -mod trace_utils { - use std::future::Future; - use tracing_futures::Instrument; +pub mod trace_utils { + use std::pin::Pin; + // Import required items from the outer scope. + use super::*; + use tokio::sync::OnceCell; - /// Instruments the provided future with the current tracing span. - pub fn trace_future(future: F) -> impl Future + Send + // Redefine the futures::future::BoxFuture type alias + // to avoid depending on the `futures` crate for a single alias + pub type BoxFuture = Pin + Send + 'static>>; + + /// Function pointer type for tracing a future. + /// + /// This function takes a boxed future (with its output type erased) + /// and returns a boxed future (with its output still erased). The + /// tracer must apply instrumentation without altering the output. + pub type TraceFutureFn = + fn(BoxFuture>) -> BoxFuture>; + + /// Function pointer type for tracing a blocking closure. + /// + /// This function takes a boxed closure (with its return type erased) + /// and returns a boxed closure (with its return type still erased). The + /// tracer must apply instrumentation without changing the return value. + pub type TraceBlockFn = fn( + Box Box + Send>, + ) -> Box Box + Send>; + + /// Container for the injected tracer functions. + /// + /// Consumers of the library can inject custom tracer functions via + /// `set_tracer`. If none is set, no instrumentation is applied. + #[derive(Clone)] + pub struct ErasedTracer { + pub trace_future: TraceFutureFn, + pub trace_block: TraceBlockFn, + } + + // Global storage for the tracer functions, to be set once at startup. + static TRACER: OnceCell = OnceCell::const_new(); + + /// Set the custom tracer functions for both futures and blocking closures. + /// + /// This should be called once at startup. If not set, no instrumentation is performed. + pub fn set_join_set_tracer(tf: TraceFutureFn, tb: TraceBlockFn) -> Result<(), ()> { + TRACER + .set(ErasedTracer { + trace_future: tf, + trace_block: tb, + }) + .map_err(|_| ()) + } + + /// Optionally instruments a future with custom tracing. + /// + /// If a tracer has been injected via `set_tracer`, the future's output is + /// boxed (erasing its type), passed to the tracer, and then downcast back + /// to the expected type. If no tracer is set, the original future is returned. + /// + /// # Type Parameters + /// * `T` - The concrete output type of the future. + /// * `F` - The future type. + /// + /// # Parameters + /// * `future` - The future to potentially instrument. + pub fn trace_future(future: F) -> BoxFuture where F: Future + Send + 'static, - T: Send, + T: Send + 'static, { - future.in_current_span() + match TRACER.get() { + // No tracer injected: simply box and return the original future. + None => Box::pin(future), + Some(tracer) => { + // Box the future's output to erase its type. + let fut_boxed: BoxFuture> = Box::pin(async move { + let t = future.await; + Box::new(t) as Box + }); + // Apply the tracer function and downcast the output back. + Box::pin(async move { + let boxed_any = (tracer.trace_future)(fut_boxed).await; + *boxed_any.downcast::().expect( + "Type mismatch in tracer: tracer must preserve the output type", + ) + }) + } + } } - /// Wraps the provided blocking function to execute within the current tracing span. - pub fn trace_block(f: F) -> impl FnOnce() -> T + Send + 'static + /// Optionally instruments a blocking closure with custom tracing. + /// + /// If a tracer has been injected via `set_tracer`, the closure is wrapped so that + /// its return value is boxed (erasing its type), passed to the tracer, and then the + /// result is downcast back to the original type. If no tracer is set, the closure is + /// returned unmodified (except for being boxed). + /// + /// # Type Parameters + /// * `T` - The concrete return type of the closure. + /// * `F` - The closure type. + /// + /// # Parameters + /// * `f` - The blocking closure to potentially instrument. + pub fn trace_block(f: F) -> Box T + Send> where F: FnOnce() -> T + Send + 'static, - T: Send, + T: Send + 'static, { - let span = tracing::Span::current(); - move || span.in_scope(f) + match TRACER.get() { + // No tracer injected: simply box and return the original closure. + None => Box::new(f), + Some(tracer) => { + // Wrap the closure so that its return value is boxed. + let f_wrapped = || { + let t = f(); + Box::new(t) as Box + }; + // Apply the tracer function and downcast the result back. + Box::new(move || { + let boxed_any = (tracer.trace_block)(Box::new(f_wrapped))(); + *boxed_any.downcast::().expect( + "Type mismatch in tracer: tracer must preserve the output type", + ) + }) + } + } } } -/// A wrapper around Tokio's [`JoinSet`](tokio::task::JoinSet) that transparently forwards all public API calls -/// while optionally instrumenting spawned tasks and blocking closures with the current tracing span -/// when the `tracing` feature is enabled. +/// A wrapper around Tokio's JoinSet that forwards all API calls while optionally +/// instrumenting spawned tasks and blocking closures with custom tracing behavior. +/// If no tracer is injected via `trace_utils::set_tracer`, tasks and closures are executed +/// without any instrumentation. #[derive(Debug)] pub struct JoinSet { inner: TokioJoinSet, @@ -78,18 +183,16 @@ impl JoinSet { self.inner.is_empty() } } + impl JoinSet { /// [JoinSet::spawn](tokio::task::JoinSet::spawn) - Spawn a new task. pub fn spawn(&mut self, task: F) -> AbortHandle where - F: Future, + F: Future + Send, F: Send + 'static, T: Send, { - #[cfg(feature = "tracing")] - let task = trace_utils::trace_future(task); - - self.inner.spawn(task) + self.inner.spawn(trace_utils::trace_future(task)) } /// [JoinSet::spawn_on](tokio::task::JoinSet::spawn_on) - Spawn a task on a provided runtime. @@ -99,10 +202,7 @@ impl JoinSet { F: Send + 'static, T: Send, { - #[cfg(feature = "tracing")] - let task = trace_utils::trace_future(task); - - self.inner.spawn_on(task, handle) + self.inner.spawn_on(trace_utils::trace_future(task), handle) } /// [JoinSet::spawn_local](tokio::task::JoinSet::spawn_local) - Spawn a local task. @@ -130,10 +230,7 @@ impl JoinSet { F: Send + 'static, T: Send, { - #[cfg(feature = "tracing")] - let f = trace_utils::trace_block(f); - - self.inner.spawn_blocking(f) + self.inner.spawn_blocking(trace_utils::trace_block(f)) } /// [JoinSet::spawn_blocking_on](tokio::task::JoinSet::spawn_blocking_on) - Spawn a blocking task on a provided runtime. @@ -143,10 +240,8 @@ impl JoinSet { F: Send + 'static, T: Send, { - #[cfg(feature = "tracing")] - let f = trace_utils::trace_block(f); - - self.inner.spawn_blocking_on(f, handle) + self.inner + .spawn_blocking_on(trace_utils::trace_block(f), handle) } /// [JoinSet::join_next](tokio::task::JoinSet::join_next) - Await the next completed task. diff --git a/datafusion/common-runtime/src/lib.rs b/datafusion/common-runtime/src/lib.rs index 499447c77c2a6..47c4075aa56f9 100644 --- a/datafusion/common-runtime/src/lib.rs +++ b/datafusion/common-runtime/src/lib.rs @@ -27,4 +27,5 @@ pub mod common; mod join_set; pub use common::SpawnedTask; +pub use join_set::trace_utils::set_join_set_tracer; pub use join_set::JoinSet; diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index a982fa8f8e656..438e2600a66d7 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -75,7 +75,6 @@ recursive_protection = [ ] serde = ["dep:serde"] string_expressions = ["datafusion-functions/string_expressions"] -tracing = ["datafusion-common-runtime/tracing"] unicode_expressions = [ "datafusion-sql/unicode_expressions", "datafusion-functions/unicode_expressions",