Skip to content

Commit

Permalink
Instrument the opencensus exporter (#362)
Browse files Browse the repository at this point in the history
Fixes linkerd/linkerd2#3453

Instrument the opencensus exporter with the following metrics:

```
# HELP opencensus_span_export_streams Total count of opened span export streams
# TYPE opencensus_span_export_streams counter
opencensus_span_export_streams 1
# HELP opencensus_span_export_requests Total count of span export request messages
# TYPE opencensus_span_export_requests counter
opencensus_span_export_requests 497
# HELP opencensus_span_exports Total count of spans exported
# TYPE opencensus_span_exports counter
opencensus_span_exports 994
```

Signed-off-by: Alex Leong <alex@buoyant.io>
  • Loading branch information
adleong authored Sep 25, 2019
1 parent 77f1877 commit b15735b
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 7 deletions.
1 change: 1 addition & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,7 @@ version = "0.1.0"
dependencies = [
"futures 0.1.26 (registry+https://github.com/rust-lang/crates.io-index)",
"linkerd2-error 0.1.0",
"linkerd2-metrics 0.1.0",
"opencensus-proto 0.1.0",
"tokio 0.1.20 (registry+https://github.com/rust-lang/crates.io-index)",
"tower-grpc 0.1.0 (git+https://github.com/tower-rs/tower-grpc)",
Expand Down
1 change: 1 addition & 0 deletions lib/linkerd2-opencensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ publish = false
[dependencies]
futures = "0.1"
linkerd2-error = { path = "../linkerd2-error" }
linkerd2-metrics = { path = "../linkerd2-metrics" }
opencensus-proto = { path = "../opencensus-proto" }
tokio = "0.1"
tower-grpc = { git = "https://github.com/tower-rs/tower-grpc", default-features = false, features = ["protobuf"] }
Expand Down
26 changes: 21 additions & 5 deletions lib/linkerd2-opencensus/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
use futures::{task, try_ready, Async, Future, Poll, Stream};
use linkerd2_error::Error;
use metrics::Registry;
use opencensus_proto::agent::common::v1::Node;
use opencensus_proto::agent::trace::v1::{
client::TraceService, ExportTraceServiceRequest, ExportTraceServiceResponse,
};
use opencensus_proto::trace::v1::Span;
use std::convert::TryInto;
use tokio::sync::mpsc;
use tower_grpc::{
self as grpc, client::streaming::ResponseFuture, generic::client::GrpcService, BoxBody,
};
use tracing::trace;

pub mod metrics;

/// SpanExporter sends a Stream of spans to the given TraceService gRPC service.
pub struct SpanExporter<T, S>
where
Expand All @@ -21,6 +25,7 @@ where
state: State<T>,
spans: S,
max_batch_size: usize,
metrics: Registry,
}

enum State<T: GrpcService<BoxBody>> {
Expand All @@ -32,6 +37,7 @@ enum State<T: GrpcService<BoxBody>> {
node: Option<Node>,
// We hold the response future, but never poll it.
_rsp: ResponseFuture<ExportTraceServiceResponse, T::Future>,
metrics: Registry,
},
}

Expand All @@ -49,25 +55,30 @@ where
{
const DEFAULT_MAX_BATCH_SIZE: usize = 100;

pub fn new(client: T, node: Node, spans: S) -> Self {
pub fn new(client: T, node: Node, spans: S, metrics: Registry) -> Self {
Self {
client,
node,
spans,
state: State::Idle,
max_batch_size: Self::DEFAULT_MAX_BATCH_SIZE,
metrics,
}
}

fn do_send(
spans: Vec<Span>,
sender: &mut mpsc::Sender<ExportTraceServiceRequest>,
node: &mut Option<Node>,
metrics: &mut Registry,
) -> Result<(), StreamError<S::Error>> {
if spans.is_empty() {
return Ok(());
}

if let Ok(num_spans) = spans.len().try_into() {
metrics.send(num_spans);
}
let req = ExportTraceServiceRequest {
spans,
node: node.take(),
Expand All @@ -93,6 +104,7 @@ where
sender: &mut mpsc::Sender<ExportTraceServiceRequest>,
node: &mut Option<Node>,
max_batch_size: usize,
metrics: &mut Registry,
) -> Poll<(), StreamError<S::Error>> {
try_ready!(sender.poll_ready().map_err(|_| StreamError::SenderLost));

Expand All @@ -101,13 +113,13 @@ where
match receiver.poll() {
Ok(Async::NotReady) => {
// If any spans have been collected send them, potentially consuming `node`.
Self::do_send(spans, sender, node)?;
Self::do_send(spans, sender, node, metrics)?;
return Ok(Async::NotReady);
}
Ok(Async::Ready(Some(span))) => {
spans.push(span);
if spans.len() == max_batch_size {
Self::do_send(spans, sender, node)?;
Self::do_send(spans, sender, node, metrics)?;
// Because we've voluntarily stopped work due to a batch
// size limitation, notify the task to be polled again
// immediately.
Expand All @@ -116,12 +128,12 @@ where
}
}
Ok(Async::Ready(None)) => {
let _ = Self::do_send(spans, sender, node);
let _ = Self::do_send(spans, sender, node, metrics);
// The span receiver stream completed, so signal completion.
return Ok(Async::Ready(()));
}
Err(e) => {
let _ = Self::do_send(spans, sender, node);
let _ = Self::do_send(spans, sender, node, metrics);
return Err(StreamError::Receiver(e));
}
}
Expand Down Expand Up @@ -150,16 +162,19 @@ where
.map_err(|_| grpc::Status::new(grpc::Code::Cancelled, "cancelled")),
);
trace!("Establishing new TraceService::export request");
self.metrics.start_stream();
let _rsp = svc.export(req);
State::Sending {
sender: request_tx,
node: Some(self.node.clone()),
_rsp,
metrics: self.metrics.clone(),
}
}
State::Sending {
ref mut sender,
ref mut node,
ref mut metrics,
..
} => {
let mut sender = sender.clone();
Expand All @@ -168,6 +183,7 @@ where
&mut sender,
node,
self.max_batch_size,
metrics,
) {
Ok(ready) => return Ok(ready),
Err(StreamError::Receiver(e)) => return Err(e.into()),
Expand Down
71 changes: 71 additions & 0 deletions lib/linkerd2-opencensus/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
use linkerd2_metrics::{metrics, Counter, FmtMetrics};
use std::fmt;
use std::sync::{Arc, Mutex};
use tracing::error;

// Declares the three span-export counter metrics together with their help
// text. Each name declared here is used by `Report::fmt_metrics` below to
// write that metric's help line and current value.
metrics! {
opencensus_span_export_streams: Counter { "Total count of opened span export streams" },
opencensus_span_export_requests: Counter { "Total count of span export request messages" },
opencensus_span_exports: Counter { "Total count of spans exported" }
}

/// Counter state shared between a `Registry` (which updates it) and a
/// `Report` (which formats it).
struct Metrics {
/// Incremented once per `Registry::start_stream` call.
streams: Counter,
/// Incremented once per `Registry::send` call.
requests: Counter,
/// Increased by the span count passed to each `Registry::send` call.
spans: Counter,
}

/// Cloneable handle used to record span-export events into the shared,
/// mutex-guarded `Metrics`. Clones refer to the same underlying counters.
#[derive(Clone)]
pub struct Registry(Arc<Mutex<Metrics>>);

/// Cloneable handle used to format the shared, mutex-guarded `Metrics`
/// through its `FmtMetrics` implementation.
#[derive(Clone)]
pub struct Report(Arc<Mutex<Metrics>>);

/// Creates a default-initialized set of span-export counters and returns the
/// pair of handles that share them: the `Registry` for recording events and
/// the `Report` for formatting the current values.
pub fn new() -> (Registry, Report) {
    let report = Report(Arc::new(Mutex::new(Metrics {
        streams: Counter::default(),
        requests: Counter::default(),
        spans: Counter::default(),
    })));
    // Both handles point at the same allocation, so updates made through the
    // registry are visible to the report.
    let registry = Registry(Arc::clone(&report.0));
    (registry, report)
}

impl Registry {
pub fn start_stream(&mut self) {
match self.0.lock() {
Ok(mut metrics) => metrics.streams.incr(),
Err(e) => error!(message="failed to lock metrics", %e),
}
}

pub fn send(&mut self, spans: u64) {
match self.0.lock() {
Ok(mut metrics) => {
metrics.requests.incr();
metrics.spans += spans;
}
Err(e) => error!(message="failed to lock metrics", %e),
}
}
}

impl FmtMetrics for Report {
fn fmt_metrics(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let metrics = match self.0.lock() {
Err(_) => return Ok(()),
Ok(lock) => lock,
};

opencensus_span_export_streams.fmt_help(f)?;
opencensus_span_export_streams.fmt_metric(f, metrics.streams)?;

opencensus_span_export_requests.fmt_help(f)?;
opencensus_span_export_requests.fmt_metric(f, metrics.requests)?;

opencensus_span_exports.fmt_help(f)?;
opencensus_span_exports.fmt_metric(f, metrics.spans)?;

Ok(())
}
}
7 changes: 5 additions & 2 deletions src/app/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,14 +232,17 @@ where

let (transport_metrics, transport_report) = transport::metrics::new();

let (span_metrics, span_report) = linkerd2_opencensus::metrics::new();

let report = endpoint_http_report
.and_then(route_http_report)
.and_then(retry_http_report)
.and_then(transport_report)
//.and_then(tls_config_report)
.and_then(ctl_http_report)
.and_then(handle_time_report)
.and_then(telemetry::process::Report::new(start_time));
.and_then(telemetry::process::Report::new(start_time))
.and_then(span_report);

let mut identity_daemon = None;
let (readiness, ready_latch) = Readiness::new();
Expand Down Expand Up @@ -441,7 +444,7 @@ where
}),
..oc::Node::default()
};
let span_exporter = SpanExporter::new(trace_collector, node, spans_rx);
let span_exporter = SpanExporter::new(trace_collector, node, spans_rx, span_metrics);
task::spawn(span_exporter.map_err(|e| {
error!("span exporter failed: {}", e);
}));
Expand Down

0 comments on commit b15735b

Please sign in to comment.