Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add initial metrics for p2p comms #435

Merged
merged 9 commits into from
Feb 23, 2025
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ resolver = "2"

members = [
"atoma-confidential",
"atoma-daemon",
"atoma-daemon",
"atoma-p2p",
"atoma-service",
"atoma-state",
Expand Down Expand Up @@ -51,6 +51,9 @@ metrics = "0.23"
metrics-exporter-prometheus = "0.14.0"
nvml-wrapper = "0.10.0"
once_cell = "1.20.3"
opentelemetry = "0.27.0"
opentelemetry_sdk = "0.27.0"
opentelemetry-otlp = "0.27.0"
prometheus = "0.13.4"
rand = "0.8.5"
reqwest = "0.12.1"
Expand All @@ -75,6 +78,7 @@ tower-http = "0.6.2"
tracing = "0.1.40"
tracing-appender = "0.2.3"
tracing-subscriber = "0.3.18"
tracing-opentelemetry = "0.28.0"
url = "2.5.4"
utoipa = "5.1.1"
utoipa-swagger-ui = "8.0.1"
Expand Down
8 changes: 4 additions & 4 deletions atoma-daemon/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ serde_yaml = { workspace = true }
sui-sdk = { workspace = true }
tokio = { workspace = true }
once_cell = "1.20"
opentelemetry = { version = "0.27.1", features = ["trace", "metrics", "logs"] }
opentelemetry_sdk = { version = "0.27.1", features = [
opentelemetry = { workspace = true, features = ["trace", "metrics", "logs"] }
opentelemetry_sdk = { workspace = true, features = [
"rt-tokio",
"trace",
"metrics",
"logs",
] }
opentelemetry-otlp = { version = "0.27.0", features = [
opentelemetry-otlp = { workspace = true, features = [
"metrics",
"grpc-tonic",
"trace",
Expand All @@ -45,7 +45,7 @@ tracing-subscriber = { workspace = true, features = [
"json",
"local-time",
] }
tracing-opentelemetry = "0.28.0"
tracing-opentelemetry = { workspace = true }
utoipa = { workspace = true, features = ["axum_extras"] }
utoipa-swagger-ui = { workspace = true, features = ["axum"] }

Expand Down
3 changes: 3 additions & 0 deletions atoma-p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ libp2p = { workspace = true, features = [
"tcp",
"yamux",
"noise",
"metrics",
] }
fastcrypto = { workspace = true }
flume = { workspace = true }
Expand All @@ -36,6 +37,8 @@ tokio = { workspace = true, features = ["full"] }
tracing = { workspace = true }
url = { workspace = true }
validator = { workspace = true, features = ["derive"] }
opentelemetry = { workspace = true }
once_cell = { workspace = true }

[dev-dependencies]
rand = { workspace = true }
Expand Down
63 changes: 63 additions & 0 deletions atoma-p2p/src/errors.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
use config::ConfigError;
use libp2p::{
gossipsub::{ConfigBuilderError, PublishError, SubscriptionError},
swarm::DialError,
TransportError,
};
use thiserror::Error;

use crate::service::StateManagerEvent;

/// Error type declared by the `atoma-p2p` crate.
///
/// Variants annotated with `#[from]` wrap an underlying error and receive a
/// `From` conversion through `thiserror`; `String` variants carry a
/// description that is interpolated into the display message.
#[derive(Debug, Error)]
pub enum AtomaP2pNodeError {
/// Wraps a gossipsub `ConfigBuilderError`.
#[error("Failed to build gossipsub config: {0}")]
GossipsubConfigError(#[from] ConfigBuilderError),
/// Behaviour construction failed; carries a description string.
#[error("Failed to build behaviour: {0}")]
BehaviourBuildError(String),
/// Wraps a gossipsub `SubscriptionError`.
#[error("Failed to subscribe to topic: {0}")]
GossipsubSubscriptionError(#[from] SubscriptionError),
/// Wraps a libp2p `TransportError<std::io::Error>`.
#[error("Failed to listen on address: {0}")]
SwarmListenOnError(#[from] TransportError<std::io::Error>),
/// Wraps a libp2p swarm `DialError`.
#[error("Failed to dial bootstrap node: {0}")]
BootstrapNodeDialError(#[from] DialError),
/// Signature parsing failed; carries a description string.
#[error("Failed to parse signature: {0}")]
SignatureParseError(String),
/// Signature verification failed; carries a description string.
#[error("Failed to verify signature: {0}")]
SignatureVerificationError(String),
/// A public address was rejected; carries a description string.
#[error("Invalid public address: {0}")]
InvalidPublicAddressError(String),
/// Wraps a `libp2p::multiaddr::Error`.
#[error("Failed to parse listen address: {0}")]
ListenAddressParseError(#[from] libp2p::multiaddr::Error),
/// Wraps a `config::ConfigError`.
#[error("Failed to build TCP config: {0}")]
TcpConfigBuildError(#[from] ConfigError),
/// Wraps a `libp2p::noise::Error`.
#[error("Failed to initialize noise encryption: {0}")]
NoiseError(#[from] libp2p::noise::Error),
/// Wraps a `flume::SendError` for a `StateManagerEvent`.
#[error("Failed to send event to state manager: {0}")]
StateManagerError(#[from] flume::SendError<StateManagerEvent>),
/// Signing a hashed message failed; carries a description string.
#[error("Failed to sign hashed message, with error: {0}")]
SignatureError(String),
/// Wraps a gossipsub `PublishError`.
#[error("Failed to publish gossipsub message: {0}")]
GossipsubMessagePublishError(#[from] PublishError),
/// Node small ID ownership check failed; carries a description string.
#[error("Failed to verify node small ID ownership: {0}")]
NodeSmallIdOwnershipVerificationError(String),
/// Sending usage metrics failed; carries no payload.
#[error("Failed to send usage metrics")]
UsageMetricsSendError,
/// Wraps a `ciborium` serialization error.
#[error("Failed to serialize usage metrics: `{0}`")]
UsageMetricsSerializeError(#[from] ciborium::ser::Error<std::io::Error>),
/// Wraps a `ciborium` deserialization error.
#[error("Failed to deserialize usage metrics: `{0}`")]
UsageMetricsDeserializeError(#[from] ciborium::de::Error<std::io::Error>),
/// Wraps a `url::ParseError`.
#[error("Failed to parse URL: `{0}`")]
UrlParseError(#[from] url::ParseError),
/// A country code was rejected; carries a description string.
#[error("Country code is invalid: `{0}`")]
InvalidCountryCodeError(String),
/// Wraps a `validator::ValidationError`.
#[error("Validation error: `{0}`")]
ValidationError(#[from] validator::ValidationError),
/// Wraps this crate's `metrics::NodeMetricsError`.
#[error("Failed to compute usage metrics: `{0}`")]
UsageMetricsComputeError(#[from] crate::metrics::NodeMetricsError),
/// Configuration was rejected; carries a description string.
#[error("Invalid config: `{0}`")]
InvalidConfig(String),
/// A message had an invalid length; carries no payload.
#[error("Invalid message length")]
InvalidMessageLengthError,
/// Publishing a message failed; carries a description string (unlike
/// `GossipsubMessagePublishError`, which wraps the typed error).
#[error("Failed to publish Message: `{0}`")]
PublishError(String),
}
5 changes: 5 additions & 0 deletions atoma-p2p/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
pub mod config;
pub mod errors;
pub mod metrics;
pub mod service;
pub mod timer;
pub mod types;
pub mod utils;

#[cfg(test)]
mod tests;

pub use config::AtomaP2pNodeConfig;
pub use service::AtomaP2pNode;
Expand Down
180 changes: 180 additions & 0 deletions atoma-p2p/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,15 @@ use sysinfo::{Networks, System};
use thiserror::Error;
use tracing::instrument;

use once_cell::sync::Lazy;
use opentelemetry::{
global,
metrics::{Counter, Gauge, Histogram, Meter, UpDownCounter},
};

/// Shared OpenTelemetry meter, obtained on first access from the global meter
/// provider under the name "atoma-node". The instruments in this file are
/// built from it.
static GLOBAL_METER: Lazy<Meter> = Lazy::new(|| global::meter("atoma-node"));

/// Structure to store the usage metrics for the node
///
/// This data is collected from the system and the GPU
Expand Down Expand Up @@ -121,3 +130,174 @@ pub enum NodeMetricsError {
#[error("Failed to convert number of CPUs to u32: {0}")]
TryFromIntError(#[from] std::num::TryFromIntError),
}

/// Counter `total_dials`: the total number of dials attempted (unit: dials).
///
/// Built lazily from `GLOBAL_METER` on first access.
pub static TOTAL_DIALS_ATTEMPTED: Lazy<Counter<u64>> = Lazy::new(|| {
    let instrument = GLOBAL_METER
        .u64_counter("total_dials")
        .with_description("The total number of dials attempted")
        .with_unit("dials");
    instrument.build()
});

/// Counter `total_dials_failed`: the total number of dials failed (unit: dials).
///
/// Built lazily from `GLOBAL_METER` on first access.
pub static TOTAL_DIALS_FAILED: Lazy<Counter<u64>> = Lazy::new(|| {
    let instrument = GLOBAL_METER
        .u64_counter("total_dials_failed")
        .with_description("The total number of dials failed")
        .with_unit("dials");
    instrument.build()
});

/// Gauge `total_connections`: the total number of connections
/// (unit: connections).
///
/// Built lazily from `GLOBAL_METER` on first access.
pub static TOTAL_CONNECTIONS: Lazy<Gauge<u64>> = Lazy::new(|| {
    let instrument = GLOBAL_METER
        .u64_gauge("total_connections")
        .with_description("The total number of connections")
        .with_unit("connections");
    instrument.build()
});

/// Gauge `peers_connected`: the number of peers connected (unit: peers).
///
/// Built lazily from `GLOBAL_METER` on first access.
pub static PEERS_CONNECTED: Lazy<Gauge<i64>> = Lazy::new(|| {
    let instrument = GLOBAL_METER
        .i64_gauge("peers_connected")
        .with_description("The number of peers connected")
        .with_unit("peers");
    instrument.build()
});

/// Up/down counter `total_gossipsub_subscriptions`: the total number of
/// gossipsub subscriptions (unit: subscriptions).
///
/// Built lazily from `GLOBAL_METER` on first access.
pub static TOTAL_GOSSIPSUB_SUBSCRIPTIONS: Lazy<UpDownCounter<i64>> = Lazy::new(|| {
    let instrument = GLOBAL_METER
        .i64_up_down_counter("total_gossipsub_subscriptions")
        .with_description("The total number of gossipsub subscriptions")
        .with_unit("subscriptions");
    instrument.build()
});

/// Counter `total_invalid_gossipsub_messages_received`: the total number of
/// invalid gossipsub messages received (unit: messages).
///
/// The exported instrument name and description say "invalid" so that they
/// match this static's identifier; the previous "valid" wording would have
/// published the count under a misleading metric name.
///
/// Built lazily from `GLOBAL_METER` on first access.
pub static TOTAL_INVALID_GOSSIPSUB_MESSAGES_RECEIVED: Lazy<Counter<u64>> = Lazy::new(|| {
    GLOBAL_METER
        .u64_counter("total_invalid_gossipsub_messages_received")
        .with_description("The total number of invalid gossipsub messages received")
        .with_unit("messages")
        .build()
});

/// Counter `total_gossipsub_messages_forwarded`: the total number of gossipsub
/// messages forwarded (unit: messages).
///
/// Built lazily from `GLOBAL_METER` on first access.
pub static TOTAL_GOSSIPSUB_MESSAGES_FORWARDED: Lazy<Counter<u64>> = Lazy::new(|| {
    let instrument = GLOBAL_METER
        .u64_counter("total_gossipsub_messages_forwarded")
        .with_description("The total number of gossipsub messages forwarded")
        .with_unit("messages");
    instrument.build()
});

/// Counter `total_gossipsub_publishes`: the total number of gossipsub
/// publishes (unit: messages).
///
/// Built lazily from `GLOBAL_METER` on first access.
pub static TOTAL_GOSSIPSUB_PUBLISHES: Lazy<Counter<u64>> = Lazy::new(|| {
    let instrument = GLOBAL_METER
        .u64_counter("total_gossipsub_publishes")
        .with_description("The total number of gossipsub publishes")
        .with_unit("messages");
    instrument.build()
});

/// Counter `total_failed_gossipsub_messages`: the total number of failed
/// gossipsub messages (unit: messages).
///
/// Built lazily from `GLOBAL_METER` on first access.
pub static TOTAL_FAILED_GOSSIPSUB_PUBLISHES: Lazy<Counter<u64>> = Lazy::new(|| {
    let instrument = GLOBAL_METER
        .u64_counter("total_failed_gossipsub_messages")
        .with_description("The total number of failed gossipsub messages")
        .with_unit("messages");
    instrument.build()
});

/// Gauge `total_incoming_connections`: the total number of incoming
/// connections (unit: connections).
///
/// Built lazily from `GLOBAL_METER` on first access.
pub static TOTAL_INCOMING_CONNECTIONS: Lazy<Gauge<u64>> = Lazy::new(|| {
    let instrument = GLOBAL_METER
        .u64_gauge("total_incoming_connections")
        .with_description("The total number of incoming connections")
        .with_unit("connections");
    instrument.build()
});

/// Gauge `total_outgoing_connections`: the total number of outgoing
/// connections (unit: connections).
///
/// Built lazily from `GLOBAL_METER` on first access.
pub static TOTAL_OUTGOING_CONNECTIONS: Lazy<Gauge<u64>> = Lazy::new(|| {
    let instrument = GLOBAL_METER
        .u64_gauge("total_outgoing_connections")
        .with_description("The total number of outgoing connections")
        .with_unit("connections");
    instrument.build()
});

/// Counter `total_mdns_discoveries`: the total number of mDNS discoveries
/// (unit: discoveries).
///
/// Built lazily from `GLOBAL_METER` on first access.
pub static TOTAL_MDNS_DISCOVERIES: Lazy<Counter<u64>> = Lazy::new(|| {
    let instrument = GLOBAL_METER
        .u64_counter("total_mdns_discoveries")
        .with_description("The total number of mDNS discoveries")
        .with_unit("discoveries");
    instrument.build()
});

/// Gauge `total_stream_incoming_bandwidth`: the total stream incoming
/// bandwidth (unit: bytes).
///
/// The instrument name and description carry the "incoming" direction so that
/// they mirror the sibling `total_stream_outgoing_bandwidth` gauge and this
/// static's identifier; the previous direction-less name was ambiguous.
///
/// Built lazily from `GLOBAL_METER` on first access.
pub static TOTAL_STREAM_INCOMING_BANDWIDTH: Lazy<Gauge<u64>> = Lazy::new(|| {
    GLOBAL_METER
        .u64_gauge("total_stream_incoming_bandwidth")
        .with_description("The total number of stream incoming bandwidth")
        .with_unit("bytes")
        .build()
});

/// Gauge `total_stream_outgoing_bandwidth`: the total stream outgoing
/// bandwidth (unit: bytes).
///
/// Built lazily from `GLOBAL_METER` on first access.
pub static TOTAL_STREAM_OUTGOING_BANDWIDTH: Lazy<Gauge<u64>> = Lazy::new(|| {
    let instrument = GLOBAL_METER
        .u64_gauge("total_stream_outgoing_bandwidth")
        .with_description("The total number of stream outgoing bandwidth")
        .with_unit("bytes");
    instrument.build()
});

/// Histogram `gossip_score_histogram`: the histogram of gossip scores
/// (unit: score), recorded as `f64` values.
///
/// Built lazily from `GLOBAL_METER` on first access.
pub static GOSSIP_SCORE_HISTOGRAM: Lazy<Histogram<f64>> = Lazy::new(|| {
    let instrument = GLOBAL_METER
        .f64_histogram("gossip_score_histogram")
        .with_description("The histogram of gossip scores")
        .with_unit("score");
    instrument.build()
});

/// Holds a `sysinfo` network list together with the two gauges used to report
/// the byte totals read from it.
pub struct NetworkMetrics {
// Network interface data; refreshed on each `update_metrics` call.
networks: Networks,
// Gauge that receives the summed `total_received` value of all networks.
bytes_received: Gauge<u64>,
// Gauge that receives the summed `total_transmitted` value of all networks.
bytes_transmitted: Gauge<u64>,
}

impl Default for NetworkMetrics {
    /// Creates a `NetworkMetrics` with a freshly refreshed network list and
    /// the `total_bytes_received` / `total_bytes_transmitted` gauges built
    /// from `GLOBAL_METER`.
    fn default() -> Self {
        // Field initializers run in the order written, matching the original
        // construction order: network list, then received, then transmitted.
        Self {
            networks: Networks::new_with_refreshed_list(),
            bytes_received: GLOBAL_METER
                .u64_gauge("total_bytes_received")
                .with_description("The total number of bytes received")
                .with_unit("bytes")
                .build(),
            bytes_transmitted: GLOBAL_METER
                .u64_gauge("total_bytes_transmitted")
                .with_description("The total number of bytes transmitted")
                .with_unit("bytes")
                .build(),
        }
    }
}

impl NetworkMetrics {
pub fn update_metrics(&mut self) {
self.networks.refresh(true);

let total_received = self
.networks
.values()
.map(sysinfo::NetworkData::total_received)
.sum();

let total_transmitted = self
.networks
.values()
.map(sysinfo::NetworkData::total_transmitted)
.sum();

self.bytes_received.record(total_received, &[]); // Empty attributes array is fine since we're just recording a global metric
self.bytes_transmitted.record(total_transmitted, &[]);
}
}
Loading
Loading