Skip to content

Commit

Permalink
Add http metadata to spans as annotations (#359)
Browse files Browse the repository at this point in the history
We collect the following data from a sampled request/response and add it to the emitted span as an attribute map:

* http.method
* http.path
* http.authority OR http.host
* http.status_code

Signed-off-by: Alex Leong <alex@buoyant.io>
  • Loading branch information
adleong authored Sep 24, 2019
1 parent 7626c2e commit 77f1877
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 8 deletions.
40 changes: 35 additions & 5 deletions lib/linkerd2-trace-context/src/layer.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use super::{propagation, Span, SpanSink};
use futures::{try_ready, Async, Future, Poll};
use std::collections::HashMap;
use std::time::SystemTime;
use tracing::{trace, warn};

Expand Down Expand Up @@ -99,9 +100,9 @@ impl<F: Future, S> Future for MakeFuture<F, S> {

// === impl Service ===

impl<Svc, B, S> tower::Service<http::Request<B>> for Service<Svc, S>
impl<Svc, B1, B2, S> tower::Service<http::Request<B1>> for Service<Svc, S>
where
Svc: tower::Service<http::Request<B>>,
Svc: tower::Service<http::Request<B1>, Response = http::Response<B2>>,
S: SpanSink + Clone,
{
type Response = Svc::Response;
Expand All @@ -112,7 +113,7 @@ where
self.inner.poll_ready()
}

fn call(&mut self, mut request: http::Request<B>) -> Self::Future {
fn call(&mut self, mut request: http::Request<B1>) -> Self::Future {
let sink = match &self.sink {
Some(sink) => sink.clone(),
None => {
Expand All @@ -137,6 +138,8 @@ where
.uri()
.path_and_query()
.map(|pq| pq.as_str().to_owned());
let mut labels = HashMap::new();
request_labels(&mut labels, &request);
span = Some(Span {
trace_id: context.trace_id,
span_id,
Expand All @@ -145,6 +148,7 @@ where
start: SystemTime::now(),
// End time will be updated when the span completes.
end: SystemTime::UNIX_EPOCH,
labels,
});
}
}
Expand All @@ -160,9 +164,9 @@ where

// === impl SpanFuture ===

impl<F, S> Future for ResponseFuture<F, S>
impl<F, S, B2> Future for ResponseFuture<F, S>
where
F: Future,
F: Future<Item = http::Response<B2>>,
S: SpanSink,
{
type Item = F::Item;
Expand All @@ -172,6 +176,7 @@ where
let inner = try_ready!(self.inner.poll());
if let Some((mut span, mut sink)) = self.trace.take() {
span.end = SystemTime::now();
response_labels(&mut span.labels, &inner);
trace!(message = "emitting span", ?span);
if sink.try_send(span).is_err() {
warn!("span dropped due to backpressure");
Expand All @@ -180,3 +185,28 @@ where
Ok(Async::Ready(inner))
}
}

fn request_labels<Body>(labels: &mut HashMap<String, String>, req: &http::Request<Body>) {
labels.insert("http.method".to_string(), format!("{}", req.method()));
let path = req
.uri()
.path_and_query()
.map(|pq| pq.as_str().to_owned())
.unwrap_or_default();
labels.insert("http.path".to_string(), path);
if let Some(authority) = req.uri().authority_part() {
labels.insert("http.authority".to_string(), authority.as_str().to_string());
}
if let Some(host) = req.headers().get("host") {
if let Ok(host) = host.to_str() {
labels.insert("http.host".to_string(), host.to_string());
}
}
}

fn response_labels<Body>(labels: &mut HashMap<String, String>, rsp: &http::Response<Body>) {
labels.insert(
"http.status_code".to_string(),
rsp.status().as_str().to_string(),
);
}
2 changes: 2 additions & 0 deletions lib/linkerd2-trace-context/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use bytes::Bytes;
use futures::Sink;
use linkerd2_error::Error;
use rand::Rng;
use std::collections::HashMap;
use std::convert::TryFrom;
use std::fmt;
use std::time::SystemTime;
Expand Down Expand Up @@ -30,6 +31,7 @@ pub struct Span {
pub span_name: String,
pub start: SystemTime,
pub end: SystemTime,
pub labels: HashMap<String, String>,
}

pub trait SpanSink {
Expand Down
28 changes: 25 additions & 3 deletions src/app/spans.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,26 @@ impl SpanConverter {
}
}

fn mk_span(&self, span: trace_context::Span) -> Result<oc::Span, IdLengthError> {
fn mk_span(&self, mut span: trace_context::Span) -> Result<oc::Span, IdLengthError> {
let mut attributes = HashMap::<String, oc::AttributeValue>::new();
for (k, v) in self.labels.iter() {
attributes.insert(
k.clone(),
oc::AttributeValue {
value: Some(oc::attribute_value::Value::StringValue(truncatable(
v.clone(),
))),
},
);
}
for (k, v) in span.labels.drain() {
attributes.insert(
k,
oc::AttributeValue {
value: Some(oc::attribute_value::Value::StringValue(truncatable(v))),
},
);
}
Ok(oc::Span {
trace_id: into_bytes(span.trace_id, 16)?,
span_id: into_bytes(span.span_id, 8)?,
Expand All @@ -66,11 +85,14 @@ impl SpanConverter {
kind: self.kind,
start_time: Some(span.start.into()),
end_time: Some(span.end.into()),
attributes: None, // TODO: attributes
attributes: Some(oc::span::Attributes {
attribute_map: attributes,
dropped_attributes_count: 0,
}),
stack_trace: None,
time_events: None,
links: None,
status: None, // TODO: record this
status: None, // TODO: this is gRPC status; we must read response trailers to populate this
resource: None,
same_process_as_parent_span: Some(self.kind == SPAN_KIND_CLIENT),
child_span_count: None,
Expand Down

0 comments on commit 77f1877

Please sign in to comment.