diff --git a/crates/autopilot/src/infra/solvers/mod.rs b/crates/autopilot/src/infra/solvers/mod.rs index b953e577c3..e12283aa01 100644 --- a/crates/autopilot/src/infra/solvers/mod.rs +++ b/crates/autopilot/src/infra/solvers/mod.rs @@ -60,6 +60,7 @@ impl Driver { .post(url) .json(request) .timeout(timeout) + .header("X-REQUEST-ID", request.auction_id.to_string()) .send() .await .context("send")?; @@ -94,7 +95,7 @@ impl Driver { if let Some(timeout) = timeout { request = request.timeout(timeout); } - if let Some(request_id) = observe::request_id::get_task_local_storage() { + if let Some(request_id) = observe::request_id::from_current_span() { request = request.header("X-REQUEST-ID", request_id); } diff --git a/crates/autopilot/src/run_loop.rs b/crates/autopilot/src/run_loop.rs index ba2d6e9dd7..867c3a5b6d 100644 --- a/crates/autopilot/src/run_loop.rs +++ b/crates/autopilot/src/run_loop.rs @@ -107,12 +107,10 @@ impl RunLoop { .await; if let Some(auction) = auction { let auction_id = auction.id; - let auction_task = self_arc + self_arc .single_run(auction) - .instrument(tracing::info_span!("auction", auction_id)); - - ::observe::request_id::set_task_local_storage(auction_id.to_string(), auction_task) - .await; + .instrument(tracing::info_span!("auction", auction_id)) + .await }; } } diff --git a/crates/driver/src/domain/competition/mod.rs b/crates/driver/src/domain/competition/mod.rs index e8c8d964ed..b516f2126a 100644 --- a/crates/driver/src/domain/competition/mod.rs +++ b/crates/driver/src/domain/competition/mod.rs @@ -355,6 +355,7 @@ impl Competition { solution_id, submission_deadline, response_sender, + tracing_span: tracing::Span::current(), }; self.settle_queue.try_send(request).map_err(|err| { @@ -387,8 +388,8 @@ impl Competition { solution_id, submission_deadline, response_sender, + tracing_span, } = request; - let solver = self.solver.name().as_str(); async { if self.eth.current_block().borrow().number >= submission_deadline { if let Err(err) = response_sender.send(Err(DeadlineExceeded.into())) { @@ -410,7 +411,7 @@ impl Competition { tracing::error!(?err, "Failed to send /settle response"); } } - .instrument(tracing::info_span!("/settle", solver, %auction_id)) + .instrument(tracing_span) .await } } @@ -533,6 +534,7 @@ struct SettleRequest { solution_id: u64, submission_deadline: BlockNo, response_sender: oneshot::Sender>, + tracing_span: tracing::Span, } /// Solution information sent to the protocol by the driver before the solution diff --git a/crates/driver/src/infra/api/mod.rs b/crates/driver/src/infra/api/mod.rs index ccbdc5a178..6a632a4f59 100644 --- a/crates/driver/src/infra/api/mod.rs +++ b/crates/driver/src/infra/api/mod.rs @@ -111,7 +111,7 @@ impl Api { .layer(axum::extract::DefaultBodyLimit::disable()); } - let make_svc = observe::make_service_with_task_local_storage!(app); + let make_svc = observe::make_service_with_request_tracing!(app); // Start the server. let server = axum::Server::bind(&self.addr).serve(make_svc); diff --git a/crates/driver/src/infra/api/routes/settle/mod.rs b/crates/driver/src/infra/api/routes/settle/mod.rs index 1334693a6e..022d0e521f 100644 --- a/crates/driver/src/infra/api/routes/settle/mod.rs +++ b/crates/driver/src/infra/api/routes/settle/mod.rs @@ -2,7 +2,7 @@ mod dto; use { crate::{ - domain::{competition, competition::auction}, + domain::competition::auction, infra::{ api::{self, Error, State}, observe, @@ -23,7 +23,7 @@ async fn route( auction::Id::try_from(req.auction_id).map_err(api::routes::AuctionError::from)?; let solver = state.solver().name().to_string(); - let handle_request = async move { + async move { observe::settling(); let result = state .competition() @@ -36,16 +36,6 @@ async fn route( observe::settled(state.solver().name(), &result); result.map(|_| ()).map_err(Into::into) } - .instrument(tracing::info_span!("/settle", solver, %auction_id)); - - // Handle `/settle` call in a background task to ensure that we correctly - // submit the settlement (or cancellation) on-chain even if the server - // aborts the endpoint handler code. - // This can happen due do connection issues or when the autopilot aborts - // the `/settle` call when we reach the submission deadline. - Ok( - ::observe::request_id::spawn_task_with_current_request_id(handle_request) - .await - .unwrap_or_else(|_| Err(competition::Error::SubmissionError))?, - ) + .instrument(tracing::info_span!("/settle", solver, %auction_id)) + .await } diff --git a/crates/driver/src/infra/solver/mod.rs b/crates/driver/src/infra/solver/mod.rs index 1b361b1c0b..efdd9dfb2d 100644 --- a/crates/driver/src/infra/solver/mod.rs +++ b/crates/driver/src/infra/solver/mod.rs @@ -242,7 +242,7 @@ impl Solver { .post(url.clone()) .body(body) .timeout(auction.deadline().solvers().remaining().unwrap_or_default()); - if let Some(id) = observe::request_id::get_task_local_storage() { + if let Some(id) = observe::request_id::from_current_span() { req = req.header("X-REQUEST-ID", id); } let res = util::http::send(self.config.response_size_limit_max_bytes, req).await; @@ -268,7 +268,7 @@ impl Solver { let url = shared::url::join(&self.config.endpoint, "notify"); super::observe::solver_request(&url, &body); let mut req = self.client.post(url).body(body); - if let Some(id) = observe::request_id::get_task_local_storage() { + if let Some(id) = observe::request_id::from_current_span() { req = req.header("X-REQUEST-ID", id); } let response_size = self.config.response_size_limit_max_bytes; diff --git a/crates/e2e/src/setup/solver/mock.rs b/crates/e2e/src/setup/solver/mock.rs index 50c9413796..9ebf9834a6 100644 --- a/crates/e2e/src/setup/solver/mock.rs +++ b/crates/e2e/src/setup/solver/mock.rs @@ -52,7 +52,7 @@ impl Default for Mock { .route("/solve", axum::routing::post(solve)) .with_state(state.clone()); - let make_svc = observe::make_service_with_task_local_storage!(app); + let make_svc = observe::make_service_with_request_tracing!(app); let server = axum::Server::bind(&"0.0.0.0:0".parse().unwrap()).serve(make_svc); let mock = Mock { diff --git a/crates/ethrpc/src/buffered.rs b/crates/ethrpc/src/buffered.rs index 9ea3a4d267..8be7d9f98a 100644 --- a/crates/ethrpc/src/buffered.rs +++ b/crates/ethrpc/src/buffered.rs @@ -107,11 +107,8 @@ where (requests.remove(0), trace_ids.remove(0), senders.remove(0)); let result = match (&request, trace_id) { (Call::MethodCall(_), Some(trace_id)) => { - observe::request_id::set_task_local_storage( - trace_id, - inner.send(id, request), - ) - .await + let span = observe::request_id::info_span(trace_id); + inner.send(id, request).instrument(span).await } _ => inner.send(id, request).await, }; @@ -120,11 +117,8 @@ where n => { let results = match build_rpc_metadata(&requests, &trace_ids) { Ok(metadata) => { - observe::request_id::set_task_local_storage( - metadata, - inner.send_batch(requests), - ) - .await + let span = observe::request_id::info_span(metadata); + inner.send_batch(requests).instrument(span).await } Err(err) => { tracing::error!( @@ -148,7 +142,7 @@ where /// Queue a call by sending it over calls channel to the background worker. fn queue_call(&self, id: RequestId, request: Call) -> oneshot::Receiver { let (sender, receiver) = oneshot::channel(); - let trace_id = observe::request_id::get_task_local_storage(); + let trace_id = observe::request_id::from_current_span(); let context = (id, request, trace_id, sender); self.calls .unbounded_send(context) diff --git a/crates/ethrpc/src/http.rs b/crates/ethrpc/src/http.rs index d4ec6730be..5148fffb51 100644 --- a/crates/ethrpc/src/http.rs +++ b/crates/ethrpc/src/http.rs @@ -79,13 +79,13 @@ async fn execute_rpc( .body(body); match request { Request::Single(Call::MethodCall(call)) => { - if let Some(metadata) = observe::request_id::get_task_local_storage() { + if let Some(metadata) = observe::request_id::from_current_span() { request_builder = request_builder.header("X-REQUEST-ID", metadata); } request_builder = request_builder.header("X-RPC-METHOD", call.method.clone()); } Request::Batch(_) => { - if let Some(metadata) = observe::request_id::get_task_local_storage() { + if let Some(metadata) = observe::request_id::from_current_span() { request_builder = request_builder.header("X-RPC-BATCH-METADATA", metadata); } } diff --git a/crates/observe/src/request_id.rs b/crates/observe/src/request_id.rs index 69a688f887..21c0c132c8 100644 --- a/crates/observe/src/request_id.rs +++ b/crates/observe/src/request_id.rs @@ -16,42 +16,24 @@ //! And when we issue requests to another process we can simply fetch the //! current identifier specific to our task and send that along with the //! request. -use {std::future::Future, tokio::task::JoinHandle}; +use { + std::fmt, + tracing::{ + field::{Field, Visit}, + span::Attributes, + Id, + Span, + Subscriber, + }, + tracing_subscriber::{layer::Context, registry::LookupSpan, Layer, Registry}, +}; -tokio::task_local! { - pub static REQUEST_ID: String; -} - -/// Tries to read the `request_id` from this task's storage. -/// Returns `None` if task local storage was not initialized or is empty. -pub fn get_task_local_storage() -> Option { - let mut id = None; - let _ = REQUEST_ID.try_with(|cell| { - id = Some(cell.clone()); - }); - id -} +/// Name of the span that stores the id used to associated logs +/// across processes. +pub const SPAN_NAME: &str = "request"; -/// Sets the tasks's local id to the passed in value for the given scope. -pub async fn set_task_local_storage(id: String, scope: F) -> R -where - F: Future, -{ - REQUEST_ID.scope(id, scope).await -} - -/// Spawns a new task and ensures it uses the same request id as the current -/// task (if present). This allows for tracing requests across task boundaries. -pub fn spawn_task_with_current_request_id(future: F) -> JoinHandle -where - F: Future + Send + 'static, - F::Output: Send + 'static, -{ - if let Some(id) = get_task_local_storage() { - tokio::task::spawn(set_task_local_storage(id, future)) - } else { - tokio::task::spawn(future) - } +pub fn info_span(request_id: String) -> Span { + tracing::info_span!(SPAN_NAME, id = request_id) } /// Takes a `tower::Service` and embeds it in a `make_service` function that @@ -61,7 +43,7 @@ where /// Either that gets taken from the requests `X-REQUEST-ID` header of if that's /// missing a globally unique request number will be generated. #[macro_export] -macro_rules! make_service_with_task_local_storage { +macro_rules! make_service_with_request_tracing { ($service:expr) => {{ { let internal_request_id = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0)); @@ -81,12 +63,9 @@ macro_rules! make_service_with_task_local_storage { .fetch_add(1, std::sync::atomic::Ordering::SeqCst) ) }; - let span = tracing::info_span!("request", id); - let handle_request = observe::request_id::set_task_local_storage( - id, - hyper::service::Service::call(&mut warp_svc, req), - ); - tracing::Instrument::instrument(handle_request, span) + let span = tracing::info_span!(observe::request_id::SPAN_NAME, id); + let task = hyper::service::Service::call(&mut warp_svc, req); + tracing::Instrument::instrument(task, span) }); Ok::<_, std::convert::Infallible>(svc) } @@ -95,43 +74,160 @@ macro_rules! make_service_with_task_local_storage { }}; } +/// Looks up the request id from the current tracing span. +pub fn from_current_span() -> Option { + let mut result = None; + + Span::current().with_subscriber(|(id, sub)| { + let Some(registry) = sub.downcast_ref::() else { + tracing::error!( + "looking up request_ids using the `RequestIdLayer` requires the global tracing \ + subscriber to be `tracing_subscriber::Registry`" + ); + return; + }; + let mut current_span = registry.span(id); + while let Some(span) = current_span { + if let Some(request_id) = span.extensions().get::() { + result = Some(request_id.0.clone()); + return; + } + current_span = span.parent(); + } + }); + + result +} + +/// Request id recovered from a tracing span. +struct RequestId(String); + +/// Tracing layer that allows us to recover the request id +/// from the current tracing span. +pub struct RequestIdLayer; + +impl LookupSpan<'lookup>> Layer for RequestIdLayer { + /// When creating a new span check if it contains the request_id and store + /// it in the trace's extension storage to make it available for lookup + /// later on. + fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) { + let Some(span) = ctx.span(id) else { + return; + }; + if span.name() != crate::request_id::SPAN_NAME { + return; + } + + struct RequestIdVisitor(Option); + impl Visit for RequestIdVisitor { + // empty body because we want to use `record_str()` anyway + fn record_debug(&mut self, _field: &Field, _value: &dyn fmt::Debug) {} + + fn record_str(&mut self, field: &Field, value: &str) { + if field.name() == "id" { + self.0 = Some(RequestId(value.to_string())); + } + } + } + + let mut visitor = RequestIdVisitor(None); + attrs.values().record(&mut visitor); + + if let Some(request_id) = visitor.0 { + span.extensions_mut().insert(request_id); + } + } +} + #[cfg(test)] mod test { - use super::*; + use {super::*, tracing::Instrument}; #[tokio::test] - async fn request_id_copied_to_new_task() { - // use channels to enforce that assertions happen in the desired order. - // First we assert that the parent task's storage is empty after we - // spawned the child task. - // Afterwards we assert that the child task still has the parent task's - // value at the time of spawning. - let (sender1, receiver1) = tokio::sync::oneshot::channel(); - let (sender2, receiver2) = tokio::sync::oneshot::channel(); - - spawn_task_with_current_request_id(async { - assert_eq!(None, get_task_local_storage()); - }) + async fn request_id_from_current_span() { + crate::tracing::initialize_reentrant("error"); + async { + assert_eq!( + Some("test".to_string()), + crate::request_id::from_current_span() + ); + } + .instrument(info_span("test".to_string())) + .await + } + + #[tokio::test] + async fn request_id_not_set() { + crate::tracing::initialize_reentrant("debug"); + async { + assert_eq!(None, crate::request_id::from_current_span()); + } + .await + } + + #[tokio::test] + async fn request_id_from_ancestor_span() { + crate::tracing::initialize_reentrant("error"); + async { + async { + async { + // we traverse the span hierarchy until we find a span with the request id + assert_eq!( + Some("test".to_string()), + crate::request_id::from_current_span() + ); + } + .instrument(tracing::info_span!("wrap2", value = "value2")) + .await + } + .instrument(tracing::info_span!("wrap1", value = "value1")) + .await + } + .instrument(info_span("test".to_string())) + .await + } + + #[tokio::test] + async fn request_id_from_first_ancestor_span() { + crate::tracing::initialize_reentrant("error"); + async { + async { + async { + // if multiple ancestors have a request id we take the closest one + assert_eq!( + Some("test_inner".to_string()), + crate::request_id::from_current_span() + ); + } + .instrument(tracing::info_span!("wrap", value = "value")) + .await + } + .instrument(info_span("test_inner".to_string())) + .await + } + .instrument(info_span("test".to_string())) + .await + } + + #[tokio::test] + async fn request_id_within_spawned_task() { + crate::tracing::initialize_reentrant("error"); + async { + tokio::spawn( + async { + // we can spawn a new task and still find the request id if the spawned task + // was instrumented with a span that contains the request id + assert_eq!( + Some("test".to_string()), + crate::request_id::from_current_span() + ); + } + .instrument(Span::current()), + ) + .await + .unwrap(); + } + .instrument(info_span("test".to_string())) .await - .unwrap(); - - // create a task with some task local value - let _ = set_task_local_storage("1234".into(), async { - // spawn a new task that copies the parent's task local value - assert_eq!(Some("1234".into()), get_task_local_storage()); - spawn_task_with_current_request_id(async { - receiver1.await.unwrap(); - assert_eq!(Some("1234".into()), get_task_local_storage()); - sender2.send(()).unwrap(); - }); - }) - .await; - - // task local value is not populated outside of the previous scope - assert_eq!(None, get_task_local_storage()); - sender1.send(()).unwrap(); - - // block test until the important assertion happened - receiver2.await.unwrap(); } } diff --git a/crates/observe/src/tracing.rs b/crates/observe/src/tracing.rs index 7ba48bc52f..5254d16067 100644 --- a/crates/observe/src/tracing.rs +++ b/crates/observe/src/tracing.rs @@ -1,5 +1,5 @@ use { - crate::tracing_reload_handler::spawn_reload_handler, + crate::{request_id::RequestIdLayer, tracing_reload_handler::spawn_reload_handler}, std::{panic::PanicHookInfo, sync::Once}, time::macros::format_description, tracing::level_filters::LevelFilter, @@ -82,6 +82,7 @@ fn set_tracing_subscriber(env_filter: &str, stderr_threshold: LevelFilter) { tracing_subscriber::registry() .with(console_subscriber::spawn()) .with(fmt_layer!(env_filter, stderr_threshold)) + .with(RequestIdLayer) .init(); tracing::info!("started programm with support for tokio-console"); @@ -97,6 +98,7 @@ fn set_tracing_subscriber(env_filter: &str, stderr_threshold: LevelFilter) { // `sqlx` uses under the hood. .with(tracing::level_filters::LevelFilter::TRACE) .with(fmt_layer!(env_filter, stderr_threshold)) + .with(RequestIdLayer) .init(); tracing::info!("started programm without support for tokio-console"); diff --git a/crates/orderbook/src/run.rs b/crates/orderbook/src/run.rs index e1824713db..612b62e4c4 100644 --- a/crates/orderbook/src/run.rs +++ b/crates/orderbook/src/run.rs @@ -479,7 +479,7 @@ fn serve_api( .boxed(); tracing::info!(%address, "serving order book"); let warp_svc = warp::service(filter); - let warp_svc = observe::make_service_with_task_local_storage!(warp_svc); + let warp_svc = observe::make_service_with_request_tracing!(warp_svc); let server = hyper::Server::bind(&address) .serve(warp_svc) .with_graceful_shutdown(shutdown_receiver) diff --git a/crates/shared/src/trade_finding/external.rs b/crates/shared/src/trade_finding/external.rs index 1ab83f9d0f..2d6c67d5f7 100644 --- a/crates/shared/src/trade_finding/external.rs +++ b/crates/shared/src/trade_finding/external.rs @@ -70,7 +70,7 @@ impl ExternalTradeFinder { deadline: chrono::Utc::now() + self.timeout, }; let block_dependent = query.block_dependent; - let id = observe::request_id::get_task_local_storage(); + let id = observe::request_id::from_current_span(); let timeout = self.timeout; let client = self.client.clone(); let quote_endpoint = self.quote_endpoint.clone(); diff --git a/crates/shared/src/zeroex_api.rs b/crates/shared/src/zeroex_api.rs index 23944b7a47..b079ec6e0f 100644 --- a/crates/shared/src/zeroex_api.rs +++ b/crates/shared/src/zeroex_api.rs @@ -392,7 +392,7 @@ impl DefaultZeroExApi { self.block_stream.borrow().hash.to_string(), ); }; - if let Some(id) = observe::request_id::get_task_local_storage() { + if let Some(id) = observe::request_id::from_current_span() { request = request.header("X-REQUEST-ID", id); } diff --git a/crates/solvers/src/api/mod.rs b/crates/solvers/src/api/mod.rs index 41affc375d..e9b6f220a6 100644 --- a/crates/solvers/src/api/mod.rs +++ b/crates/solvers/src/api/mod.rs @@ -35,7 +35,7 @@ impl Api { // axum's default body limit needs to be disabled to not have the default limit on top of our custom limit .layer(axum::extract::DefaultBodyLimit::disable()); - let make_svc = observe::make_service_with_task_local_storage!(app); + let make_svc = observe::make_service_with_request_tracing!(app); let server = axum::Server::bind(&self.addr).serve(make_svc); if let Some(bind) = bind {