diff --git a/Cargo.lock b/Cargo.lock index cf2bda85b0..2993bf0ae7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -554,6 +554,7 @@ version = "0.1.0" dependencies = [ "futures 0.1.26 (registry+https://github.com/rust-lang/crates.io-index)", "linkerd2-error 0.1.0", + "linkerd2-metrics 0.1.0", "opencensus-proto 0.1.0", "tokio 0.1.20 (registry+https://github.com/rust-lang/crates.io-index)", "tower-grpc 0.1.0 (git+https://github.com/tower-rs/tower-grpc)", diff --git a/lib/linkerd2-opencensus/Cargo.toml b/lib/linkerd2-opencensus/Cargo.toml index a1f188d969..3034d082d6 100644 --- a/lib/linkerd2-opencensus/Cargo.toml +++ b/lib/linkerd2-opencensus/Cargo.toml @@ -8,6 +8,7 @@ publish = false [dependencies] futures = "0.1" linkerd2-error = { path = "../linkerd2-error" } +linkerd2-metrics = { path = "../linkerd2-metrics" } opencensus-proto = { path = "../opencensus-proto" } tokio = "0.1" tower-grpc = { git = "https://github.com/tower-rs/tower-grpc", default-features = false, features = ["protobuf"] } diff --git a/lib/linkerd2-opencensus/src/lib.rs b/lib/linkerd2-opencensus/src/lib.rs index 22a9546056..f861692e07 100644 --- a/lib/linkerd2-opencensus/src/lib.rs +++ b/lib/linkerd2-opencensus/src/lib.rs @@ -1,16 +1,20 @@ use futures::{task, try_ready, Async, Future, Poll, Stream}; use linkerd2_error::Error; +use metrics::Registry; use opencensus_proto::agent::common::v1::Node; use opencensus_proto::agent::trace::v1::{ client::TraceService, ExportTraceServiceRequest, ExportTraceServiceResponse, }; use opencensus_proto::trace::v1::Span; +use std::convert::TryInto; use tokio::sync::mpsc; use tower_grpc::{ self as grpc, client::streaming::ResponseFuture, generic::client::GrpcService, BoxBody, }; use tracing::trace; +pub mod metrics; + /// SpanExporter sends a Stream of spans to the given TraceService gRPC service. pub struct SpanExporter where @@ -21,6 +25,7 @@ where state: State, spans: S, max_batch_size: usize, + metrics: Registry, } enum State> { @@ -32,6 +37,7 @@ enum State> { node: Option, // We hold the response future, but never poll it. _rsp: ResponseFuture, + metrics: Registry, }, } @@ -49,13 +55,14 @@ where { const DEFAULT_MAX_BATCH_SIZE: usize = 100; - pub fn new(client: T, node: Node, spans: S) -> Self { + pub fn new(client: T, node: Node, spans: S, metrics: Registry) -> Self { Self { client, node, spans, state: State::Idle, max_batch_size: Self::DEFAULT_MAX_BATCH_SIZE, + metrics, } } @@ -63,11 +70,15 @@ where spans: Vec, sender: &mut mpsc::Sender, node: &mut Option, + metrics: &mut Registry, ) -> Result<(), StreamError> { if spans.is_empty() { return Ok(()); } + if let Ok(num_spans) = spans.len().try_into() { + metrics.send(num_spans); + } let req = ExportTraceServiceRequest { spans, node: node.take(), @@ -93,6 +104,7 @@ where sender: &mut mpsc::Sender, node: &mut Option, max_batch_size: usize, + metrics: &mut Registry, ) -> Poll<(), StreamError> { try_ready!(sender.poll_ready().map_err(|_| StreamError::SenderLost)); @@ -101,13 +113,13 @@ where match receiver.poll() { Ok(Async::NotReady) => { // If any spans have been collected send them, potentially consuming `node. - Self::do_send(spans, sender, node)?; + Self::do_send(spans, sender, node, metrics)?; return Ok(Async::NotReady); } Ok(Async::Ready(Some(span))) => { spans.push(span); if spans.len() == max_batch_size { - Self::do_send(spans, sender, node)?; + Self::do_send(spans, sender, node, metrics)?; // Because we've voluntarily stopped work due to a batch // size limitation, notify the task to be polled again // immediately. @@ -116,12 +128,12 @@ where } } Ok(Async::Ready(None)) => { - let _ = Self::do_send(spans, sender, node); + let _ = Self::do_send(spans, sender, node, metrics); // The span receiver stream completed, so signal completion. return Ok(Async::Ready(())); } Err(e) => { - let _ = Self::do_send(spans, sender, node); + let _ = Self::do_send(spans, sender, node, metrics); return Err(StreamError::Receiver(e)); } } @@ -150,16 +162,19 @@ where .map_err(|_| grpc::Status::new(grpc::Code::Cancelled, "cancelled")), ); trace!("Establishing new TraceService::export request"); + self.metrics.start_stream(); let _rsp = svc.export(req); State::Sending { sender: request_tx, node: Some(self.node.clone()), _rsp, + metrics: self.metrics.clone(), } } State::Sending { ref mut sender, ref mut node, + ref mut metrics, .. } => { let mut sender = sender.clone(); @@ -168,6 +183,7 @@ where &mut sender, node, self.max_batch_size, + metrics, ) { Ok(ready) => return Ok(ready), Err(StreamError::Receiver(e)) => return Err(e.into()), diff --git a/lib/linkerd2-opencensus/src/metrics.rs b/lib/linkerd2-opencensus/src/metrics.rs new file mode 100644 index 0000000000..aa4c7cbd96 --- /dev/null +++ b/lib/linkerd2-opencensus/src/metrics.rs @@ -0,0 +1,71 @@ +use linkerd2_metrics::{metrics, Counter, FmtMetrics}; +use std::fmt; +use std::sync::{Arc, Mutex}; +use tracing::error; + +metrics! { + opencensus_span_export_streams: Counter { "Total count of opened span export streams" }, + opencensus_span_export_requests: Counter { "Total count of span export request messages" }, + opencensus_span_exports: Counter { "Total count of spans exported" } +} + +struct Metrics { + streams: Counter, + requests: Counter, + spans: Counter, +} + +#[derive(Clone)] +pub struct Registry(Arc>); + +#[derive(Clone)] +pub struct Report(Arc>); + +pub fn new() -> (Registry, Report) { + let metrics = Metrics { + streams: Counter::default(), + requests: Counter::default(), + spans: Counter::default(), + }; + let shared = Arc::new(Mutex::new(metrics)); + (Registry(shared.clone()), Report(shared)) +} + +impl Registry { + pub fn start_stream(&mut self) { + match self.0.lock() { + Ok(mut metrics) => metrics.streams.incr(), + Err(e) => error!(message="failed to lock metrics", %e), + } + } + + pub fn send(&mut self, spans: u64) { + match self.0.lock() { + Ok(mut metrics) => { + metrics.requests.incr(); + metrics.spans += spans; + } + Err(e) => error!(message="failed to lock metrics", %e), + } + } +} + +impl FmtMetrics for Report { + fn fmt_metrics(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let metrics = match self.0.lock() { + Err(_) => return Ok(()), + Ok(lock) => lock, + }; + + opencensus_span_export_streams.fmt_help(f)?; + opencensus_span_export_streams.fmt_metric(f, metrics.streams)?; + + opencensus_span_export_requests.fmt_help(f)?; + opencensus_span_export_requests.fmt_metric(f, metrics.requests)?; + + opencensus_span_exports.fmt_help(f)?; + opencensus_span_exports.fmt_metric(f, metrics.spans)?; + + Ok(()) + } +} diff --git a/src/app/main.rs b/src/app/main.rs index 886b57c647..5897d567fa 100644 --- a/src/app/main.rs +++ b/src/app/main.rs @@ -232,6 +232,8 @@ where let (transport_metrics, transport_report) = transport::metrics::new(); + let (span_metrics, span_report) = linkerd2_opencensus::metrics::new(); + let report = endpoint_http_report .and_then(route_http_report) .and_then(retry_http_report) @@ -239,7 +241,8 @@ where //.and_then(tls_config_report) .and_then(ctl_http_report) .and_then(handle_time_report) - .and_then(telemetry::process::Report::new(start_time)); + .and_then(telemetry::process::Report::new(start_time)) + .and_then(span_report); let mut identity_daemon = None; let (readiness, ready_latch) = Readiness::new(); @@ -441,7 +444,7 @@ where }), ..oc::Node::default() }; - let span_exporter = SpanExporter::new(trace_collector, node, spans_rx); + let span_exporter = SpanExporter::new(trace_collector, node, spans_rx, span_metrics); task::spawn(span_exporter.map_err(|e| { error!("span exporter failed: {}", e); }));