feat: allowing injecting custom join_set tracer to avoid dependency on `tracing` crate
geoffreyclaude committed Mar 4, 2025
1 parent 703aa32 commit 30660d4
Showing 8 changed files with 209 additions and 132 deletions.
12 changes: 0 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 0 additions & 1 deletion README.md
@@ -129,7 +129,6 @@ Optional features:
- `backtrace`: include backtrace information in error messages
- `pyarrow`: conversions between PyArrow and DataFusion types
- `serde`: enable arrow-schema's `serde` feature
- `tracing`: propagates the current span across thread boundaries

[apache avro]: https://avro.apache.org/
[apache parquet]: https://parquet.apache.org/
2 changes: 1 addition & 1 deletion datafusion-examples/Cargo.toml
@@ -61,7 +61,7 @@ async-trait = { workspace = true }
bytes = { workspace = true }
dashmap = { workspace = true }
# note only use main datafusion crate for examples
datafusion = { workspace = true, default-features = true, features = ["avro", "tracing"] }
datafusion = { workspace = true, default-features = true, features = ["avro"] }
datafusion-proto = { workspace = true }
env_logger = { workspace = true }
futures = { workspace = true }
154 changes: 77 additions & 77 deletions datafusion-examples/examples/tracing.rs
@@ -15,113 +15,113 @@
// specific language governing permissions and limitations
// under the License.

//! This example demonstrates the trace feature in DataFusion’s runtime.
//! When the `tracing` feature is enabled, spawned tasks in DataFusion (such as those
//! created during repartitioning or when reading Parquet files) are instrumented
//! with the current tracing span, allowing any existing tracing context to propagate.
//! This example demonstrates the tracing injection feature for the DataFusion runtime.
//! Tasks spawned on new threads behave differently depending on whether a tracer is injected.
//! The log output clearly distinguishes the two cases.
//!
//! In this example we create a session configured to use multiple partitions,
//! register a Parquet table (based on the `alltypes_tiny_pages_plain.parquet` file),
//! and run a query that should trigger parallel execution on multiple threads.
//! We wrap the entire query execution within a custom span and log messages.
//! By inspecting the tracing output, we should see that the tasks spawned
//! internally inherit the span context.
use arrow::util::pretty::pretty_format_batches;
use datafusion::arrow::record_batch::RecordBatch;
//! # Expected Log Output
//!
//! When **no tracer** is injected, logs from tasks running on `tokio-runtime-worker` threads
//! will _not_ include the `run_instrumented_query` span:
//!
//! ```text
//! 10:29:40.714 INFO main ThreadId(01) tracing: ***** RUNNING WITHOUT INJECTED TRACER *****
//! 10:29:40.714 INFO main ThreadId(01) run_instrumented_query: tracing: Starting query execution
//! 10:29:40.728 INFO main ThreadId(01) run_instrumented_query: tracing: Executing SQL query sql="SELECT COUNT(*), string_col FROM alltypes GROUP BY string_col"
//! 10:29:40.743 DEBUG main ThreadId(01) run_instrumented_query: datafusion_optimizer::optimizer: Optimizer took 6 ms
//! 10:29:40.759 DEBUG tokio-runtime-worker ThreadId(03) datafusion_physical_plan::aggregates::row_hash: Creating GroupedHashAggregateStream
//! 10:29:40.758 DEBUG tokio-runtime-worker ThreadId(04) datafusion_physical_plan::aggregates::row_hash: Creating GroupedHashAggregateStream
//! 10:29:40.771 INFO main ThreadId(01) run_instrumented_query: tracing: Query complete: 6 batches returned
//! 10:29:40.772 INFO main ThreadId(01) tracing: ***** WITHOUT tracer: Non-main tasks did NOT inherit the `run_instrumented_query` span *****
//! ```
//!
//! When a tracer **is** injected, tasks spawned on non‑main threads _do_ inherit the span:
//!
//! ```text
//! 10:29:40.772 INFO main ThreadId(01) tracing: Injecting custom tracer...
//! 10:29:40.772 INFO main ThreadId(01) tracing: ***** RUNNING WITH INJECTED TRACER *****
//! 10:29:40.772 INFO main ThreadId(01) run_instrumented_query: tracing: Starting query execution
//! 10:29:40.775 INFO main ThreadId(01) run_instrumented_query: tracing: Executing SQL query sql="SELECT COUNT(*), string_col FROM alltypes GROUP BY string_col"
//! 10:29:40.784 DEBUG main ThreadId(01) run_instrumented_query: datafusion_optimizer::optimizer: Optimizer took 7 ms
//! 10:29:40.801 DEBUG tokio-runtime-worker ThreadId(03) run_instrumented_query: datafusion_physical_plan::aggregates::row_hash: Creating GroupedHashAggregateStream
//! 10:29:40.801 DEBUG tokio-runtime-worker ThreadId(04) run_instrumented_query: datafusion_physical_plan::aggregates::row_hash: Creating GroupedHashAggregateStream
//! 10:29:40.809 INFO main ThreadId(01) run_instrumented_query: tracing: Query complete: 6 batches returned
//! 10:29:40.809 INFO main ThreadId(01) tracing: ***** WITH tracer: Non-main tasks DID inherit the `run_instrumented_query` span *****
//! ```
use datafusion::common::runtime::set_join_set_tracer;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::ListingOptions;
use datafusion::error::Result;
use datafusion::prelude::*;
use datafusion::test_util::parquet_test_data;
use futures::future::BoxFuture;
use futures::FutureExt;
use std::any::Any;
use std::sync::Arc;
use tracing::{info, instrument, Level};
use tracing::{info, instrument, Instrument, Level, Span};

#[tokio::main]
async fn main() -> Result<()> {
// Initialize a tracing subscriber that prints to stdout.
// Initialize tracing subscriber with thread info.
tracing_subscriber::fmt()
.with_thread_ids(true)
.with_thread_names(true)
.with_max_level(Level::DEBUG)
.init();

log::info!("Starting example, this log is not captured by tracing");
// Run query WITHOUT tracer injection.
info!("***** RUNNING WITHOUT INJECTED TRACER *****");
run_instrumented_query().await?;
info!("***** WITHOUT tracer: `tokio-runtime-worker` tasks did NOT inherit the `run_instrumented_query` span *****");

// execute the query within a tracing span
let result = run_instrumented_query().await;
// Inject custom tracer so tasks run in the current span.
info!("Injecting custom tracer...");
set_join_set_tracer(instrument_future, instrument_block)
.expect("Failed to set tracer");

// Run query WITH tracer injection.
info!("***** RUNNING WITH INJECTED TRACER *****");
run_instrumented_query().await?;
info!("***** WITH tracer: `tokio-runtime-worker` tasks DID inherit the `run_instrumented_query` span *****");

Ok(())
}

info!(
"Finished example. Check the logs above for tracing span details showing \
that tasks were spawned within the 'run_instrumented_query' span on different threads."
);
/// Instruments a boxed future to run in the current span.
fn instrument_future(
fut: BoxFuture<'static, Box<dyn Any + Send>>,
) -> BoxFuture<'static, Box<dyn Any + Send>> {
fut.in_current_span().boxed()
}

result
/// Instruments a boxed blocking closure to execute within the current span.
fn instrument_block(
f: Box<dyn FnOnce() -> Box<dyn Any + Send> + Send>,
) -> Box<dyn FnOnce() -> Box<dyn Any + Send> + Send> {
let span = Span::current();
Box::new(move || span.in_scope(|| f()))
}

#[instrument(level = "info")]
async fn run_instrumented_query() -> Result<()> {
info!("Starting query execution within the custom tracing span");
info!("Starting query execution");

// The default session will set the number of partitions to `std::thread::available_parallelism()`.
let ctx = SessionContext::new();

// Get the path to the test parquet data.
let test_data = parquet_test_data();
// Build listing options that pick up only the "alltypes_tiny_pages_plain.parquet" file.
let file_format = ParquetFormat::default().with_enable_pruning(true);
let listing_options = ListingOptions::new(Arc::new(file_format))
.with_file_extension("alltypes_tiny_pages_plain.parquet");

info!("Registering Parquet table 'alltypes' from {test_data} in {listing_options:?}");

// Register a listing table using an absolute URL.
let table_path = format!("file://{test_data}/");
ctx.register_listing_table(
"alltypes",
&table_path,
listing_options.clone(),
None,
None,
)
.await
.expect("register_listing_table failed");

info!("Registered Parquet table 'alltypes' from {table_path}");

// Run a query that will trigger parallel execution on multiple threads.
let sql = "SELECT COUNT(*), bool_col, date_string_col, string_col
FROM (
SELECT bool_col, date_string_col, string_col FROM alltypes
UNION ALL
SELECT bool_col, date_string_col, string_col FROM alltypes
) AS t
GROUP BY bool_col, date_string_col, string_col
ORDER BY 1,2,3,4 DESC
LIMIT 5;";
info!(%sql, "Executing SQL query");
let df = ctx.sql(sql).await?;

let results: Vec<RecordBatch> = df.collect().await?;
info!("Query execution complete");

// Print out the results and tracing output.
datafusion::common::assert_batches_eq!(
[
"+----------+----------+-----------------+------------+",
"| count(*) | bool_col | date_string_col | string_col |",
"+----------+----------+-----------------+------------+",
"| 2 | false | 01/01/09 | 9 |",
"| 2 | false | 01/01/09 | 7 |",
"| 2 | false | 01/01/09 | 5 |",
"| 2 | false | 01/01/09 | 3 |",
"| 2 | false | 01/01/09 | 1 |",
"+----------+----------+-----------------+------------+",
],
&results
);

info!("Query results:\n{}", pretty_format_batches(&results)?);

info!("Registering table 'alltypes' from {}", table_path);
ctx.register_listing_table("alltypes", &table_path, listing_options, None, None)
.await
.expect("Failed to register table");

let sql = "SELECT COUNT(*), string_col FROM alltypes GROUP BY string_col";
info!(sql, "Executing SQL query");
let result = ctx.sql(sql).await?.collect().await?;
info!("Query complete: {} batches returned", result.len());
Ok(())
}
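
For readers skimming the interleaved diff above, here is a minimal condensed sketch of the new pattern, assuming the `set_join_set_tracer(fn, fn)` call shape shown in the example; the `my_query` span name and the `SELECT 1 AS one` query are illustrative stand-ins for the example's Parquet query, not part of the commit.

```rust
use std::any::Any;

use datafusion::common::runtime::set_join_set_tracer;
use datafusion::error::Result;
use datafusion::prelude::*;
use futures::future::BoxFuture;
use futures::FutureExt;
use tracing::{info, info_span, Instrument, Span};

/// Re-enter the caller's span when a future is handed to another thread.
fn instrument_future(
    fut: BoxFuture<'static, Box<dyn Any + Send>>,
) -> BoxFuture<'static, Box<dyn Any + Send>> {
    fut.in_current_span().boxed()
}

/// Re-enter the caller's span when a blocking closure is handed to another thread.
fn instrument_block(
    f: Box<dyn FnOnce() -> Box<dyn Any + Send> + Send>,
) -> Box<dyn FnOnce() -> Box<dyn Any + Send> + Send> {
    let span = Span::current();
    Box::new(move || span.in_scope(f))
}

#[tokio::main]
async fn main() -> Result<()> {
    tracing_subscriber::fmt().with_thread_names(true).init();

    // Install the hooks once; later spawns through DataFusion's JoinSet
    // wrapper are passed through them.
    set_join_set_tracer(instrument_future, instrument_block)
        .expect("tracer can only be set once");

    // Worker-thread logs emitted while this query runs now carry `my_query`.
    let ctx = SessionContext::new();
    let batches = async { ctx.sql("SELECT 1 AS one").await?.collect().await }
        .instrument(info_span!("my_query"))
        .await?;
    info!("got {} batch(es)", batches.len());
    Ok(())
}
```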
5 changes: 0 additions & 5 deletions datafusion/common-runtime/Cargo.toml
@@ -34,17 +34,12 @@ all-features = true
[lints]
workspace = true

[features]
tracing = ["dep:tracing", "dep:tracing-futures"]

[lib]
name = "datafusion_common_runtime"

[dependencies]
log = { workspace = true }
tokio = { workspace = true }
tracing = { version = "0.1", optional = true }
tracing-futures = { version = "0.2", optional = true }

[dev-dependencies]
tokio = { version = "1.36", features = ["rt", "rt-multi-thread", "time"] }
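
The changes to `datafusion/common-runtime/src` are among the files not rendered above. Inferred from the example's call site, the hook plausibly amounts to a process-wide `OnceLock` holding the two user-supplied functions; the sketch below is a hypothetical reconstruction (the names, the bare fn-pointer types, and the error type are assumptions), not the commit's actual source.

```rust
// Hypothetical sketch only: the real module is not shown in this diff, and the
// actual names, parameter types, and error type may differ.
use std::any::Any;
use std::future::Future;
use std::pin::Pin;
use std::sync::OnceLock;

type TracedFuture = Pin<Box<dyn Future<Output = Box<dyn Any + Send>> + Send + 'static>>;
type TracedBlock = Box<dyn FnOnce() -> Box<dyn Any + Send> + Send>;

/// The two user-supplied instrumentation hooks, stored as plain fn pointers so
/// this crate needs no `tracing` dependency.
struct JoinSetTracer {
    trace_future: fn(TracedFuture) -> TracedFuture,
    trace_block: fn(TracedBlock) -> TracedBlock,
}

static TRACER: OnceLock<JoinSetTracer> = OnceLock::new();

/// Install the hooks; succeeds at most once per process.
pub fn set_join_set_tracer(
    trace_future: fn(TracedFuture) -> TracedFuture,
    trace_block: fn(TracedBlock) -> TracedBlock,
) -> Result<(), &'static str> {
    TRACER
        .set(JoinSetTracer { trace_future, trace_block })
        .map_err(|_| "join_set tracer already set")
}

/// Called by the crate's spawn wrappers before handing a future to tokio;
/// a no-op when no tracer has been injected.
fn trace_future(fut: TracedFuture) -> TracedFuture {
    match TRACER.get() {
        Some(tracer) => (tracer.trace_future)(fut),
        None => fut,
    }
}
```

Keeping the injected callbacks free of any concrete tracing types is what lets this crate drop its optional `tracing` and `tracing-futures` dependencies, as the Cargo.toml hunk above shows.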
