Skip to content

Commit

Permalink
feat(OTLP): Collect uptime metrics
Browse files Browse the repository at this point in the history
Since Azure does not provide a good way to monitor uptime, which is
needed to calculate how much of the free tier you have used, I now
collect the time since startup as an OTLP metric.
  • Loading branch information
AllexVeldman committed Oct 18, 2024
1 parent 9bc9aa6 commit ece5069
Show file tree
Hide file tree
Showing 7 changed files with 209 additions and 17 deletions.
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ tokio = { version = "1.40.0", features = ["macros", "rt-multi-thread", "signal",
tokio-util = "0.7.12"

# OTLP dependencies
opentelemetry-proto = { version = "0.26.1", default-features = false, features = ["gen-tonic-messages", "logs", "trace"], optional = true }
opentelemetry-proto = { version = "0.26.1", default-features = false, features = ["gen-tonic-messages", "logs", "trace", "metrics"], optional = true }
opentelemetry = { version = "0.26.0", default-features = false, features = ["trace"], optional = true }
opentelemetry_sdk = { version = "0.26.0", default-features = false, features = ["trace"], optional = true }
tracing-core = {version = "0.1.32", optional = true }
Expand Down
17 changes: 15 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ fn setup_tracing(
environ.otlp_endpoint.clone(),
environ.otlp_auth.clone(),
environ.trace_attributes(),
Duration::from_millis(1000),
Duration::from_secs(30),
cancel_token,
);
(el_reg, handle)
Expand Down Expand Up @@ -180,9 +180,20 @@ mod tests {

#[tokio::test]
async fn test_setup_tracing() {
let mut server = mockito::Server::new_async().await;
let url = server.url();
let mock = server.mock("POST", "/v1/metrics").create_async().await;

let rest_mock = server
.mock("POST", mockito::Matcher::Any)
// Expect no other requests
.expect(0)
.create_async()
.await;

let cancel_token = CancellationToken::new();
let env = Env {
otlp_endpoint: Some("url".to_string()),
otlp_endpoint: Some(url),
otlp_auth: Some("unittest".to_string()),
..Env::default()
};
Expand All @@ -194,6 +205,8 @@ mod tests {
if let Some(handle) = handle {
handle.await.unwrap();
}
mock.assert_async().await;
rest_mock.assert_async().await;
}

#[tokio::test]
Expand Down
10 changes: 5 additions & 5 deletions src/otlp/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,10 @@ pub struct OtlpLogLayer {

// Public methods
impl OtlpLogLayer {
pub fn new(otlp_endpoint: String, otlp_auth: String) -> Self {
pub fn new(otlp_endpoint: &str, otlp_auth: &str) -> Self {
Self {
otlp_endpoint,
otlp_auth,
otlp_endpoint: otlp_endpoint.to_string(),
otlp_auth: otlp_auth.to_string(),
records: Arc::new(RwLock::new(vec![])),
}
}
Expand Down Expand Up @@ -229,7 +229,7 @@ mod tests {
.await;

// init tracing with the otlp layer
let otlp_layer = OtlpLogLayer::new(url, "unittest_auth".to_string());
let otlp_layer = OtlpLogLayer::new(&url, "unittest_auth");
let otlp_clone = otlp_layer.clone();
let subscriber = tracing_subscriber::registry()
.with(SpanIdLayer::default())
Expand Down Expand Up @@ -277,7 +277,7 @@ mod tests {
.await;

// init tracing with the otlp layer
let otlp_layer = OtlpLogLayer::new(url, "".into());
let otlp_layer = OtlpLogLayer::new(&url, "");
let otlp_clone = otlp_layer.clone();
let subscriber = tracing_subscriber::registry()
.with(SpanIdLayer::default())
Expand Down
168 changes: 168 additions & 0 deletions src/otlp/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use prost::Message;
use time::OffsetDateTime;

use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
use opentelemetry_proto::tonic::common::v1::any_value;
use opentelemetry_proto::tonic::common::v1::{AnyValue, KeyValue};
use opentelemetry_proto::tonic::metrics::v1::{
metric::Data, number_data_point::Value, AggregationTemporality, Metric, NumberDataPoint,
ResourceMetrics, ScopeMetrics, Sum,
};
use opentelemetry_proto::tonic::resource::v1::Resource;

use crate::otlp::Toilet;
use crate::USER_AGENT;

/// Set of metrics to track.
///
/// Currently holds a single metric, the instance uptime. Converted into
/// OTLP `Metric` messages by `Metrics::as_metrics`.
#[derive(Debug)]
struct Metrics {
/// Uptime, measured from the moment the `UptimeMetric` was created.
uptime: UptimeMetric,
}

impl Default for Metrics {
    /// Create the metric set, starting the uptime measurement right away.
    fn default() -> Self {
        let uptime = UptimeMetric::new();
        Self { uptime }
    }
}

impl Metrics {
    /// Render every tracked metric as an OTLP `Metric`.
    ///
    /// `attributes` is handed to each metric unchanged so it can attach them
    /// to its data points.
    fn as_metrics(&self, attributes: &[KeyValue]) -> Vec<Metric> {
        let uptime = self.uptime.as_metric(attributes);
        Vec::from([uptime])
    }
}

/// Measures how long this instance has been running.
#[derive(Debug)]
struct UptimeMetric {
    /// Moment this metric started measuring, in nanoseconds since the unix epoch.
    ///
    /// Kept as the integer returned by `unix_timestamp_nanos` (not a float) so
    /// it can be reported exactly as the start of the cumulative series.
    start_ns: i128,
}

impl UptimeMetric {
    /// Start measuring from the current wall-clock time.
    fn new() -> Self {
        Self {
            start_ns: OffsetDateTime::now_utc().unix_timestamp_nanos(),
        }
    }

    /// Build the `pyoci_uptime` metric: a cumulative, monotonic sum holding
    /// the number of seconds elapsed since this metric was created.
    ///
    /// `attributes` are copied onto the single data point.
    ///
    /// # Panics
    ///
    /// Panics when a timestamp does not fit in a `u64`, i.e. when the system
    /// clock reports a time before the unix epoch.
    fn as_metric(&self, attributes: &[KeyValue]) -> Metric {
        let now = OffsetDateTime::now_utc().unix_timestamp_nanos();
        // The wall clock can be stepped backwards; never report a start that
        // lies after the observation time, nor a negative uptime.
        let start = self.start_ns.min(now);
        let now_u64: u64 = now.try_into().expect("timestamp does not fit in u64");
        let start_u64: u64 = start.try_into().expect("timestamp does not fit in u64");
        // Subtract as integers first so no precision is lost before scaling to seconds.
        let diff = (now - start) as f64 / 1_000_000_000.0;
        Metric {
            name: "pyoci_uptime".to_string(),
            description: "Time in seconds this instance has been running".to_string(),
            unit: "seconds".to_string(),
            data: Some(Data::Sum(Sum {
                data_points: vec![NumberDataPoint {
                    attributes: attributes.to_vec(),
                    // A cumulative sum covers the window [start, now]; the start
                    // must stay fixed so consumers see one continuous series
                    // instead of a reset on every export.
                    start_time_unix_nano: start_u64,
                    time_unix_nano: now_u64,
                    value: Some(Value::AsDouble(diff)),
                    ..NumberDataPoint::default()
                }],
                aggregation_temporality: AggregationTemporality::Cumulative.into(),
                is_monotonic: true,
            })),
            metadata: vec![],
        }
    }
}

/// Convert metrics into a ExportMetricsServiceRequest
/// <https://opentelemetry.io/docs/specs/otlp/#otlpgrpc>
fn build_metrics_export_body(
metrics: &Metrics,
attributes: &HashMap<&str, Option<String>>,
) -> ExportMetricsServiceRequest {
let mut attrs = vec![];
for (key, value) in attributes {
let Some(value) = value else {
continue;
};
attrs.push(KeyValue {
key: (*key).into(),
value: Some(AnyValue {
value: Some(any_value::Value::StringValue(value.into())),
}),
});
}
let scope_metrics = ScopeMetrics {
scope: None,
metrics: metrics.as_metrics(&attrs),
schema_url: "".to_string(),
};
let resource_metrics = ResourceMetrics {
resource: Some(Resource {
attributes: attrs,
dropped_attributes_count: 0,
}),
scope_metrics: vec![scope_metrics],
schema_url: "".to_string(),
};
ExportMetricsServiceRequest {
resource_metrics: vec![resource_metrics],
}
}

/// Pushes metrics to an OTLP consumer.
///
/// NOTE(review): described as a tracing layer like its log/trace siblings,
/// but no tracing `Layer` implementation is visible in this file; it is only
/// driven through `Toilet::flush` — confirm against the subscriber setup.
#[derive(Debug, Clone)]
pub struct OtlpMetricsLayer {
/// Base URL of the OTLP collector; `v1/metrics` is appended when flushing.
otlp_endpoint: String,
/// Value sent as the `Authorization` header of every export request.
otlp_auth: String,
/// Metrics reported on each flush; clones share the same set through the `Arc`.
metrics: Arc<Metrics>,
}

// Public methods
impl OtlpMetricsLayer {
    /// Create a metrics exporter for the given collector endpoint.
    ///
    /// Both arguments are copied into owned strings, and a fresh default
    /// `Metrics` set is created, which starts the uptime measurement.
    pub fn new(otlp_endpoint: &str, otlp_auth: &str) -> Self {
        let metrics = Arc::new(Metrics::default());
        Self {
            otlp_endpoint: String::from(otlp_endpoint),
            otlp_auth: String::from(otlp_auth),
            metrics,
        }
    }
}

impl Toilet for OtlpMetricsLayer {
/// Push all recorded log messages to the OTLP collector
/// This should be called at the end of every request, after the span is closed
async fn flush(&self, attributes: &HashMap<&str, Option<String>>) {
let client = reqwest::Client::builder()
.user_agent(USER_AGENT)
.timeout(Duration::from_secs(10))
.build()
.unwrap();

let body = build_metrics_export_body(&self.metrics, attributes).encode_to_vec();
let mut url = url::Url::parse(&self.otlp_endpoint).unwrap();
url.path_segments_mut().unwrap().extend(&["v1", "metrics"]);
// send to OTLP Collector
match client
.post(url)
.header("Content-Type", "application/x-protobuf")
.header("Authorization", &self.otlp_auth)
.body(body)
.send()
.await
{
Ok(response) => {
if !response.status().is_success() {
tracing::info!("Failed to send metrics to OTLP: {:?}", response);
tracing::info!("Response body: {:?}", response.text().await.unwrap());
} else {
tracing::info!("Metrics sent to OTLP: {:?}", response);
};
}
Err(err) => {
tracing::info!("Error sending metrics to OTLP: {:?}", err);

Check warning on line 164 in src/otlp/metrics.rs

View check run for this annotation

Codecov / codecov/patch

src/otlp/metrics.rs#L163-L164

Added lines #L163 - L164 were not covered by tests
}
};
}
}
12 changes: 8 additions & 4 deletions src/otlp/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
mod log;
mod metrics;
mod trace;

use metrics::OtlpMetricsLayer;
use std::collections::HashMap;
use tokio::task::JoinHandle;
use tokio::time::{interval, Duration, MissedTickBehavior};
Expand Down Expand Up @@ -40,15 +42,16 @@ where
let (Some(otlp_endpoint), Some(otlp_auth)) = (otlp_endpoint, otlp_auth) else {
return (Box::new(subscriber), None);
};
let log_layer = crate::otlp::OtlpLogLayer::new(otlp_endpoint.clone(), otlp_auth.clone());
let trace_layer = crate::otlp::OtlpTraceLayer::new(otlp_endpoint, otlp_auth);
let log_layer = crate::otlp::OtlpLogLayer::new(&otlp_endpoint, &otlp_auth);
let trace_layer = crate::otlp::OtlpTraceLayer::new(&otlp_endpoint, &otlp_auth);
let metrics = crate::otlp::metrics::OtlpMetricsLayer::new(&otlp_endpoint, &otlp_auth);

let subscriber = subscriber
.with(SpanIdLayer::default())
.with(SpanTimeLayer::default())
.with(log_layer.clone())
.with(trace_layer.clone());
let otlp_layer = (log_layer, trace_layer);
let otlp_layer = (log_layer, trace_layer, metrics);

// A task that will flush periodically
let handle = tokio::spawn(async move {
Expand All @@ -72,11 +75,12 @@ pub trait Toilet {
async fn flush(&self, _attributes: &HashMap<&str, Option<String>>);
}

type OtlpLayer = (OtlpLogLayer, OtlpTraceLayer);
type OtlpLayer = (OtlpLogLayer, OtlpTraceLayer, OtlpMetricsLayer);
impl Toilet for OtlpLayer {
    /// Flush the log, trace and metrics layers one after another, handing
    /// each of them the same `attributes`.
    async fn flush(&self, attributes: &HashMap<&str, Option<String>>) {
        let (logs, traces, metrics) = self;
        logs.flush(attributes).await;
        traces.flush(attributes).await;
        metrics.flush(attributes).await;
    }
}

Expand Down
10 changes: 5 additions & 5 deletions src/otlp/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,10 @@ pub struct OtlpTraceLayer {

// Public methods
impl OtlpTraceLayer {
pub fn new(otlp_endpoint: String, otlp_auth: String) -> Self {
pub fn new(otlp_endpoint: &str, otlp_auth: &str) -> Self {
Self {
otlp_endpoint,
otlp_auth,
otlp_endpoint: otlp_endpoint.to_string(),
otlp_auth: otlp_auth.to_string(),
spans: Arc::new(RwLock::new(vec![])),
}
}
Expand Down Expand Up @@ -349,7 +349,7 @@ mod tests {
.await;

// init tracing with the otlp layer
let otlp_layer = OtlpTraceLayer::new(url, "unittest_auth".to_string());
let otlp_layer = OtlpTraceLayer::new(&url, "unittest_auth");
let otlp_clone = otlp_layer.clone();
let subscriber = tracing_subscriber::registry()
.with(SpanIdLayer::default())
Expand Down Expand Up @@ -396,7 +396,7 @@ mod tests {
.await;

// init tracing with the otlp layer
let otlp_layer = OtlpTraceLayer::new(url, "".into());
let otlp_layer = OtlpTraceLayer::new(&url, "");
let otlp_clone = otlp_layer.clone();
let subscriber = tracing_subscriber::registry()
.with(SpanIdLayer::default())
Expand Down

0 comments on commit ece5069

Please sign in to comment.