diff --git a/Cargo.lock b/Cargo.lock
index 4a8a6d2..8476565 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -525,6 +525,12 @@ version = "0.28.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253"
 
+[[package]]
+name = "glob"
+version = "0.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
+
 [[package]]
 name = "h2"
 version = "0.4.5"
@@ -955,6 +961,7 @@ dependencies = [
  "futures-channel",
  "futures-executor",
  "futures-util",
+ "glob",
  "once_cell",
  "opentelemetry",
  "percent-encoding",
diff --git a/Cargo.toml b/Cargo.toml
index fd1b6f7..5109e08 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -55,7 +55,7 @@ tokio = { version = "1.40.0", features = ["macros", "rt-multi-thread", "signal",
 tokio-util = "0.7.12"
 
 # OTLP dependencies
-opentelemetry-proto = { version = "0.26.1", default-features = false, features = ["gen-tonic-messages", "logs", "trace"], optional = true }
+opentelemetry-proto = { version = "0.26.1", default-features = false, features = ["gen-tonic-messages", "logs", "trace", "metrics"], optional = true }
 opentelemetry = { version = "0.26.0", default-features = false, features = ["trace"], optional = true }
 opentelemetry_sdk = { version = "0.26.0", default-features = false, features = ["trace"], optional = true }
 tracing-core = {version = "0.1.32", optional = true }
diff --git a/src/main.rs b/src/main.rs
index d25d9e8..fb4961d 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -136,7 +136,7 @@ fn setup_tracing(
         environ.otlp_endpoint.clone(),
         environ.otlp_auth.clone(),
         environ.trace_attributes(),
-        Duration::from_millis(1000),
+        Duration::from_secs(30),
         cancel_token,
     );
     (el_reg, handle)
@@ -180,9 +180,20 @@ mod tests {
 
     #[tokio::test]
     async fn test_setup_tracing() {
+        let mut server = mockito::Server::new_async().await;
+        let url = server.url();
+        let mock = server.mock("POST", "/v1/metrics").create_async().await;
+
+        let rest_mock = server
+            .mock("POST", mockito::Matcher::Any)
+            // Expect no other requests
+            .expect(0)
+            .create_async()
+            .await;
+
         let cancel_token = CancellationToken::new();
         let env = Env {
-            otlp_endpoint: Some("url".to_string()),
+            otlp_endpoint: Some(url),
             otlp_auth: Some("unittest".to_string()),
             ..Env::default()
         };
@@ -194,6 +205,8 @@ mod tests {
         if let Some(handle) = handle {
             handle.await.unwrap();
         }
+        mock.assert_async().await;
+        rest_mock.assert_async().await;
     }
 
     #[tokio::test]
diff --git a/src/otlp/log.rs b/src/otlp/log.rs
index 0c45356..8f48565 100644
--- a/src/otlp/log.rs
+++ b/src/otlp/log.rs
@@ -70,10 +70,10 @@ pub struct OtlpLogLayer {
 
 // Public methods
 impl OtlpLogLayer {
-    pub fn new(otlp_endpoint: String, otlp_auth: String) -> Self {
+    pub fn new(otlp_endpoint: &str, otlp_auth: &str) -> Self {
         Self {
-            otlp_endpoint,
-            otlp_auth,
+            otlp_endpoint: otlp_endpoint.to_string(),
+            otlp_auth: otlp_auth.to_string(),
             records: Arc::new(RwLock::new(vec![])),
         }
     }
@@ -229,7 +229,7 @@ mod tests {
             .await;
 
         // init tracing with the otlp layer
-        let otlp_layer = OtlpLogLayer::new(url, "unittest_auth".to_string());
+        let otlp_layer = OtlpLogLayer::new(&url, "unittest_auth");
         let otlp_clone = otlp_layer.clone();
         let subscriber = tracing_subscriber::registry()
             .with(SpanIdLayer::default())
@@ -277,7 +277,7 @@ mod tests {
             .await;
 
         // init tracing with the otlp layer
-        let otlp_layer = OtlpLogLayer::new(url, "".into());
+        let otlp_layer = OtlpLogLayer::new(&url, "");
         let otlp_clone = otlp_layer.clone();
         let subscriber = tracing_subscriber::registry()
             .with(SpanIdLayer::default())
diff --git a/src/otlp/metrics.rs b/src/otlp/metrics.rs
new file mode 100644
index 0000000..c87ed17
--- /dev/null
+++ b/src/otlp/metrics.rs
@@ -0,0 +1,209 @@
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::{Duration, Instant};
+
+use prost::Message;
+use time::OffsetDateTime;
+
+use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
+use opentelemetry_proto::tonic::common::v1::any_value;
+use opentelemetry_proto::tonic::common::v1::{AnyValue, KeyValue};
+use opentelemetry_proto::tonic::metrics::v1::{
+    metric::Data, number_data_point::Value, AggregationTemporality, Metric, NumberDataPoint,
+    ResourceMetrics, ScopeMetrics, Sum,
+};
+use opentelemetry_proto::tonic::resource::v1::Resource;
+
+use crate::otlp::Toilet;
+use crate::USER_AGENT;
+
+/// Set of metrics to track
+#[derive(Debug)]
+struct Metrics {
+    uptime: UptimeMetric,
+}
+
+impl Default for Metrics {
+    fn default() -> Self {
+        Self {
+            uptime: UptimeMetric::new(),
+        }
+    }
+}
+
+impl Metrics {
+    /// Snapshot every tracked metric, attaching `attributes` to each data point
+    fn as_metrics(&self, attributes: &[KeyValue]) -> Vec<Metric> {
+        vec![self.uptime.as_metric(attributes)]
+    }
+}
+
+/// Nanoseconds since the Unix epoch, the representation OTLP timestamps use
+fn unix_nanos(moment: OffsetDateTime) -> u64 {
+    moment
+        .unix_timestamp_nanos()
+        .try_into()
+        .expect("timestamp does not fit in u64")
+}
+
+/// Tracks how long this instance has been running
+#[derive(Debug)]
+struct UptimeMetric {
+    /// Wall-clock moment this metric started measuring, reported as the series start
+    started_at: OffsetDateTime,
+    /// Monotonic counterpart of `started_at`, used to compute the elapsed time
+    timer: Instant,
+}
+
+impl UptimeMetric {
+    fn new() -> Self {
+        Self {
+            started_at: OffsetDateTime::now_utc(),
+            timer: Instant::now(),
+        }
+    }
+
+    /// Render the current uptime as a cumulative, monotonic sum with a single data point
+    fn as_metric(&self, attributes: &[KeyValue]) -> Metric {
+        // Elapsed time comes from the monotonic clock so a wall-clock adjustment can never
+        // make the "monotonic" sum decrease
+        let uptime = self.timer.elapsed().as_secs_f64();
+        Metric {
+            name: "pyoci_uptime".to_string(),
+            description: "Time in seconds this instance has been running".to_string(),
+            unit: "seconds".to_string(),
+            data: Some(Data::Sum(Sum {
+                data_points: vec![NumberDataPoint {
+                    attributes: attributes.to_vec(),
+                    // Cumulative series keep a fixed start; only the observation time moves
+                    start_time_unix_nano: unix_nanos(self.started_at),
+                    time_unix_nano: unix_nanos(OffsetDateTime::now_utc()),
+                    value: Some(Value::AsDouble(uptime)),
+                    ..NumberDataPoint::default()
+                }],
+                aggregation_temporality: AggregationTemporality::Cumulative.into(),
+                is_monotonic: true,
+            })),
+            metadata: vec![],
+        }
+    }
+}
+
+/// Convert metrics into an ExportMetricsServiceRequest
+///
+/// Attributes without a value are skipped; the rest are attached both to the resource
+/// and to every data point.
+fn build_metrics_export_body(
+    metrics: &Metrics,
+    attributes: &HashMap<&str, Option<String>>,
+) -> ExportMetricsServiceRequest {
+    let attrs: Vec<KeyValue> = attributes
+        .iter()
+        .filter_map(|(key, value)| {
+            let value = value.as_ref()?;
+            Some(KeyValue {
+                key: (*key).into(),
+                value: Some(AnyValue {
+                    value: Some(any_value::Value::StringValue(value.clone())),
+                }),
+            })
+        })
+        .collect();
+    let scope_metrics = ScopeMetrics {
+        scope: None,
+        metrics: metrics.as_metrics(&attrs),
+        schema_url: "".to_string(),
+    };
+    let resource_metrics = ResourceMetrics {
+        resource: Some(Resource {
+            attributes: attrs,
+            dropped_attributes_count: 0,
+        }),
+        scope_metrics: vec![scope_metrics],
+        schema_url: "".to_string(),
+    };
+    ExportMetricsServiceRequest {
+        resource_metrics: vec![resource_metrics],
+    }
+}
+
+/// Pushes metrics to an OTLP consumer.
+///
+/// Unlike the log and trace layers this is not registered on the subscriber; it only
+/// takes part in the periodic flush.
+#[derive(Debug, Clone)]
+pub struct OtlpMetricsLayer {
+    otlp_endpoint: String,
+    otlp_auth: String,
+    /// Metrics tracked by this instance, shared between clones
+    metrics: Arc<Metrics>,
+}
+
+// Public methods
+impl OtlpMetricsLayer {
+    pub fn new(otlp_endpoint: &str, otlp_auth: &str) -> Self {
+        Self {
+            otlp_endpoint: otlp_endpoint.to_string(),
+            otlp_auth: otlp_auth.to_string(),
+            metrics: Arc::new(Metrics::default()),
+        }
+    }
+}
+
+impl Toilet for OtlpMetricsLayer {
+    /// Push a snapshot of the current metrics to the OTLP collector
+    ///
+    /// Export is best-effort: a malformed endpoint or a failed request is logged
+    /// instead of panicking the task that drives the flush
+    async fn flush(&self, attributes: &HashMap<&str, Option<String>>) {
+        let mut url = match url::Url::parse(&self.otlp_endpoint) {
+            Ok(url) => url,
+            Err(err) => {
+                tracing::info!("Invalid OTLP endpoint: {:?}", err);
+                return;
+            }
+        };
+        match url.path_segments_mut() {
+            // Drop a trailing slash so ".../otlp/" does not become ".../otlp//v1/metrics"
+            Ok(mut segments) => {
+                segments.pop_if_empty().extend(&["v1", "metrics"]);
+            }
+            Err(()) => {
+                tracing::info!("OTLP endpoint cannot be used as a base URL");
+                return;
+            }
+        }
+
+        let client = reqwest::Client::builder()
+            .user_agent(USER_AGENT)
+            .timeout(Duration::from_secs(10))
+            .build()
+            .unwrap();
+
+        let body = build_metrics_export_body(&self.metrics, attributes).encode_to_vec();
+        // send to OTLP Collector
+        match client
+            .post(url)
+            .header("Content-Type", "application/x-protobuf")
+            .header("Authorization", &self.otlp_auth)
+            .body(body)
+            .send()
+            .await
+        {
+            Ok(response) if response.status().is_success() => {
+                tracing::info!("Metrics sent to OTLP: {:?}", response);
+            }
+            Ok(response) => {
+                tracing::info!("Failed to send metrics to OTLP: {:?}", response);
+                // Reading the body can fail too; log that rather than unwrapping
+                match response.text().await {
+                    Ok(text) => tracing::info!("Response body: {:?}", text),
+                    Err(err) => tracing::info!("Error reading OTLP response body: {:?}", err),
+                }
+            }
+            Err(err) => {
+                tracing::info!("Error sending metrics to OTLP: {:?}", err);
+            }
+        };
+    }
+}
diff --git a/src/otlp/mod.rs b/src/otlp/mod.rs
index c10368a..9c6e630 100644
--- a/src/otlp/mod.rs
+++ b/src/otlp/mod.rs
@@ -1,6 +1,8 @@
 mod log;
+mod metrics;
 mod trace;
 
+use metrics::OtlpMetricsLayer;
 use std::collections::HashMap;
 use tokio::task::JoinHandle;
 use tokio::time::{interval, Duration, MissedTickBehavior};
@@ -40,15 +42,16 @@ where
     let (Some(otlp_endpoint), Some(otlp_auth)) = (otlp_endpoint, otlp_auth) else {
         return (Box::new(subscriber), None);
     };
-    let log_layer = crate::otlp::OtlpLogLayer::new(otlp_endpoint.clone(), otlp_auth.clone());
-    let trace_layer = crate::otlp::OtlpTraceLayer::new(otlp_endpoint, otlp_auth);
+    let log_layer = crate::otlp::OtlpLogLayer::new(&otlp_endpoint, &otlp_auth);
+    let trace_layer = crate::otlp::OtlpTraceLayer::new(&otlp_endpoint, &otlp_auth);
+    let metrics = crate::otlp::metrics::OtlpMetricsLayer::new(&otlp_endpoint, &otlp_auth);
 
     let subscriber = subscriber
         .with(SpanIdLayer::default())
         .with(SpanTimeLayer::default())
         .with(log_layer.clone())
         .with(trace_layer.clone());
-    let otlp_layer = (log_layer, trace_layer);
+    let otlp_layer = (log_layer, trace_layer, metrics);
 
-    // A task that will flush every second
+    // A task that will flush periodically
     let handle = tokio::spawn(async move {
@@ -72,11 +75,12 @@ pub trait Toilet {
     async fn flush(&self, _attributes: &HashMap<&str, Option<String>>);
 }
 
-type OtlpLayer = (OtlpLogLayer, OtlpTraceLayer);
+type OtlpLayer = (OtlpLogLayer, OtlpTraceLayer, OtlpMetricsLayer);
 
 impl Toilet for OtlpLayer {
     async fn flush(&self, attributes: &HashMap<&str, Option<String>>) {
         self.0.flush(attributes).await;
         self.1.flush(attributes).await;
+        self.2.flush(attributes).await;
     }
 }
diff --git a/src/otlp/trace.rs b/src/otlp/trace.rs
index 8ddc488..eaf2eda 100644
--- a/src/otlp/trace.rs
+++ b/src/otlp/trace.rs
@@ -72,10 +72,10 @@ pub struct OtlpTraceLayer {
 
 // Public methods
 impl OtlpTraceLayer {
-    pub fn new(otlp_endpoint: String, otlp_auth: String) -> Self {
+    pub fn new(otlp_endpoint: &str, otlp_auth: &str) -> Self {
         Self {
-            otlp_endpoint,
-            otlp_auth,
+            otlp_endpoint: otlp_endpoint.to_string(),
+            otlp_auth: otlp_auth.to_string(),
             spans: Arc::new(RwLock::new(vec![])),
         }
     }
@@ -349,7 +349,7 @@ mod tests {
             .await;
 
         // init tracing with the otlp layer
-        let otlp_layer = OtlpTraceLayer::new(url, "unittest_auth".to_string());
+        let otlp_layer = OtlpTraceLayer::new(&url, "unittest_auth");
         let otlp_clone = otlp_layer.clone();
         let subscriber = tracing_subscriber::registry()
             .with(SpanIdLayer::default())
@@ -396,7 +396,7 @@ mod tests {
             .await;
 
         // init tracing with the otlp layer
-        let otlp_layer = OtlpTraceLayer::new(url, "".into());
+        let otlp_layer = OtlpTraceLayer::new(&url, "");
         let otlp_clone = otlp_layer.clone();
         let subscriber = tracing_subscriber::registry()
             .with(SpanIdLayer::default())