From bf8dba7245b0fbf637d7de118cbca394ca35dbdc Mon Sep 17 00:00:00 2001 From: chad Date: Fri, 21 Feb 2025 16:38:59 -0500 Subject: [PATCH 1/6] feat: add initial metrics for p2p comms --- Cargo.lock | 2 + Cargo.toml | 6 +- atoma-daemon/Cargo.toml | 8 +-- atoma-p2p/Cargo.toml | 2 + atoma-p2p/src/metrics.rs | 140 +++++++++++++++++++++++++++++++++++++++ atoma-p2p/src/service.rs | 34 +++++++++- atoma-service/Cargo.toml | 8 +-- 7 files changed, 190 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7ab2baba..738b7c76 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -903,6 +903,8 @@ dependencies = [ "isocountry", "libp2p", "nvml-wrapper", + "once_cell", + "opentelemetry", "rand 0.8.5", "serde", "sui-keys", diff --git a/Cargo.toml b/Cargo.toml index 84421997..afce581e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ resolver = "2" members = [ "atoma-confidential", - "atoma-daemon", + "atoma-daemon", "atoma-p2p", "atoma-service", "atoma-state", @@ -51,6 +51,9 @@ metrics = "0.23" metrics-exporter-prometheus = "0.14.0" nvml-wrapper = "0.10.0" once_cell = "1.20.3" +opentelemetry = "0.27.0" +opentelemetry_sdk = "0.27.0" +opentelemetry-otlp = "0.27.0" prometheus = "0.13.4" rand = "0.8.5" reqwest = "0.12.1" @@ -75,6 +78,7 @@ tower-http = "0.6.2" tracing = "0.1.40" tracing-appender = "0.2.3" tracing-subscriber = "0.3.18" +tracing-opentelemetry = "0.28.0" url = "2.5.4" utoipa = "5.1.1" utoipa-swagger-ui = "8.0.1" diff --git a/atoma-daemon/Cargo.toml b/atoma-daemon/Cargo.toml index f97c3791..f93c85ad 100644 --- a/atoma-daemon/Cargo.toml +++ b/atoma-daemon/Cargo.toml @@ -24,14 +24,14 @@ serde_yaml = { workspace = true } sui-sdk = { workspace = true } tokio = { workspace = true } once_cell = "1.20" -opentelemetry = { version = "0.27.1", features = ["trace", "metrics", "logs"] } -opentelemetry_sdk = { version = "0.27.1", features = [ +opentelemetry = { workspace = true, features = ["trace", "metrics", "logs"] } +opentelemetry_sdk = { workspace = true, features = [ "rt-tokio", "trace", "metrics", "logs", ] } -opentelemetry-otlp = { version = "0.27.0", features = [ +opentelemetry-otlp = { workspace = true, features = [ "metrics", "grpc-tonic", "trace", @@ -45,7 +45,7 @@ tracing-subscriber = { workspace = true, features = [ "json", "local-time", ] } -tracing-opentelemetry = "0.28.0" +tracing-opentelemetry = { workspace = true } utoipa = { workspace = true, features = ["axum_extras"] } utoipa-swagger-ui = { workspace = true, features = ["axum"] } diff --git a/atoma-p2p/Cargo.toml b/atoma-p2p/Cargo.toml index 6908bd8d..add4bfe9 100644 --- a/atoma-p2p/Cargo.toml +++ b/atoma-p2p/Cargo.toml @@ -36,6 +36,8 @@ tokio = { workspace = true, features = ["full"] } tracing = { workspace = true } url = { workspace = true } validator = { workspace = true, features = ["derive"] } +opentelemetry = { workspace = true } +once_cell = { workspace = true } [dev-dependencies] rand = { workspace = true } diff --git a/atoma-p2p/src/metrics.rs b/atoma-p2p/src/metrics.rs index cf017dfa..55911616 100644 --- a/atoma-p2p/src/metrics.rs +++ b/atoma-p2p/src/metrics.rs @@ -8,6 +8,15 @@ use sysinfo::{Networks, System}; use thiserror::Error; use tracing::instrument; +use once_cell::sync::Lazy; +use opentelemetry::{ + global, + metrics::{Counter, Gauge, Histogram, Meter, UpDownCounter}, +}; + +// Add global metrics +static GLOBAL_METER: Lazy = Lazy::new(|| global::meter("atoma-node")); + /// Structure to store the usage metrics for the node /// /// This data is collected from the system and the GPU @@ -121,3 +130,134 @@ pub 
enum NodeMetricsError { #[error("Failed to convert number of CPUs to u32: {0}")] TryFromIntError(#[from] std::num::TryFromIntError), } + +pub static TOTAL_DIALS_ATTEMPTED: Lazy> = Lazy::new(|| { + GLOBAL_METER + .u64_counter("total_dials") + .with_description("The total number of dials attempted") + .with_unit("dials") + .build() +}); + +pub static TOTAL_DIALS_FAILED: Lazy> = Lazy::new(|| { + GLOBAL_METER + .u64_counter("total_dials_failed") + .with_description("The total number of dials failed") + .with_unit("dials") + .build() +}); + +pub static TOTAL_CONNECTIONS: Lazy> = Lazy::new(|| { + GLOBAL_METER + .u64_gauge("total_connections") + .with_description("The total number of connections") + .with_unit("connections") + .build() +}); + +pub static TOTAL_STREAMS: Lazy> = Lazy::new(|| { + GLOBAL_METER + .i64_gauge("total_streams") + .with_description("The total number of streams") + .with_unit("streams") + .build() +}); + +pub static PEERS_CONNECTED: Lazy> = Lazy::new(|| { + GLOBAL_METER + .i64_gauge("peers_connected") + .with_description("The number of peers connected") + .with_unit("peers") + .build() +}); + +pub static TOTAL_GOSSIPSUB_SUBSCRIPTIONS: Lazy> = Lazy::new(|| { + GLOBAL_METER + .i64_up_down_counter("total_gossipsub_subscriptions") + .with_description("The total number of gossipsub subscriptions") + .with_unit("subscriptions") + .build() +}); + +pub static TOTAL_VALID_GOSSIPSUB_MESSAGES_RECEIVED: Lazy> = Lazy::new(|| { + GLOBAL_METER + .u64_counter("total_valid_gossipsub_messages_received") + .with_description("The total number of valid gossipsub messages received") + .with_unit("messages") + .build() +}); + +pub static TOTAL_GOSSIPSUB_MESSAGES_FORWARDED: Lazy> = Lazy::new(|| { + GLOBAL_METER + .u64_counter("total_gossipsub_messages_forwarded") + .with_description("The total number of gossipsub messages forwarded") + .with_unit("messages") + .build() +}); + +pub static TOTAL_FAILED_GOSSIPSUB_MESSAGES: Lazy> = Lazy::new(|| { + GLOBAL_METER + .u64_counter("total_failed_gossipsub_messages") + .with_description("The total number of failed gossipsub messages") + .with_unit("messages") + .build() +}); + +pub static GOSSIP_SCORE_HISTOGRAM: Lazy> = Lazy::new(|| { + GLOBAL_METER + .f64_histogram("gossip_score_histogram") + .with_description("The histogram of gossip scores") + .with_unit("score") + .build() +}); + +pub struct NetworkMetrics { + networks: Networks, + bytes_received: Gauge, + bytes_transmitted: Gauge, +} + +impl Default for NetworkMetrics { + fn default() -> Self { + let networks = Networks::new_with_refreshed_list(); + + let bytes_received = GLOBAL_METER + .u64_gauge("total_bytes_received") + .with_description("The total number of bytes received") + .with_unit("bytes") + .build(); + + let bytes_transmitted = GLOBAL_METER + .u64_gauge("total_bytes_transmitted") + .with_description("The total number of bytes transmitted") + .with_unit("bytes") + .build(); + + Self { + networks, + bytes_received, + bytes_transmitted, + } + } +} + +impl NetworkMetrics { + pub fn update_metrics(&mut self) { + self.networks.refresh(true); + + let total_received = self + .networks + .values() + .map(sysinfo::NetworkData::total_received) + .sum(); + + let total_transmitted = self + .networks + .values() + .map(sysinfo::NetworkData::total_transmitted) + .sum(); + + self.bytes_received.record(total_received, &[]); // Empty attributes array is fine since we're just recording a global metric + self.bytes_transmitted.record(total_transmitted, &[]); + } +} diff --git a/atoma-p2p/src/service.rs 
b/atoma-p2p/src/service.rs index 7aaa0575..12d414b9 100644 --- a/atoma-p2p/src/service.rs +++ b/atoma-p2p/src/service.rs @@ -1,5 +1,7 @@ +use crate::metrics::TOTAL_GOSSIPSUB_SUBSCRIPTIONS; use crate::{ config::AtomaP2pNodeConfig, + metrics::{NetworkMetrics, TOTAL_CONNECTIONS, TOTAL_DIALS_ATTEMPTED, TOTAL_DIALS_FAILED}, timer::usage_metrics_timer_task, types::{AtomaP2pEvent, NodeMessage, SerializeWithSignature, SignedNodeMessage}, }; @@ -12,6 +14,7 @@ use libp2p::{ swarm::{DialError, NetworkBehaviour, SwarmEvent}, tcp, yamux, PeerId, Swarm, SwarmBuilder, TransportError, }; +use opentelemetry::KeyValue; use std::hash::{DefaultHasher, Hash, Hasher}; use std::sync::Arc; use sui_keys::keystore::{AccountKeystore, FileBasedKeystore}; @@ -84,6 +87,9 @@ pub struct AtomaP2pNode { /// participating nodes, neither any public URL of clients. That said, a node must /// always share its own public URL with the peers in the network. is_client: bool, + + /// Add network metrics + network_metrics: NetworkMetrics, } impl AtomaP2pNode { @@ -322,6 +328,8 @@ impl AtomaP2pNode { usage_metrics_tx, ); + let network_metrics = NetworkMetrics::default(); + Ok(Self { keystore, swarm, @@ -329,6 +337,7 @@ impl AtomaP2pNode { state_manager_sender, usage_metrics_rx, is_client, + network_metrics, }) } @@ -416,8 +425,16 @@ impl AtomaP2pNode { mut self, mut shutdown_signal: watch::Receiver, ) -> Result<(), AtomaP2pNodeError> { + // Create a metrics update interval + let mut metrics_interval = tokio::time::interval(std::time::Duration::from_secs(15)); + loop { tokio::select! { + // Add metrics interval to the select + _ = metrics_interval.tick() => { + self.network_metrics.update_metrics(); + } + event = self.swarm.select_next_some() => { match event { SwarmEvent::Behaviour(AtomaP2pBehaviourEvent::Gossipsub(gossipsub::Event::Message { @@ -426,8 +443,11 @@ impl AtomaP2pNode { propagation_source, })) => { match self.handle_gossipsub_message(&message.data, &message_id, &propagation_source).await { - Ok(()) => {} + Ok(()) => { + + } Err(e) => { + error!( target = "atoma-p2p", event = "gossipsub_message_error", @@ -441,6 +461,7 @@ impl AtomaP2pNode { peer_id, topic, })) => { + TOTAL_GOSSIPSUB_SUBSCRIPTIONS.add(1, &[KeyValue::new("topic", topic.to_string())]); debug!( target = "atoma-p2p", event = "gossipsub_subscribed", @@ -453,6 +474,7 @@ impl AtomaP2pNode { peer_id, topic, })) => { + TOTAL_GOSSIPSUB_SUBSCRIPTIONS.add(-1, &[KeyValue::new("topic", topic.to_string())]); debug!( target = "atoma-p2p", event = "gossipsub_unsubscribed", @@ -504,6 +526,7 @@ impl AtomaP2pNode { connection_id, .. } => { + TOTAL_CONNECTIONS.record(u64::from(num_established.get()), &[KeyValue::new("peerId", peer_id.to_string())]); debug!( target = "atoma-p2p", event = "peer_connection_established", @@ -517,8 +540,10 @@ impl AtomaP2pNode { SwarmEvent::ConnectionClosed { peer_id, connection_id, + num_established, .. } => { + TOTAL_CONNECTIONS.record(u64::from(num_established), &[]); debug!( target = "atoma-p2p", event = "peer_connection_closed", @@ -564,6 +589,13 @@ impl AtomaP2pNode { connection_id = %connection_id, "Dialing peer" ); + TOTAL_DIALS_ATTEMPTED.add(1, &[]); + } + SwarmEvent::OutgoingConnectionError { + peer_id, + .. 
+                    } => {
+                        TOTAL_DIALS_FAILED.add(1, &[KeyValue::new("peerId", peer_id.unwrap().to_base58())]);
                     }
                     _ => {}
                 }
diff --git a/atoma-service/Cargo.toml b/atoma-service/Cargo.toml
index 5cc07680..75e30da0 100644
--- a/atoma-service/Cargo.toml
+++ b/atoma-service/Cargo.toml
@@ -29,14 +29,14 @@ hyper = { workspace = true }
 isocountry = { workspace = true }
 lazy_static = { workspace = true }
 once_cell = { workspace = true }
-opentelemetry = { version = "0.27.0", features = ["trace", "metrics", "logs"] }
-opentelemetry_sdk = { version = "0.27.0", features = [
+opentelemetry = { workspace = true, features = ["trace", "metrics", "logs"] }
+opentelemetry_sdk = { workspace = true, features = [
     "rt-tokio",
     "trace",
     "metrics",
     "logs",
 ] }
-opentelemetry-otlp = { version = "0.27.0", features = [
+opentelemetry-otlp = { workspace = true, features = [
     "metrics",
     "grpc-tonic",
     "trace",
@@ -63,7 +63,7 @@ tracing-subscriber = { workspace = true, features = [
     "json",
     "local-time",
 ] }
-tracing-opentelemetry = "0.28.0"
+tracing-opentelemetry = { workspace = true }
 url = { workspace = true }
 utoipa = { workspace = true, features = ["axum_extras"] }
 utoipa-swagger-ui = { workspace = true, features = ["axum"] }
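Patch 1 above only defines the new instruments lazily against the global meter (global::meter("atoma-node")); they export data only once the running binary installs a global meter provider, which the atoma-daemon and atoma-service crates are presumably responsible for given their opentelemetry-otlp dependencies above. A minimal sketch of that wiring, assuming the builder-style APIs of the opentelemetry / opentelemetry_sdk / opentelemetry-otlp 0.27 crates pinned in the workspace; the init_p2p_metrics name and the endpoint argument are illustrative, not part of this patch:

    use opentelemetry::{global, KeyValue};
    use opentelemetry_otlp::{MetricExporter, WithExportConfig};
    use opentelemetry_sdk::{
        metrics::{PeriodicReader, SdkMeterProvider},
        runtime, Resource,
    };

    fn init_p2p_metrics(otlp_endpoint: &str) -> Result<SdkMeterProvider, Box<dyn std::error::Error>> {
        // OTLP/gRPC metric exporter, matching the "metrics" + "grpc-tonic" features enabled above.
        let exporter = MetricExporter::builder()
            .with_tonic()
            .with_endpoint(otlp_endpoint)
            .build()?;
        // Periodically collect and push every instrument registered through the global meter.
        let reader = PeriodicReader::builder(exporter, runtime::Tokio).build();
        let provider = SdkMeterProvider::builder()
            .with_reader(reader)
            .with_resource(Resource::new(vec![KeyValue::new("service.name", "atoma-node")]))
            .build();
        // From here on, global::meter("atoma-node") (and therefore GLOBAL_METER) records into this provider.
        global::set_meter_provider(provider.clone());
        Ok(provider)
    }

Until such a provider is set, the OpenTelemetry global API falls back to a no-op meter, so calls such as TOTAL_DIALS_ATTEMPTED.add(1, &[]) compile and run but record nothing.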
From 22aa16bce121199a76f5ed1897cb35ec7a3af094 Mon Sep 17 00:00:00 2001
From: chad
Date: Fri, 21 Feb 2025 18:40:57 -0500
Subject: [PATCH 2/6] refactor: modularize files for ease of use

---
 atoma-p2p/Cargo.toml     |   1 +
 atoma-p2p/src/errors.rs  |  61 +++
 atoma-p2p/src/lib.rs     |   5 +
 atoma-p2p/src/metrics.rs |  32 +-
 atoma-p2p/src/service.rs | 778 ++-------------------------------------
 atoma-p2p/src/tests.rs   | 356 ++++++++++++++++++
 atoma-p2p/src/timer.rs   |   4 +-
 atoma-p2p/src/types.rs   |   2 +-
 atoma-p2p/src/utils.rs   | 339 +++++++++++++++++
 deny.toml                |   2 +
 10 files changed, 813 insertions(+), 767 deletions(-)
 create mode 100644 atoma-p2p/src/errors.rs
 create mode 100644 atoma-p2p/src/tests.rs
 create mode 100644 atoma-p2p/src/utils.rs

diff --git a/atoma-p2p/Cargo.toml b/atoma-p2p/Cargo.toml
index add4bfe9..388b1c4e 100644
--- a/atoma-p2p/Cargo.toml
+++ b/atoma-p2p/Cargo.toml
@@ -22,6 +22,7 @@ libp2p = { workspace = true, features = [
     "tcp",
     "yamux",
     "noise",
+    "metrics",
 ] }
 fastcrypto = { workspace = true }
 flume = { workspace = true }
diff --git a/atoma-p2p/src/errors.rs b/atoma-p2p/src/errors.rs
new file mode 100644
index 00000000..602fa0b5
--- /dev/null
+++ b/atoma-p2p/src/errors.rs
@@ -0,0 +1,61 @@
+use config::ConfigError;
+use libp2p::{
+    gossipsub::{ConfigBuilderError, PublishError, SubscriptionError},
+    swarm::DialError,
+    TransportError,
+};
+use thiserror::Error;
+
+use crate::service::StateManagerEvent;
+
+#[derive(Debug, Error)]
+pub enum AtomaP2pNodeError {
+    #[error("Failed to build gossipsub config: {0}")]
+    GossipsubConfigError(#[from] ConfigBuilderError),
+    #[error("Failed to build behaviour: {0}")]
+    BehaviourBuildError(String),
+    #[error("Failed to subscribe to topic: {0}")]
+    GossipsubSubscriptionError(#[from] SubscriptionError),
+    #[error("Failed to listen on address: {0}")]
+    SwarmListenOnError(#[from] TransportError<std::io::Error>),
+    #[error("Failed to dial bootstrap node: {0}")]
+    BootstrapNodeDialError(#[from] DialError),
+    #[error("Failed to parse signature: {0}")]
+    SignatureParseError(String),
+    #[error("Failed to verify signature: {0}")]
+    SignatureVerificationError(String),
+    #[error("Invalid public address: {0}")]
+    InvalidPublicAddressError(String),
+    #[error("Failed to parse listen address: {0}")]
+    ListenAddressParseError(#[from] libp2p::multiaddr::Error),
+    #[error("Failed to build TCP config: {0}")]
+    TcpConfigBuildError(#[from] ConfigError),
+    #[error("Failed to initialize noise encryption: {0}")]
+    NoiseError(#[from] libp2p::noise::Error),
+    #[error("Failed to send event to state manager: {0}")]
+    StateManagerError(#[from] flume::SendError<StateManagerEvent>),
+    #[error("Failed to sign hashed message, with error: {0}")]
+    SignatureError(String),
+    #[error("Failed to publish gossipsub message: {0}")]
+    GossipsubMessagePublishError(#[from] PublishError),
+    #[error("Failed to verify node small ID ownership: {0}")]
+    NodeSmallIdOwnershipVerificationError(String),
+    #[error("Failed to send usage metrics")]
+    UsageMetricsSendError,
+    #[error("Failed to serialize usage metrics: `{0}`")]
+    UsageMetricsSerializeError(#[from] ciborium::ser::Error<std::io::Error>),
+    #[error("Failed to deserialize usage metrics: `{0}`")]
+    UsageMetricsDeserializeError(#[from] ciborium::de::Error<std::io::Error>),
+    #[error("Failed to parse URL: `{0}`")]
+    UrlParseError(#[from] url::ParseError),
+    #[error("Country code is invalid: `{0}`")]
+    InvalidCountryCodeError(String),
+    #[error("Validation error: `{0}`")]
+    ValidationError(#[from] validator::ValidationError),
+    #[error("Failed to compute usage metrics: `{0}`")]
+    UsageMetricsComputeError(#[from] crate::metrics::NodeMetricsError),
+    #[error("Invalid config: `{0}`")]
+    InvalidConfig(String),
+    #[error("Invalid message length")]
+    InvalidMessageLengthError,
+}
diff --git a/atoma-p2p/src/lib.rs b/atoma-p2p/src/lib.rs
index 44ae2b1e..301cb141 100644
--- a/atoma-p2p/src/lib.rs
+++ b/atoma-p2p/src/lib.rs
@@ -1,8 +1,13 @@
 pub mod config;
+pub mod errors;
 pub mod metrics;
 pub mod service;
 pub mod timer;
 pub mod types;
+pub mod utils;
+
+#[cfg(test)]
+mod tests;
 
 pub use config::AtomaP2pNodeConfig;
 pub use service::AtomaP2pNode;
diff --git a/atoma-p2p/src/metrics.rs b/atoma-p2p/src/metrics.rs
index 55911616..9e985a0f 100644
--- a/atoma-p2p/src/metrics.rs
+++ b/atoma-p2p/src/metrics.rs
@@ -155,14 +155,6 @@ pub static TOTAL_CONNECTIONS: Lazy<Gauge<u64>> = Lazy::new(|| {
         .build()
 });
 
-pub static TOTAL_STREAMS: Lazy<Gauge<i64>> = Lazy::new(|| {
-    GLOBAL_METER
-        .i64_gauge("total_streams")
-        .with_description("The total number of streams")
-        .with_unit("streams")
-        .build()
-});
-
 pub static PEERS_CONNECTED: Lazy<Gauge<i64>> = Lazy::new(|| {
     GLOBAL_METER
         .i64_gauge("peers_connected")
         .with_description("The number of peers connected")
         .with_unit("peers")
         .build()
 });
@@ -203,6 +195,30 @@ pub static TOTAL_FAILED_GOSSIPSUB_MESSAGES: Lazy<Counter<u64>> = Lazy::new(|| {
         .build()
 });
 
+pub static TOTAL_INCOMING_CONNECTIONS: Lazy<Gauge<u64>> = Lazy::new(|| {
+    GLOBAL_METER
+        .u64_gauge("total_incoming_connections")
+        .with_description("The total number of incoming connections")
+        .with_unit("connections")
+        .build()
+});
+
+pub static TOTAL_STREAM_INCOMING_BANDWIDTH: Lazy<Gauge<u64>> = Lazy::new(|| {
+    GLOBAL_METER
+        .u64_gauge("total_stream_incoming_bandwidth")
+        .with_description("The total incoming stream bandwidth, in bytes")
+        .with_unit("bytes")
+        .build()
+});
+
+pub static TOTAL_STREAM_OUTGOING_BANDWIDTH: Lazy<Gauge<u64>> = Lazy::new(|| {
+    GLOBAL_METER
+        .u64_gauge("total_stream_outgoing_bandwidth")
+        .with_description("The total outgoing stream bandwidth, in bytes")
+        .with_unit("bytes")
+        .build()
+});
+
 pub static GOSSIP_SCORE_HISTOGRAM: Lazy<Histogram<f64>> = Lazy::new(|| {
     GLOBAL_METER
         .f64_histogram("gossip_score_histogram")
         .with_description("The histogram of gossip scores")
         .with_unit("score")
         .build()
 });
diff --git a/atoma-p2p/src/service.rs b/atoma-p2p/src/service.rs
index 12d414b9..4bdc7426 100644
--- a/atoma-p2p/src/service.rs
+++ b/atoma-p2p/src/service.rs
@@ -1,24 +1,24 @@
-use crate::metrics::TOTAL_GOSSIPSUB_SUBSCRIPTIONS;
+use crate::errors::AtomaP2pNodeError;
+use crate::metrics::{PEERS_CONNECTED, TOTAL_GOSSIPSUB_SUBSCRIPTIONS};
+use crate::utils::validate_signed_node_message;
use crate::{ config::AtomaP2pNodeConfig, metrics::{NetworkMetrics, TOTAL_CONNECTIONS, TOTAL_DIALS_ATTEMPTED, TOTAL_DIALS_FAILED}, timer::usage_metrics_timer_task, types::{AtomaP2pEvent, NodeMessage, SerializeWithSignature, SignedNodeMessage}, }; -use config::ConfigError; use flume::Sender; use futures::StreamExt; use libp2p::{ - gossipsub::{self, ConfigBuilderError}, + gossipsub::{self}, identify, kad, mdns, noise, - swarm::{DialError, NetworkBehaviour, SwarmEvent}, - tcp, yamux, PeerId, Swarm, SwarmBuilder, TransportError, + swarm::{NetworkBehaviour, SwarmEvent}, + tcp, yamux, PeerId, Swarm, SwarmBuilder, }; use opentelemetry::KeyValue; use std::hash::{DefaultHasher, Hash, Hasher}; use std::sync::Arc; use sui_keys::keystore::{AccountKeystore, FileBasedKeystore}; -use thiserror::Error; use tokio::{ sync::{mpsc::UnboundedReceiver, oneshot, watch}, task::JoinHandle, @@ -28,7 +28,7 @@ use tracing::{debug, error, instrument}; /// The topic that the P2P network will use to gossip messages const METRICS_GOSPUBSUB_TOPIC: &str = "atoma-p2p-usage-metrics"; -type StateManagerEvent = (AtomaP2pEvent, Option>); +pub type StateManagerEvent = (AtomaP2pEvent, Option>); /// Network behavior configuration for the P2P Atoma node, combining multiple libp2p protocols. /// @@ -90,6 +90,10 @@ pub struct AtomaP2pNode { /// Add network metrics network_metrics: NetworkMetrics, + + /// Add registry field + #[allow(dead_code)] + metrics_registry: libp2p::metrics::Registry, } impl AtomaP2pNode { @@ -187,6 +191,8 @@ impl AtomaP2pNode { "Invalid config, either public_url, node_small_id or country is not set, this should never happen".to_string(), )); } + let mut metrics_registry = libp2p::metrics::Registry::default(); + let mut swarm = SwarmBuilder::with_new_identity() .with_tokio() .with_tcp( @@ -195,6 +201,7 @@ impl AtomaP2pNode { yamux::Config::default, )? .with_quic() + .with_bandwidth_metrics(&mut metrics_registry) .with_behaviour(|key| { // To content-address message, we can take the hash of message and use it as an ID. 
let message_id_fn = |message: &gossipsub::Message| { @@ -338,6 +345,7 @@ impl AtomaP2pNode { usage_metrics_rx, is_client, network_metrics, + metrics_registry, }) } @@ -433,6 +441,11 @@ impl AtomaP2pNode { // Add metrics interval to the select _ = metrics_interval.tick() => { self.network_metrics.update_metrics(); + + + #[allow(clippy::cast_possible_wrap)] + let connected_peers = self.swarm.connected_peers().count() as i64; + PEERS_CONNECTED.record(connected_peers, &[KeyValue::new("peerId", self.swarm.local_peer_id().to_base58())]); } event = self.swarm.select_next_some() => { @@ -589,7 +602,7 @@ impl AtomaP2pNode { connection_id = %connection_id, "Dialing peer" ); - TOTAL_DIALS_ATTEMPTED.add(1, &[]); + TOTAL_DIALS_ATTEMPTED.add(1, &[KeyValue::new("peerId", peer_id.unwrap().to_base58())]); } SwarmEvent::OutgoingConnectionError { peer_id, @@ -740,7 +753,7 @@ impl AtomaP2pNode { ); let node_message = &signed_node_message.node_message; let node_message_hash = blake3::hash(&message_data[signature_len..]); - let message_acceptance = match utils::validate_signed_node_message( + let message_acceptance = match validate_signed_node_message( node_message, node_message_hash.as_bytes(), &signed_node_message.signature, @@ -900,750 +913,3 @@ impl AtomaP2pNode { Ok(()) } } - -#[derive(Debug, Error)] -pub enum AtomaP2pNodeError { - #[error("Failed to build gossipsub config: {0}")] - GossipsubConfigError(#[from] ConfigBuilderError), - #[error("Failed to build behaviour: {0}")] - BehaviourBuildError(String), - #[error("Failed to subscribe to topic: {0}")] - GossipsubSubscriptionError(#[from] gossipsub::SubscriptionError), - #[error("Failed to listen on address: {0}")] - SwarmListenOnError(#[from] TransportError), - #[error("Failed to dial bootstrap node: {0}")] - BootstrapNodeDialError(#[from] DialError), - #[error("Failed to parse signature: {0}")] - SignatureParseError(String), - #[error("Failed to verify signature: {0}")] - SignatureVerificationError(String), - #[error("Invalid public address: {0}")] - InvalidPublicAddressError(String), - #[error("Failed to parse listen address: {0}")] - ListenAddressParseError(#[from] libp2p::multiaddr::Error), - #[error("Failed to build TCP config: {0}")] - TcpConfigBuildError(#[from] ConfigError), - #[error("Failed to initialize noise encryption: {0}")] - NoiseError(#[from] libp2p::noise::Error), - #[error("Failed to send event to state manager: {0}")] - StateManagerError(#[from] flume::SendError), - #[error("Failed to sign hashed message, with error: {0}")] - SignatureError(String), - #[error("Failed to publish gossipsub message: {0}")] - GossipsubMessagePublishError(#[from] gossipsub::PublishError), - #[error("Failed to verify node small ID ownership: {0}")] - NodeSmallIdOwnershipVerificationError(String), - #[error("Failed to send usage metrics")] - UsageMetricsSendError, - #[error("Failed to serialize usage metrics: `{0}`")] - UsageMetricsSerializeError(#[from] ciborium::ser::Error), - #[error("Failed to deserialize usage metrics: `{0}`")] - UsageMetricsDeserializeError(#[from] ciborium::de::Error), - #[error("Failed to parse URL: `{0}`")] - UrlParseError(#[from] url::ParseError), - #[error("Country code is invalid: `{0}`")] - InvalidCountryCodeError(String), - #[error("Validation error: `{0}`")] - ValidationError(#[from] validator::ValidationError), - #[error("Failed to compute usage metrics: `{0}`")] - UsageMetricsComputeError(#[from] crate::metrics::NodeMetricsError), - #[error("Invalid config: `{0}`")] - InvalidConfig(String), - #[error("Invalid message 
length")] - InvalidMessageLengthError, -} - -mod utils { - use fastcrypto::{ - ed25519::{Ed25519PublicKey, Ed25519Signature}, - secp256k1::{Secp256k1PublicKey, Secp256k1Signature}, - secp256r1::{Secp256r1PublicKey, Secp256r1Signature}, - traits::{ToFromBytes as FastCryptoToFromBytes, VerifyingKey}, - }; - use flume::Sender; - use sui_sdk::types::{ - base_types::SuiAddress, - crypto::{PublicKey, Signature, SignatureScheme, SuiSignature, ToFromBytes}, - }; - use tokio::sync::oneshot; - use tracing::{error, instrument}; - use url::Url; - - use crate::{types::NodeMessage, AtomaP2pEvent}; - - use super::{AtomaP2pNodeError, StateManagerEvent}; - - /// The threshold for considering a timestamp as expired - const EXPIRED_TIMESTAMP_THRESHOLD: u64 = 10 * 60; // 10 minutes - - /// Validates a UsageMetrics message by checking the URL format and timestamp freshness. - /// - /// This function performs two key validations: - /// 1. Ensures the node_public_url is a valid URL format - /// 2. Verifies the timestamp is within an acceptable time window relative to the current time - /// - /// # Arguments - /// - /// * `response` - The `UsageMetrics` containing the node_public_url and timestamp to validate - /// - /// # Returns - /// - /// * `Ok(())` - If all validation checks pass - /// * `Err(AtomaP2pNodeError)` - If any validation fails - /// - /// # Errors - /// - /// Returns `AtomaP2pNodeError::InvalidPublicAddressError` when: - /// * The URL is invalid or malformed - /// * The timestamp is older than `EXPIRED_TIMESTAMP_THRESHOLD` (10 minutes) - /// * The timestamp is in the future - /// - /// # Example - /// - /// ```rust,ignore - /// let metrics = UsageMetrics { - /// node_public_url: "https://example.com".to_string(), - /// timestamp: std::time::Instant::now().elapsed().as_secs(), - /// signature: vec![], - /// node_small_id: 1, - /// }; - /// - /// match validate_public_address_message(&metrics) { - /// Ok(()) => println!("UsageMetrics is valid"), - /// Err(e) => println!("Invalid UsageMetrics: {}", e), - /// } - /// ``` - /// - /// # Security Considerations - /// - /// This validation helps prevent: - /// * Invalid or malicious URLs from being propagated - /// * Replay attacks by enforcing timestamp freshness - /// * Future timestamp manipulation attempts - #[instrument(level = "debug", skip_all)] - pub fn validate_node_message_country_url_timestamp( - node_message: &NodeMessage, - ) -> Result<(), AtomaP2pNodeError> { - let now = std::time::Instant::now().elapsed().as_secs(); - - let country = node_message.node_metadata.country.as_str(); - validate_country_code(country)?; - - // Check if the URL is valid - let _usage_metrics_url = - Url::parse(&node_message.node_metadata.node_public_url).map_err(|e| { - error!( - target = "atoma-p2p", - event = "invalid_url_format", - error = %e, - "Invalid URL format, received address: {}", - node_message.node_metadata.node_public_url - ); - AtomaP2pNodeError::UrlParseError(e) - })?; - - // Check if the timestamp is within a reasonable time frame - if now < node_message.node_metadata.timestamp - || now > node_message.node_metadata.timestamp + EXPIRED_TIMESTAMP_THRESHOLD - { - error!( - target = "atoma-p2p", - event = "invalid_timestamp", - "Timestamp is invalid, timestamp: {}, now: {}", - node_message.node_metadata.timestamp, - now - ); - return Err(AtomaP2pNodeError::InvalidPublicAddressError( - "Timestamp is too far in the past".to_string(), - )); - } - - Ok(()) - } - - /// Custom validation function for ISO 3166-1 alpha-2 country codes - fn 
validate_country_code(code: &str) -> Result<(), AtomaP2pNodeError> { - isocountry::CountryCode::for_alpha2(code).map_err(|_| { - AtomaP2pNodeError::InvalidCountryCodeError("Country code is invalid.".to_string()) - })?; - Ok(()) - } - - /// Verifies a cryptographic signature against a message hash using the signature scheme embedded in the signature. - /// - /// This function supports multiple signature schemes: - /// - ED25519 - /// - Secp256k1 - /// - Secp256r1 - /// - /// # Arguments - /// - /// * `signature_bytes` - Raw bytes of the signature. This should include both the signature data and metadata - /// that allows extracting the public key and signature scheme. - /// * `body_hash` - A 32-byte array containing the hash of the message that was signed. This is typically - /// produced using blake3 or another cryptographic hash function. - /// - /// # Returns - /// - /// * `Ok(())` - If the signature is valid for the given message hash - /// * `Err(AtomaP2pNodeError)` - If any step of the verification process fails - /// - /// # Errors - /// - /// This function will return an error in the following situations: - /// * `SignatureParseError` - If the signature bytes cannot be parsed into a valid signature structure - /// or if the public key cannot be extracted from the signature - /// * `SignatureVerificationError` - If the signature verification fails (indicating the signature is invalid - /// for the given message) - /// * `SignatureParseError` - If the signature uses an unsupported signature scheme - /// - /// # Example - /// - /// ```rust,ignore - /// use your_crate::utils::verify_signature; - /// - /// let message = b"Hello, world!"; - /// let message_hash = blake3::hash(message).as_bytes(); - /// let signature_bytes = // ... obtained from somewhere ... 
- /// - /// match verify_signature(&signature_bytes, &message_hash) { - /// Ok(()) => println!("Signature is valid!"), - /// Err(e) => println!("Signature verification failed: {}", e), - /// } - /// ``` - #[instrument(level = "trace", skip_all)] - pub fn verify_signature( - signature_bytes: &[u8], - body_hash: &[u8; 32], - ) -> Result<(), AtomaP2pNodeError> { - let signature = Signature::from_bytes(signature_bytes).map_err(|e| { - error!("Failed to parse signature"); - AtomaP2pNodeError::SignatureParseError(e.to_string()) - })?; - let public_key_bytes = signature.public_key_bytes(); - let signature_scheme = signature.scheme(); - let public_key = - PublicKey::try_from_bytes(signature_scheme, public_key_bytes).map_err(|e| { - error!("Failed to extract public key from bytes, with error: {e}"); - AtomaP2pNodeError::SignatureParseError(e.to_string()) - })?; - let signature_bytes = signature.signature_bytes(); - - match signature_scheme { - SignatureScheme::ED25519 => { - let public_key = Ed25519PublicKey::from_bytes(public_key.as_ref()).unwrap(); - let signature = Ed25519Signature::from_bytes(signature_bytes).unwrap(); - public_key.verify(body_hash, &signature).map_err(|e| { - error!("Failed to verify signature"); - AtomaP2pNodeError::SignatureVerificationError(e.to_string()) - })?; - } - SignatureScheme::Secp256k1 => { - let public_key = Secp256k1PublicKey::from_bytes(public_key.as_ref()).unwrap(); - let signature = Secp256k1Signature::from_bytes(signature_bytes).unwrap(); - public_key.verify(body_hash, &signature).map_err(|e| { - error!("Failed to verify signature"); - AtomaP2pNodeError::SignatureVerificationError(e.to_string()) - })?; - } - SignatureScheme::Secp256r1 => { - let public_key = Secp256r1PublicKey::from_bytes(public_key.as_ref()).unwrap(); - let signature = Secp256r1Signature::from_bytes(signature_bytes).unwrap(); - public_key.verify(body_hash, &signature).map_err(|e| { - error!("Failed to verify signature"); - AtomaP2pNodeError::SignatureVerificationError(e.to_string()) - })?; - } - e => { - error!("Currently unsupported signature scheme, error: {e}"); - return Err(AtomaP2pNodeError::SignatureParseError(e.to_string())); - } - } - Ok(()) - } - - /// Verifies that a node owns its claimed small ID by checking the signature and querying the state manager. - /// - /// This function performs the following steps: - /// 1. Parses the signature to extract the public key - /// 2. Derives the Sui address from the public key - /// 3. Sends a verification request to the state manager - /// 4. Waits for the state manager's response - /// - /// # Arguments - /// - /// * `node_small_id` - The small ID claimed by the node - /// * `signature` - The signature to verify, containing the public key - /// * `state_manager_sender` - Channel to send verification requests to the state manager - /// - /// # Returns - /// - /// * `Ok(())` - If the node owns the small ID - /// * `Err(AtomaP2pNodeError)` - If verification fails at any step - /// - /// # Errors - /// - /// This function can return the following errors: - /// * `SignatureParseError` - If the signature cannot be parsed - /// * `StateManagerError` - If the verification request cannot be sent to the state manager - /// * `NodeSmallIdOwnershipVerificationError` - If the state manager reports the node does not own the ID - /// - /// # Example - /// - /// ```rust,ignore - /// let signature = // ... 
obtained from node ...; - /// let result = verify_node_small_id_ownership( - /// 42, - /// &signature, - /// state_manager_sender - /// ).await; - /// - /// match result { - /// Ok(()) => println!("Verification succeeded"), - /// Err(e) => println!("Verification failed: {}", e), - /// } - /// ``` - /// - /// # Security Considerations - /// - /// - Uses cryptographic signatures to prevent impersonation - /// - Relies on the state manager's authoritative record of node ownership - /// - Protects against node ID spoofing attacks - #[instrument(level = "debug", skip_all)] - pub async fn verify_node_small_id_ownership( - node_small_id: u64, - signature: &[u8], - state_manager_sender: &Sender, - ) -> Result<(), AtomaP2pNodeError> { - let signature = Signature::from_bytes(signature).map_err(|e| { - error!("Failed to parse signature"); - AtomaP2pNodeError::SignatureParseError(e.to_string()) - })?; - let public_key_bytes = signature.public_key_bytes(); - let public_key = - PublicKey::try_from_bytes(signature.scheme(), public_key_bytes).map_err(|e| { - error!("Failed to extract public key from bytes, with error: {e}"); - AtomaP2pNodeError::SignatureParseError(e.to_string()) - })?; - let sui_address = SuiAddress::from(&public_key); - let (sender, receiver) = oneshot::channel(); - if let Err(e) = state_manager_sender.send(( - AtomaP2pEvent::VerifyNodeSmallIdOwnership { - node_small_id, - sui_address: sui_address.to_string(), - }, - Some(sender), - )) { - error!( - target = "atoma-p2p", - event = "failed_to_send_event_to_state_manager", - error = %e, - "Failed to send event to state manager" - ); - return Err(AtomaP2pNodeError::StateManagerError(e)); - } - match receiver.await { - Ok(result) => { - if result { - Ok(()) - } else { - Err(AtomaP2pNodeError::NodeSmallIdOwnershipVerificationError( - "Node small ID ownership verification failed".to_string(), - )) - } - } - Err(e) => { - error!("Failed to receive result from state manager, with error: {e}"); - Err(AtomaP2pNodeError::NodeSmallIdOwnershipVerificationError( - e.to_string(), - )) - } - } - } - - /// Validates the messages received from the P2P network - /// - /// This function validates the sharing node public addresses messages received from the P2P network by checking the signature and the node small ID ownership. - /// - /// # Arguments - /// - /// * `response` - The message received from the P2P network - /// * `state_manager_sender` - The sender of the state manager - /// - /// # Returns - /// - /// Returns `Ok(())` if the message is valid, or a `AtomaP2pNodeError` if any step fails. 
- #[instrument(level = "debug", skip_all)] - pub async fn validate_signed_node_message( - node_message: &NodeMessage, - node_message_hash: &[u8; 32], - signature: &[u8], - state_manager_sender: &Sender, - ) -> Result<(), AtomaP2pNodeError> { - // Validate the message's node public URL and timestamp - validate_node_message_country_url_timestamp(node_message)?; - // Verify the signature of the message - verify_signature(signature, node_message_hash)?; - // Verify the node small ID ownership - verify_node_small_id_ownership( - node_message.node_metadata.node_small_id, - signature, - state_manager_sender, - ) - .await?; - Ok(()) - } -} - -#[cfg(test)] -pub mod tests { - use crate::{metrics::NodeMetrics, types::NodeP2pMetadata}; - - use super::*; - use flume::unbounded; - use sui_keys::keystore::InMemKeystore; - - /// Creates a test keystore - /// - /// # Returns - /// - /// Returns an `InMemKeystore` struct - #[must_use] - pub fn create_test_keystore() -> InMemKeystore { - // Create a new in-memory keystore - InMemKeystore::new_insecure_for_tests(1) - } - - /// Creates a test usage metrics message - /// - /// # Arguments - /// - /// * `keystore` - The keystore to use for signing the message - /// * `timestamp_offset` - The offset to add to the current timestamp - /// * `node_small_id` - The small ID of the node - /// - /// # Returns - /// - /// Returns a `SignedNodeMessage` struct - /// - /// # Panics - /// - /// Panics if the usage metrics cannot be serialized - #[must_use] - pub fn create_test_signed_node_message( - keystore: &InMemKeystore, - timestamp_offset: i64, - node_small_id: u64, - ) -> SignedNodeMessage { - let now = std::time::Instant::now().elapsed().as_secs(); - #[allow(clippy::cast_sign_loss)] - #[allow(clippy::cast_possible_wrap)] - let timestamp = (now as i64 + timestamp_offset) as u64; - - let node_public_url = "https://test.example.com".to_string(); - let country = "US".to_string(); - - let node_message = NodeMessage { - node_metadata: NodeP2pMetadata { - node_public_url, - node_small_id, - country, - timestamp, - }, - node_metrics: NodeMetrics::default(), - }; - - let mut node_message_bytes = Vec::new(); - ciborium::into_writer(&node_message, &mut node_message_bytes) - .map_err(|e| { - error!( - target = "atoma-p2p", - event = "serialize_usage_metrics_error", - error = %e, - "Failed to serialize usage metrics" - ); - AtomaP2pNodeError::UsageMetricsSerializeError(e) - }) - .expect("Failed to serialize usage metrics"); - let message_hash = blake3::hash(&node_message_bytes); - - let active_address = keystore.addresses()[0]; - let signature = keystore - .sign_hashed(&active_address, message_hash.as_bytes()) - .expect("Failed to sign message"); - - SignedNodeMessage { - node_message, - signature: signature.as_ref().to_vec(), - } - } - - #[tokio::test] - async fn test_validate_usage_metrics_message_success() { - let keystore = create_test_keystore(); - let (tx, rx) = unbounded(); - - // Create valid usage metrics - let signed_node_message = create_test_signed_node_message(&keystore, 0, 1); - - // Mock successful node small ID ownership verification - tokio::spawn(async move { - let (event, optional_response_sender): (AtomaP2pEvent, Option>) = - rx.recv_async().await.unwrap(); - if let AtomaP2pEvent::VerifyNodeSmallIdOwnership { - node_small_id, - sui_address: _, - } = event - { - assert_eq!(node_small_id, 1); - let response_sender = optional_response_sender.unwrap(); - response_sender.send(true).unwrap(); - } - }); - - let mut node_meessage_bytes = Vec::new(); - 
ciborium::into_writer(&signed_node_message.node_message, &mut node_meessage_bytes).unwrap(); - let node_message_hash = blake3::hash(&node_meessage_bytes); - // Validation should succeed - let result = utils::validate_signed_node_message( - &signed_node_message.node_message, - node_message_hash.as_bytes(), - &signed_node_message.signature, - &tx, - ) - .await; - result.unwrap(); - // assert!(result.is_ok()); - } - - #[tokio::test] - async fn test_validate_usage_metrics_message_invalid_url() { - let keystore = create_test_keystore(); - let (tx, _rx) = unbounded(); - - // Create base metrics then modify before serialization - let node_message = NodeMessage { - node_metadata: NodeP2pMetadata { - node_public_url: "invalid_url".to_string(), // Direct invalid URL - node_small_id: 1, - country: "US".to_string(), - timestamp: std::time::Instant::now().elapsed().as_secs(), - }, - node_metrics: NodeMetrics::default(), - }; - - let mut bytes = Vec::new(); - ciborium::into_writer(&node_message, &mut bytes).unwrap(); - let hash = blake3::hash(&bytes); - - // Serialize modified message - let signature = keystore - .sign_hashed(&keystore.addresses()[0], hash.as_bytes()) - .unwrap(); - - let bad_metrics = SignedNodeMessage { - node_message, - signature: signature.as_ref().to_vec(), - }; - - let result = utils::validate_signed_node_message( - &bad_metrics.node_message, - hash.as_bytes(), - &bad_metrics.signature, - &tx, - ) - .await; - assert!(matches!(result, Err(AtomaP2pNodeError::UrlParseError(_)))); - } - - #[tokio::test] - async fn test_validate_usage_metrics_message_expired_timestamp() { - let keystore = create_test_keystore(); - let (tx, _rx) = unbounded(); - - // Create metrics with expired timestamp (11 minutes ago) - let signed_node_message = create_test_signed_node_message(&keystore, -(11 * 60), 1); - let node_message = &signed_node_message.node_message; - let mut node_message_bytes = Vec::new(); - ciborium::into_writer(node_message, &mut node_message_bytes).unwrap(); - let node_message_hash = blake3::hash(&node_message_bytes); - - let result = utils::validate_signed_node_message( - node_message, - node_message_hash.as_bytes(), - &signed_node_message.signature, - &tx, - ) - .await; - assert!(matches!( - result, - Err(AtomaP2pNodeError::InvalidPublicAddressError(_)) - )); - } - - #[tokio::test] - async fn test_validate_usage_metrics_message_future_timestamp() { - let keystore = create_test_keystore(); - let (tx, _rx) = unbounded(); - - // Create metrics with future timestamp - let signed_node_message = create_test_signed_node_message(&keystore, 60, 1); - let node_message = &signed_node_message.node_message; - let mut node_message_bytes = Vec::new(); - ciborium::into_writer(node_message, &mut node_message_bytes).unwrap(); - let node_message_hash = blake3::hash(&node_message_bytes); - - let result = utils::validate_signed_node_message( - node_message, - node_message_hash.as_bytes(), - &signed_node_message.signature, - &tx, - ) - .await; - assert!(matches!( - result, - Err(AtomaP2pNodeError::InvalidPublicAddressError(_)) - )); - } - - #[tokio::test] - async fn test_validate_usage_metrics_message_invalid_signature() { - let keystore = create_test_keystore(); - let (tx, _rx) = unbounded(); - - let signed_node_message = create_test_signed_node_message(&keystore, 0, 1); - let node_message = &signed_node_message.node_message; - - // Corrupt the signature part after scheme byte - let scheme_length = 1; // Ed25519 scheme byte - let sig_start = scheme_length; - let mut signature = 
signed_node_message.signature.clone(); - for byte in &mut signature[sig_start..sig_start + 64] { - *byte = 0xff; - } - - let mut node_message_bytes = Vec::new(); - ciborium::into_writer(&node_message, &mut node_message_bytes).unwrap(); - let node_message_hash = blake3::hash(&node_message_bytes); - - let result = utils::validate_signed_node_message( - &signed_node_message.node_message, - node_message_hash.as_bytes(), - &signature, - &tx, - ) - .await; - assert!(matches!( - result, - Err(AtomaP2pNodeError::SignatureVerificationError(_)) - )); - } - - #[tokio::test] - async fn test_validate_usage_metrics_message_invalid_node_ownership() { - let keystore = create_test_keystore(); - let (tx, rx) = unbounded(); - - // Create valid metrics - let signed_node_message = create_test_signed_node_message(&keystore, 0, 1); - let node_message = &signed_node_message.node_message; - - // Mock failed node small ID ownership verification - tokio::spawn(async move { - let (event, _response_sender) = rx.recv_async().await.unwrap(); - if let AtomaP2pEvent::VerifyNodeSmallIdOwnership { - node_small_id, - sui_address: _, - } = event - { - assert_eq!(node_small_id, 1); - } - }); - - let mut node_message_bytes = Vec::new(); - ciborium::into_writer(node_message, &mut node_message_bytes).unwrap(); - let node_message_hash = blake3::hash(&node_message_bytes); - - let result = utils::validate_signed_node_message( - node_message, - node_message_hash.as_bytes(), - &signed_node_message.signature, - &tx, - ) - .await; - - assert!(matches!( - result, - Err(AtomaP2pNodeError::NodeSmallIdOwnershipVerificationError(_)) - )); - } - - #[tokio::test] - async fn test_validate_usage_metrics_message_state_manager_error() { - let keystore = create_test_keystore(); - let (tx, rx) = unbounded(); - - // Create valid metrics - let signed_node_message = create_test_signed_node_message(&keystore, 0, 1); - let node_message = &signed_node_message.node_message; - - // Mock state manager channel error by dropping the receiver - drop(rx); - - let mut node_message_bytes = Vec::new(); - ciborium::into_writer(node_message, &mut node_message_bytes).unwrap(); - let node_message_hash = blake3::hash(&node_message_bytes); - - let result = utils::validate_signed_node_message( - node_message, - node_message_hash.as_bytes(), - &signed_node_message.signature, - &tx, - ) - .await; - assert!(matches!( - result, - Err(AtomaP2pNodeError::StateManagerError(_)) - )); - } - - #[tokio::test] - async fn test_validate_usage_metrics_message_response_channel_error() { - let keystore = create_test_keystore(); - let (tx, rx) = unbounded(); - - // Create valid metrics - let signed_node_message = create_test_signed_node_message(&keystore, 0, 1); - let node_message = &signed_node_message.node_message; - - // Mock response channel error - tokio::spawn(async move { - let (event, response_sender) = rx.recv_async().await.unwrap(); - if let AtomaP2pEvent::VerifyNodeSmallIdOwnership { - node_small_id, - sui_address: _, - } = event - { - assert_eq!(node_small_id, 1); - // Drop the sender without sending a response - drop(response_sender); - } - }); - - let mut node_message_bytes = Vec::new(); - ciborium::into_writer(node_message, &mut node_message_bytes).unwrap(); - let node_message_hash = blake3::hash(&node_message_bytes); - - let result = utils::validate_signed_node_message( - node_message, - node_message_hash.as_bytes(), - &signed_node_message.signature, - &tx, - ) - .await; - assert!(matches!( - result, - Err(AtomaP2pNodeError::NodeSmallIdOwnershipVerificationError(_)) - )); - 
} -} diff --git a/atoma-p2p/src/tests.rs b/atoma-p2p/src/tests.rs new file mode 100644 index 00000000..cd212e04 --- /dev/null +++ b/atoma-p2p/src/tests.rs @@ -0,0 +1,356 @@ +use crate::{ + errors::AtomaP2pNodeError, + metrics::NodeMetrics, + types::{NodeMessage, NodeP2pMetadata, SignedNodeMessage}, + utils::validate_signed_node_message, + AtomaP2pEvent, +}; + +use flume::unbounded; +use sui_keys::keystore::{AccountKeystore, InMemKeystore}; +use tokio::sync::oneshot; +use tracing::error; + +/// Creates a test keystore +/// +/// # Returns +/// +/// Returns an `InMemKeystore` struct +#[must_use] +pub fn create_test_keystore() -> InMemKeystore { + // Create a new in-memory keystore + InMemKeystore::new_insecure_for_tests(1) +} + +/// Creates a test usage metrics message +/// +/// # Arguments +/// +/// * `keystore` - The keystore to use for signing the message +/// * `timestamp_offset` - The offset to add to the current timestamp +/// * `node_small_id` - The small ID of the node +/// +/// # Returns +/// +/// Returns a `SignedNodeMessage` struct +/// +/// # Panics +/// +/// Panics if the usage metrics cannot be serialized +#[must_use] +pub fn create_test_signed_node_message( + keystore: &InMemKeystore, + timestamp_offset: i64, + node_small_id: u64, +) -> SignedNodeMessage { + let now = std::time::Instant::now().elapsed().as_secs(); + #[allow(clippy::cast_sign_loss)] + #[allow(clippy::cast_possible_wrap)] + let timestamp = (now as i64 + timestamp_offset) as u64; + + let node_public_url = "https://test.example.com".to_string(); + let country = "US".to_string(); + + let node_message = NodeMessage { + node_metadata: NodeP2pMetadata { + node_public_url, + node_small_id, + country, + timestamp, + }, + node_metrics: NodeMetrics::default(), + }; + + let mut node_message_bytes = Vec::new(); + ciborium::into_writer(&node_message, &mut node_message_bytes) + .map_err(|e| { + error!( + target = "atoma-p2p", + event = "serialize_usage_metrics_error", + error = %e, + "Failed to serialize usage metrics" + ); + AtomaP2pNodeError::UsageMetricsSerializeError(e) + }) + .expect("Failed to serialize usage metrics"); + let message_hash = blake3::hash(&node_message_bytes); + + let active_address = keystore.addresses()[0]; + let signature = keystore + .sign_hashed(&active_address, message_hash.as_bytes()) + .expect("Failed to sign message"); + + SignedNodeMessage { + node_message, + signature: signature.as_ref().to_vec(), + } +} + +#[tokio::test] +async fn test_validate_usage_metrics_message_success() { + let keystore = create_test_keystore(); + let (tx, rx) = unbounded(); + + // Create valid usage metrics + let signed_node_message = create_test_signed_node_message(&keystore, 0, 1); + + // Mock successful node small ID ownership verification + tokio::spawn(async move { + let (event, optional_response_sender): (AtomaP2pEvent, Option>) = + rx.recv_async().await.unwrap(); + if let AtomaP2pEvent::VerifyNodeSmallIdOwnership { + node_small_id, + sui_address: _, + } = event + { + assert_eq!(node_small_id, 1); + let response_sender = optional_response_sender.unwrap(); + response_sender.send(true).unwrap(); + } + }); + + let mut node_meessage_bytes = Vec::new(); + ciborium::into_writer(&signed_node_message.node_message, &mut node_meessage_bytes).unwrap(); + let node_message_hash = blake3::hash(&node_meessage_bytes); + // Validation should succeed + let result = validate_signed_node_message( + &signed_node_message.node_message, + node_message_hash.as_bytes(), + &signed_node_message.signature, + &tx, + ) + .await; + 
result.unwrap(); + // assert!(result.is_ok()); +} + +#[tokio::test] +async fn test_validate_usage_metrics_message_invalid_url() { + let keystore = create_test_keystore(); + let (tx, _rx) = unbounded(); + + // Create base metrics then modify before serialization + let node_message = NodeMessage { + node_metadata: NodeP2pMetadata { + node_public_url: "invalid_url".to_string(), // Direct invalid URL + node_small_id: 1, + country: "US".to_string(), + timestamp: std::time::Instant::now().elapsed().as_secs(), + }, + node_metrics: NodeMetrics::default(), + }; + + let mut bytes = Vec::new(); + ciborium::into_writer(&node_message, &mut bytes).unwrap(); + let hash = blake3::hash(&bytes); + + // Serialize modified message + let signature = keystore + .sign_hashed(&keystore.addresses()[0], hash.as_bytes()) + .unwrap(); + + let bad_metrics = SignedNodeMessage { + node_message, + signature: signature.as_ref().to_vec(), + }; + + let result = validate_signed_node_message( + &bad_metrics.node_message, + hash.as_bytes(), + &bad_metrics.signature, + &tx, + ) + .await; + assert!(matches!(result, Err(AtomaP2pNodeError::UrlParseError(_)))); +} + +#[tokio::test] +async fn test_validate_usage_metrics_message_expired_timestamp() { + let keystore = create_test_keystore(); + let (tx, _rx) = unbounded(); + + // Create metrics with expired timestamp (11 minutes ago) + let signed_node_message = create_test_signed_node_message(&keystore, -(11 * 60), 1); + let node_message = &signed_node_message.node_message; + let mut node_message_bytes = Vec::new(); + ciborium::into_writer(node_message, &mut node_message_bytes).unwrap(); + let node_message_hash = blake3::hash(&node_message_bytes); + + let result = validate_signed_node_message( + node_message, + node_message_hash.as_bytes(), + &signed_node_message.signature, + &tx, + ) + .await; + assert!(matches!( + result, + Err(AtomaP2pNodeError::InvalidPublicAddressError(_)) + )); +} + +#[tokio::test] +async fn test_validate_usage_metrics_message_future_timestamp() { + let keystore = create_test_keystore(); + let (tx, _rx) = unbounded(); + + // Create metrics with future timestamp + let signed_node_message = create_test_signed_node_message(&keystore, 60, 1); + let node_message = &signed_node_message.node_message; + let mut node_message_bytes = Vec::new(); + ciborium::into_writer(node_message, &mut node_message_bytes).unwrap(); + let node_message_hash = blake3::hash(&node_message_bytes); + + let result = validate_signed_node_message( + node_message, + node_message_hash.as_bytes(), + &signed_node_message.signature, + &tx, + ) + .await; + assert!(matches!( + result, + Err(AtomaP2pNodeError::InvalidPublicAddressError(_)) + )); +} + +#[tokio::test] +async fn test_validate_usage_metrics_message_invalid_signature() { + let keystore = create_test_keystore(); + let (tx, _rx) = unbounded(); + + let signed_node_message = create_test_signed_node_message(&keystore, 0, 1); + let node_message = &signed_node_message.node_message; + + // Corrupt the signature part after scheme byte + let scheme_length = 1; // Ed25519 scheme byte + let sig_start = scheme_length; + let mut signature = signed_node_message.signature.clone(); + for byte in &mut signature[sig_start..sig_start + 64] { + *byte = 0xff; + } + + let mut node_message_bytes = Vec::new(); + ciborium::into_writer(&node_message, &mut node_message_bytes).unwrap(); + let node_message_hash = blake3::hash(&node_message_bytes); + + let result = validate_signed_node_message( + &signed_node_message.node_message, + node_message_hash.as_bytes(), + 
&signature, + &tx, + ) + .await; + assert!(matches!( + result, + Err(AtomaP2pNodeError::SignatureVerificationError(_)) + )); +} + +#[tokio::test] +async fn test_validate_usage_metrics_message_invalid_node_ownership() { + let keystore = create_test_keystore(); + let (tx, rx) = unbounded(); + + // Create valid metrics + let signed_node_message = create_test_signed_node_message(&keystore, 0, 1); + let node_message = &signed_node_message.node_message; + + // Mock failed node small ID ownership verification + tokio::spawn(async move { + let (event, _response_sender) = rx.recv_async().await.unwrap(); + if let AtomaP2pEvent::VerifyNodeSmallIdOwnership { + node_small_id, + sui_address: _, + } = event + { + assert_eq!(node_small_id, 1); + } + }); + + let mut node_message_bytes = Vec::new(); + ciborium::into_writer(node_message, &mut node_message_bytes).unwrap(); + let node_message_hash = blake3::hash(&node_message_bytes); + + let result = validate_signed_node_message( + node_message, + node_message_hash.as_bytes(), + &signed_node_message.signature, + &tx, + ) + .await; + + assert!(matches!( + result, + Err(AtomaP2pNodeError::NodeSmallIdOwnershipVerificationError(_)) + )); +} + +#[tokio::test] +async fn test_validate_usage_metrics_message_state_manager_error() { + let keystore = create_test_keystore(); + let (tx, rx) = unbounded(); + + // Create valid metrics + let signed_node_message = create_test_signed_node_message(&keystore, 0, 1); + let node_message = &signed_node_message.node_message; + + // Mock state manager channel error by dropping the receiver + drop(rx); + + let mut node_message_bytes = Vec::new(); + ciborium::into_writer(node_message, &mut node_message_bytes).unwrap(); + let node_message_hash = blake3::hash(&node_message_bytes); + + let result = validate_signed_node_message( + node_message, + node_message_hash.as_bytes(), + &signed_node_message.signature, + &tx, + ) + .await; + assert!(matches!( + result, + Err(AtomaP2pNodeError::StateManagerError(_)) + )); +} + +#[tokio::test] +async fn test_validate_usage_metrics_message_response_channel_error() { + let keystore = create_test_keystore(); + let (tx, rx) = unbounded(); + + // Create valid metrics + let signed_node_message = create_test_signed_node_message(&keystore, 0, 1); + let node_message = &signed_node_message.node_message; + + // Mock response channel error + tokio::spawn(async move { + let (event, response_sender) = rx.recv_async().await.unwrap(); + if let AtomaP2pEvent::VerifyNodeSmallIdOwnership { + node_small_id, + sui_address: _, + } = event + { + assert_eq!(node_small_id, 1); + // Drop the sender without sending a response + drop(response_sender); + } + }); + + let mut node_message_bytes = Vec::new(); + ciborium::into_writer(node_message, &mut node_message_bytes).unwrap(); + let node_message_hash = blake3::hash(&node_message_bytes); + + let result = validate_signed_node_message( + node_message, + node_message_hash.as_bytes(), + &signed_node_message.signature, + &tx, + ) + .await; + assert!(matches!( + result, + Err(AtomaP2pNodeError::NodeSmallIdOwnershipVerificationError(_)) + )); +} diff --git a/atoma-p2p/src/timer.rs b/atoma-p2p/src/timer.rs index f7a0341b..f8468aeb 100644 --- a/atoma-p2p/src/timer.rs +++ b/atoma-p2p/src/timer.rs @@ -5,8 +5,8 @@ use tokio::{sync::mpsc::UnboundedSender, task::JoinHandle}; use tracing::{error, instrument}; use crate::{ + errors::AtomaP2pNodeError, metrics::compute_usage_metrics, - service::AtomaP2pNodeError, types::{NodeMessage, NodeP2pMetadata}, }; @@ -42,7 +42,7 @@ const 
USAGE_METRICS_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(30); fields( is_client = %is_client, event = "usage_metrics_timer_task", - name = "atoma-p2p", + name = "atoma-p2p", ), skip_all )] diff --git a/atoma-p2p/src/types.rs b/atoma-p2p/src/types.rs index d84c3dfb..57417482 100644 --- a/atoma-p2p/src/types.rs +++ b/atoma-p2p/src/types.rs @@ -1,4 +1,4 @@ -use crate::{metrics::NodeMetrics, service::AtomaP2pNodeError}; +use crate::{errors::AtomaP2pNodeError, metrics::NodeMetrics}; use serde::{Deserialize, Serialize}; use sui_sdk::types::crypto::{ Ed25519SuiSignature, Secp256k1SuiSignature, Secp256r1SuiSignature, SuiSignatureInner, diff --git a/atoma-p2p/src/utils.rs b/atoma-p2p/src/utils.rs new file mode 100644 index 00000000..3cd88023 --- /dev/null +++ b/atoma-p2p/src/utils.rs @@ -0,0 +1,339 @@ +use fastcrypto::{ + ed25519::{Ed25519PublicKey, Ed25519Signature}, + secp256k1::{Secp256k1PublicKey, Secp256k1Signature}, + secp256r1::{Secp256r1PublicKey, Secp256r1Signature}, + traits::{ToFromBytes as FastCryptoToFromBytes, VerifyingKey}, +}; +use flume::Sender; +use sui_sdk::types::{ + base_types::SuiAddress, + crypto::{PublicKey, Signature, SignatureScheme, SuiSignature, ToFromBytes}, +}; +use tokio::sync::oneshot; +use tracing::{error, instrument}; +use url::Url; + +use crate::{ + errors::AtomaP2pNodeError, service::StateManagerEvent, types::NodeMessage, AtomaP2pEvent, +}; + +/// The threshold for considering a timestamp as expired +const EXPIRED_TIMESTAMP_THRESHOLD: u64 = 10 * 60; // 10 minutes + +/// Validates a UsageMetrics message by checking the URL format and timestamp freshness. +/// +/// This function performs two key validations: +/// 1. Ensures the node_public_url is a valid URL format +/// 2. Verifies the timestamp is within an acceptable time window relative to the current time +/// +/// # Arguments +/// +/// * `response` - The `UsageMetrics` containing the node_public_url and timestamp to validate +/// +/// # Returns +/// +/// * `Ok(())` - If all validation checks pass +/// * `Err(AtomaP2pNodeError)` - If any validation fails +/// +/// # Errors +/// +/// Returns `AtomaP2pNodeError::InvalidPublicAddressError` when: +/// * The URL is invalid or malformed +/// * The timestamp is older than `EXPIRED_TIMESTAMP_THRESHOLD` (10 minutes) +/// * The timestamp is in the future +/// +/// # Example +/// +/// ```rust,ignore +/// let metrics = UsageMetrics { +/// node_public_url: "https://example.com".to_string(), +/// timestamp: std::time::Instant::now().elapsed().as_secs(), +/// signature: vec![], +/// node_small_id: 1, +/// }; +/// +/// match validate_public_address_message(&metrics) { +/// Ok(()) => println!("UsageMetrics is valid"), +/// Err(e) => println!("Invalid UsageMetrics: {}", e), +/// } +/// ``` +/// +/// # Security Considerations +/// +/// This validation helps prevent: +/// * Invalid or malicious URLs from being propagated +/// * Replay attacks by enforcing timestamp freshness +/// * Future timestamp manipulation attempts +#[instrument(level = "debug", skip_all)] +pub fn validate_node_message_country_url_timestamp( + node_message: &NodeMessage, +) -> Result<(), AtomaP2pNodeError> { + let now = std::time::Instant::now().elapsed().as_secs(); + + let country = node_message.node_metadata.country.as_str(); + validate_country_code(country)?; + + // Check if the URL is valid + let _usage_metrics_url = + Url::parse(&node_message.node_metadata.node_public_url).map_err(|e| { + error!( + target = "atoma-p2p", + event = "invalid_url_format", + error = %e, + "Invalid URL format, received 
address: {}", + node_message.node_metadata.node_public_url + ); + AtomaP2pNodeError::UrlParseError(e) + })?; + + // Check if the timestamp is within a reasonable time frame + if now < node_message.node_metadata.timestamp + || now > node_message.node_metadata.timestamp + EXPIRED_TIMESTAMP_THRESHOLD + { + error!( + target = "atoma-p2p", + event = "invalid_timestamp", + "Timestamp is invalid, timestamp: {}, now: {}", + node_message.node_metadata.timestamp, + now + ); + return Err(AtomaP2pNodeError::InvalidPublicAddressError( + "Timestamp is too far in the past".to_string(), + )); + } + + Ok(()) +} + +/// Custom validation function for ISO 3166-1 alpha-2 country codes +fn validate_country_code(code: &str) -> Result<(), AtomaP2pNodeError> { + isocountry::CountryCode::for_alpha2(code).map_err(|_| { + AtomaP2pNodeError::InvalidCountryCodeError("Country code is invalid.".to_string()) + })?; + Ok(()) +} + +/// Verifies a cryptographic signature against a message hash using the signature scheme embedded in the signature. +/// +/// This function supports multiple signature schemes: +/// - ED25519 +/// - Secp256k1 +/// - Secp256r1 +/// +/// # Arguments +/// +/// * `signature_bytes` - Raw bytes of the signature. This should include both the signature data and metadata +/// that allows extracting the public key and signature scheme. +/// * `body_hash` - A 32-byte array containing the hash of the message that was signed. This is typically +/// produced using blake3 or another cryptographic hash function. +/// +/// # Returns +/// +/// * `Ok(())` - If the signature is valid for the given message hash +/// * `Err(AtomaP2pNodeError)` - If any step of the verification process fails +/// +/// # Errors +/// +/// This function will return an error in the following situations: +/// * `SignatureParseError` - If the signature bytes cannot be parsed into a valid signature structure +/// or if the public key cannot be extracted from the signature +/// * `SignatureVerificationError` - If the signature verification fails (indicating the signature is invalid +/// for the given message) +/// * `SignatureParseError` - If the signature uses an unsupported signature scheme +/// +/// # Example +/// +/// ```rust,ignore +/// use your_crate::utils::verify_signature; +/// +/// let message = b"Hello, world!"; +/// let message_hash = blake3::hash(message).as_bytes(); +/// let signature_bytes = // ... obtained from somewhere ... 
+/// +/// match verify_signature(&signature_bytes, &message_hash) { +/// Ok(()) => println!("Signature is valid!"), +/// Err(e) => println!("Signature verification failed: {}", e), +/// } +/// ``` +#[instrument(level = "trace", skip_all)] +pub fn verify_signature( + signature_bytes: &[u8], + body_hash: &[u8; 32], +) -> Result<(), AtomaP2pNodeError> { + let signature = Signature::from_bytes(signature_bytes).map_err(|e| { + error!("Failed to parse signature"); + AtomaP2pNodeError::SignatureParseError(e.to_string()) + })?; + let public_key_bytes = signature.public_key_bytes(); + let signature_scheme = signature.scheme(); + let public_key = + PublicKey::try_from_bytes(signature_scheme, public_key_bytes).map_err(|e| { + error!("Failed to extract public key from bytes, with error: {e}"); + AtomaP2pNodeError::SignatureParseError(e.to_string()) + })?; + let signature_bytes = signature.signature_bytes(); + + match signature_scheme { + SignatureScheme::ED25519 => { + let public_key = Ed25519PublicKey::from_bytes(public_key.as_ref()).unwrap(); + let signature = Ed25519Signature::from_bytes(signature_bytes).unwrap(); + public_key.verify(body_hash, &signature).map_err(|e| { + error!("Failed to verify signature"); + AtomaP2pNodeError::SignatureVerificationError(e.to_string()) + })?; + } + SignatureScheme::Secp256k1 => { + let public_key = Secp256k1PublicKey::from_bytes(public_key.as_ref()).unwrap(); + let signature = Secp256k1Signature::from_bytes(signature_bytes).unwrap(); + public_key.verify(body_hash, &signature).map_err(|e| { + error!("Failed to verify signature"); + AtomaP2pNodeError::SignatureVerificationError(e.to_string()) + })?; + } + SignatureScheme::Secp256r1 => { + let public_key = Secp256r1PublicKey::from_bytes(public_key.as_ref()).unwrap(); + let signature = Secp256r1Signature::from_bytes(signature_bytes).unwrap(); + public_key.verify(body_hash, &signature).map_err(|e| { + error!("Failed to verify signature"); + AtomaP2pNodeError::SignatureVerificationError(e.to_string()) + })?; + } + e => { + error!("Currently unsupported signature scheme, error: {e}"); + return Err(AtomaP2pNodeError::SignatureParseError(e.to_string())); + } + } + Ok(()) +} + +/// Verifies that a node owns its claimed small ID by checking the signature and querying the state manager. +/// +/// This function performs the following steps: +/// 1. Parses the signature to extract the public key +/// 2. Derives the Sui address from the public key +/// 3. Sends a verification request to the state manager +/// 4. Waits for the state manager's response +/// +/// # Arguments +/// +/// * `node_small_id` - The small ID claimed by the node +/// * `signature` - The signature to verify, containing the public key +/// * `state_manager_sender` - Channel to send verification requests to the state manager +/// +/// # Returns +/// +/// * `Ok(())` - If the node owns the small ID +/// * `Err(AtomaP2pNodeError)` - If verification fails at any step +/// +/// # Errors +/// +/// This function can return the following errors: +/// * `SignatureParseError` - If the signature cannot be parsed +/// * `StateManagerError` - If the verification request cannot be sent to the state manager +/// * `NodeSmallIdOwnershipVerificationError` - If the state manager reports the node does not own the ID +/// +/// # Example +/// +/// ```rust,ignore +/// let signature = // ... 
obtained from node ...; +/// let result = verify_node_small_id_ownership( +/// 42, +/// &signature, +/// state_manager_sender +/// ).await; +/// +/// match result { +/// Ok(()) => println!("Verification succeeded"), +/// Err(e) => println!("Verification failed: {}", e), +/// } +/// ``` +/// +/// # Security Considerations +/// +/// - Uses cryptographic signatures to prevent impersonation +/// - Relies on the state manager's authoritative record of node ownership +/// - Protects against node ID spoofing attacks +#[instrument(level = "debug", skip_all)] +pub async fn verify_node_small_id_ownership( + node_small_id: u64, + signature: &[u8], + state_manager_sender: &Sender, +) -> Result<(), AtomaP2pNodeError> { + let signature = Signature::from_bytes(signature).map_err(|e| { + error!("Failed to parse signature"); + AtomaP2pNodeError::SignatureParseError(e.to_string()) + })?; + let public_key_bytes = signature.public_key_bytes(); + let public_key = + PublicKey::try_from_bytes(signature.scheme(), public_key_bytes).map_err(|e| { + error!("Failed to extract public key from bytes, with error: {e}"); + AtomaP2pNodeError::SignatureParseError(e.to_string()) + })?; + let sui_address = SuiAddress::from(&public_key); + let (sender, receiver) = oneshot::channel(); + if let Err(e) = state_manager_sender.send(( + AtomaP2pEvent::VerifyNodeSmallIdOwnership { + node_small_id, + sui_address: sui_address.to_string(), + }, + Some(sender), + )) { + error!( + target = "atoma-p2p", + event = "failed_to_send_event_to_state_manager", + error = %e, + "Failed to send event to state manager" + ); + return Err(AtomaP2pNodeError::StateManagerError(e)); + } + match receiver.await { + Ok(result) => { + if result { + Ok(()) + } else { + Err(AtomaP2pNodeError::NodeSmallIdOwnershipVerificationError( + "Node small ID ownership verification failed".to_string(), + )) + } + } + Err(e) => { + error!("Failed to receive result from state manager, with error: {e}"); + Err(AtomaP2pNodeError::NodeSmallIdOwnershipVerificationError( + e.to_string(), + )) + } + } +} + +/// Validates the messages received from the P2P network +/// +/// This function validates the sharing node public addresses messages received from the P2P network by checking the signature and the node small ID ownership. +/// +/// # Arguments +/// +/// * `response` - The message received from the P2P network +/// * `state_manager_sender` - The sender of the state manager +/// +/// # Returns +/// +/// Returns `Ok(())` if the message is valid, or a `AtomaP2pNodeError` if any step fails. 
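+/// # Example
+///
+/// A minimal sketch of calling this validator; `node_message`, `node_message_hash`,
+/// `signature` and `state_manager_sender` are assumed to have been extracted from a
+/// `SignedNodeMessage` received over gossipsub and from the service's state:
+///
+/// ```rust,ignore
+/// validate_signed_node_message(
+///     &node_message,
+///     &node_message_hash,
+///     &signature,
+///     &state_manager_sender,
+/// )
+/// .await?;
+/// ```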
+#[instrument(level = "debug", skip_all)] +pub async fn validate_signed_node_message( + node_message: &NodeMessage, + node_message_hash: &[u8; 32], + signature: &[u8], + state_manager_sender: &Sender, +) -> Result<(), AtomaP2pNodeError> { + // Validate the message's node public URL and timestamp + validate_node_message_country_url_timestamp(node_message)?; + // Verify the signature of the message + verify_signature(signature, node_message_hash)?; + // Verify the node small ID ownership + verify_node_small_id_ownership( + node_message.node_metadata.node_small_id, + signature, + state_manager_sender, + ) + .await?; + Ok(()) +} diff --git a/deny.toml b/deny.toml index f8a2c135..0ca8b79e 100644 --- a/deny.toml +++ b/deny.toml @@ -24,6 +24,8 @@ ignore = [ "RUSTSEC-2024-0320", # allow unmaintained proc-macro-error used in transitive dependencies "RUSTSEC-2024-0370", + # ring is unmaintained but is a critical dependency through sui-sdk and libp2p + "RUSTSEC-2025-0007", ] # Threshold for security vulnerabilities, any vulnerability with a CVSS score # lower than the range specified will be ignored. Note that ignored advisories From e08b78e747d72943f4a87b44804597e2ac4b5e56 Mon Sep 17 00:00:00 2001 From: chad Date: Fri, 21 Feb 2025 19:22:27 -0500 Subject: [PATCH 3/6] feat: add more connection metrics --- atoma-p2p/src/metrics.rs | 8 +++++++ atoma-p2p/src/service.rs | 48 ++++++++++++++++++++++++++++++++-------- 2 files changed, 47 insertions(+), 9 deletions(-) diff --git a/atoma-p2p/src/metrics.rs b/atoma-p2p/src/metrics.rs index 9e985a0f..44a3f9f8 100644 --- a/atoma-p2p/src/metrics.rs +++ b/atoma-p2p/src/metrics.rs @@ -203,6 +203,14 @@ pub static TOTAL_INCOMING_CONNECTIONS: Lazy> = Lazy::new(|| { .build() }); +pub static TOTAL_OUTGOING_CONNECTIONS: Lazy> = Lazy::new(|| { + GLOBAL_METER + .u64_gauge("total_outgoing_connections") + .with_description("The total number of outgoing connections") + .with_unit("connections") + .build() +}); + pub static TOTAL_STREAM_INCOMING_BANDWIDTH: Lazy> = Lazy::new(|| { GLOBAL_METER .u64_gauge("total_stream_bandwidth") diff --git a/atoma-p2p/src/service.rs b/atoma-p2p/src/service.rs index 4bdc7426..eafb2ed3 100644 --- a/atoma-p2p/src/service.rs +++ b/atoma-p2p/src/service.rs @@ -1,5 +1,8 @@ use crate::errors::AtomaP2pNodeError; -use crate::metrics::{PEERS_CONNECTED, TOTAL_GOSSIPSUB_SUBSCRIPTIONS}; +use crate::metrics::{ + PEERS_CONNECTED, TOTAL_GOSSIPSUB_SUBSCRIPTIONS, TOTAL_INCOMING_CONNECTIONS, + TOTAL_OUTGOING_CONNECTIONS, +}; use crate::utils::validate_signed_node_message; use crate::{ config::AtomaP2pNodeConfig, @@ -9,6 +12,7 @@ use crate::{ }; use flume::Sender; use futures::StreamExt; +use libp2p::metrics::{Metrics, Recorder, Registry}; use libp2p::{ gossipsub::{self}, identify, kad, mdns, noise, @@ -92,8 +96,7 @@ pub struct AtomaP2pNode { network_metrics: NetworkMetrics, /// Add registry field - #[allow(dead_code)] - metrics_registry: libp2p::metrics::Registry, + metrics_registry: Registry, } impl AtomaP2pNode { @@ -435,6 +438,8 @@ impl AtomaP2pNode { ) -> Result<(), AtomaP2pNodeError> { // Create a metrics update interval let mut metrics_interval = tokio::time::interval(std::time::Duration::from_secs(15)); + let metrics = Metrics::new(&mut self.metrics_registry); + let peer_id = self.swarm.local_peer_id().to_base58(); loop { tokio::select! 
{ @@ -442,10 +447,15 @@ impl AtomaP2pNode { _ = metrics_interval.tick() => { self.network_metrics.update_metrics(); + let network_info = self.swarm.network_info(); - #[allow(clippy::cast_possible_wrap)] - let connected_peers = self.swarm.connected_peers().count() as i64; - PEERS_CONNECTED.record(connected_peers, &[KeyValue::new("peerId", self.swarm.local_peer_id().to_base58())]); + #[allow(clippy::cast_possible_wrap, clippy::cast_lossless)] + { + PEERS_CONNECTED.record(network_info.num_peers() as i64, &[KeyValue::new("peerId", peer_id.clone())]); + TOTAL_INCOMING_CONNECTIONS.record(network_info.connection_counters().num_established_incoming() as u64, &[KeyValue::new("peerId", peer_id.clone())]); + TOTAL_OUTGOING_CONNECTIONS.record(network_info.connection_counters().num_established_outgoing() as u64, &[KeyValue::new("peerId", peer_id.clone())]); + TOTAL_CONNECTIONS.record(network_info.connection_counters().num_connections() as u64, &[KeyValue::new("peerId", peer_id.clone())]); + } } event = self.swarm.select_next_some() => { @@ -475,6 +485,10 @@ impl AtomaP2pNode { topic, })) => { TOTAL_GOSSIPSUB_SUBSCRIPTIONS.add(1, &[KeyValue::new("topic", topic.to_string())]); + metrics.record(&gossipsub::Event::Subscribed { + peer_id, + topic: topic.clone(), + }); debug!( target = "atoma-p2p", event = "gossipsub_subscribed", @@ -539,7 +553,6 @@ impl AtomaP2pNode { connection_id, .. } => { - TOTAL_CONNECTIONS.record(u64::from(num_established.get()), &[KeyValue::new("peerId", peer_id.to_string())]); debug!( target = "atoma-p2p", event = "peer_connection_established", @@ -556,12 +569,12 @@ impl AtomaP2pNode { num_established, .. } => { - TOTAL_CONNECTIONS.record(u64::from(num_established), &[]); debug!( target = "atoma-p2p", event = "peer_connection_closed", peer_id = %peer_id, connection_id = %connection_id, + num_established = %num_established, "Peer connection closed" ); } @@ -610,7 +623,24 @@ impl AtomaP2pNode { } => { TOTAL_DIALS_FAILED.add(1, &[KeyValue::new("peerId", peer_id.unwrap().to_base58())]); } - _ => {} + SwarmEvent::Behaviour(AtomaP2pBehaviourEvent::Identify(identify_event)) => { + tracing::debug!( + target = "atoma-p2p", + event = "identify", + identify_event = ?identify_event, + "Identify event" + ); + metrics.record(&identify_event); + } + swarm_event => { + tracing::debug!( + target = "atoma-p2p", + event = "swarm_event", + swarm_event = ?swarm_event, + "Swarm event" + ); + metrics.record(&swarm_event); + } } } Some(usage_metrics) = self.usage_metrics_rx.recv() => { From 059e01ca083de8d287eb1ee78e3884bb5fae5cc7 Mon Sep 17 00:00:00 2001 From: chad Date: Fri, 21 Feb 2025 19:29:54 -0500 Subject: [PATCH 4/6] refactor: slight refactor to prevent unnecessary clones --- atoma-p2p/src/service.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/atoma-p2p/src/service.rs b/atoma-p2p/src/service.rs index eafb2ed3..b5de9088 100644 --- a/atoma-p2p/src/service.rs +++ b/atoma-p2p/src/service.rs @@ -449,12 +449,15 @@ impl AtomaP2pNode { let network_info = self.swarm.network_info(); + let peer_id_kv = KeyValue::new("peerId", peer_id.clone()); + let peer_id_kv_slice = &[peer_id_kv]; + #[allow(clippy::cast_possible_wrap, clippy::cast_lossless)] { - PEERS_CONNECTED.record(network_info.num_peers() as i64, &[KeyValue::new("peerId", peer_id.clone())]); - TOTAL_INCOMING_CONNECTIONS.record(network_info.connection_counters().num_established_incoming() as u64, &[KeyValue::new("peerId", peer_id.clone())]); - 
TOTAL_OUTGOING_CONNECTIONS.record(network_info.connection_counters().num_established_outgoing() as u64, &[KeyValue::new("peerId", peer_id.clone())]); - TOTAL_CONNECTIONS.record(network_info.connection_counters().num_connections() as u64, &[KeyValue::new("peerId", peer_id.clone())]); + PEERS_CONNECTED.record(network_info.num_peers() as i64, peer_id_kv_slice); + TOTAL_INCOMING_CONNECTIONS.record(network_info.connection_counters().num_established_incoming() as u64, peer_id_kv_slice); + TOTAL_OUTGOING_CONNECTIONS.record(network_info.connection_counters().num_established_outgoing() as u64, peer_id_kv_slice); + TOTAL_CONNECTIONS.record(network_info.connection_counters().num_connections() as u64, peer_id_kv_slice); } } From d430627352926dc1b33f921a0a2681dc57b41c0a Mon Sep 17 00:00:00 2001 From: chad Date: Sat, 22 Feb 2025 21:10:02 -0700 Subject: [PATCH 5/6] feat: added more gossipsub metrics --- atoma-p2p/src/errors.rs | 2 ++ atoma-p2p/src/metrics.rs | 20 +++++++++-- atoma-p2p/src/service.rs | 76 ++++++++++++++++++++++++++++++++-------- atoma-p2p/src/utils.rs | 24 ++++++++++++- 4 files changed, 105 insertions(+), 17 deletions(-) diff --git a/atoma-p2p/src/errors.rs b/atoma-p2p/src/errors.rs index 602fa0b5..bd4c6d8e 100644 --- a/atoma-p2p/src/errors.rs +++ b/atoma-p2p/src/errors.rs @@ -58,4 +58,6 @@ pub enum AtomaP2pNodeError { InvalidConfig(String), #[error("Invalid message length")] InvalidMessageLengthError, + #[error("Failed to publish Message: `{0}`")] + PublishError(String), } diff --git a/atoma-p2p/src/metrics.rs b/atoma-p2p/src/metrics.rs index 44a3f9f8..77ff0bd0 100644 --- a/atoma-p2p/src/metrics.rs +++ b/atoma-p2p/src/metrics.rs @@ -171,7 +171,7 @@ pub static TOTAL_GOSSIPSUB_SUBSCRIPTIONS: Lazy> = Lazy::new(| .build() }); -pub static TOTAL_VALID_GOSSIPSUB_MESSAGES_RECEIVED: Lazy> = Lazy::new(|| { +pub static TOTAL_INVALID_GOSSIPSUB_MESSAGES_RECEIVED: Lazy> = Lazy::new(|| { GLOBAL_METER .u64_counter("total_valid_gossipsub_messages_received") .with_description("The total number of valid gossipsub messages received") @@ -187,7 +187,15 @@ pub static TOTAL_GOSSIPSUB_MESSAGES_FORWARDED: Lazy> = Lazy::new(|| .build() }); -pub static TOTAL_FAILED_GOSSIPSUB_MESSAGES: Lazy> = Lazy::new(|| { +pub static TOTAL_GOSSIPSUB_PUBLISHES: Lazy> = Lazy::new(|| { + GLOBAL_METER + .u64_counter("total_gossipsub_publishes") + .with_description("The total number of gossipsub publishes") + .with_unit("messages") + .build() +}); + +pub static TOTAL_FAILED_GOSSIPSUB_PUBLISHES: Lazy> = Lazy::new(|| { GLOBAL_METER .u64_counter("total_failed_gossipsub_messages") .with_description("The total number of failed gossipsub messages") @@ -211,6 +219,14 @@ pub static TOTAL_OUTGOING_CONNECTIONS: Lazy> = Lazy::new(|| { .build() }); +pub static TOTAL_MDNS_DISCOVERIES: Lazy> = Lazy::new(|| { + GLOBAL_METER + .u64_counter("total_mdns_discoveries") + .with_description("The total number of mDNS discoveries") + .with_unit("discoveries") + .build() +}); + pub static TOTAL_STREAM_INCOMING_BANDWIDTH: Lazy> = Lazy::new(|| { GLOBAL_METER .u64_gauge("total_stream_bandwidth") diff --git a/atoma-p2p/src/service.rs b/atoma-p2p/src/service.rs index b5de9088..b44582da 100644 --- a/atoma-p2p/src/service.rs +++ b/atoma-p2p/src/service.rs @@ -1,12 +1,16 @@ use crate::errors::AtomaP2pNodeError; use crate::metrics::{ - PEERS_CONNECTED, TOTAL_GOSSIPSUB_SUBSCRIPTIONS, TOTAL_INCOMING_CONNECTIONS, + PEERS_CONNECTED, TOTAL_FAILED_GOSSIPSUB_PUBLISHES, TOTAL_GOSSIPSUB_MESSAGES_FORWARDED, + TOTAL_GOSSIPSUB_PUBLISHES, TOTAL_GOSSIPSUB_SUBSCRIPTIONS, 
TOTAL_INCOMING_CONNECTIONS, TOTAL_OUTGOING_CONNECTIONS, }; -use crate::utils::validate_signed_node_message; +use crate::utils::{extract_gossipsub_metrics, validate_signed_node_message}; use crate::{ config::AtomaP2pNodeConfig, - metrics::{NetworkMetrics, TOTAL_CONNECTIONS, TOTAL_DIALS_ATTEMPTED, TOTAL_DIALS_FAILED}, + metrics::{ + NetworkMetrics, TOTAL_CONNECTIONS, TOTAL_DIALS_ATTEMPTED, TOTAL_DIALS_FAILED, + TOTAL_INVALID_GOSSIPSUB_MESSAGES_RECEIVED, TOTAL_MDNS_DISCOVERIES, + }, timer::usage_metrics_timer_task, types::{AtomaP2pEvent, NodeMessage, SerializeWithSignature, SignedNodeMessage}, }; @@ -229,10 +233,13 @@ impl AtomaP2pNode { AtomaP2pNodeError::GossipsubConfigError(e) })?; - let gossipsub = gossipsub::Behaviour::new( + let gossipsub = gossipsub::Behaviour::new_with_metrics( gossipsub::MessageAuthenticity::Signed(key.clone()), gossipsub_config, - )?; + &mut metrics_registry, + gossipsub::MetricsConfig::default(), + ) + .map_err(|e| AtomaP2pNodeError::InvalidConfig(e.to_string()))?; let store = kad::store::MemoryStore::new(key.public().to_peer_id()); let kademlia = kad::Behaviour::new(key.public().to_peer_id(), store); @@ -449,6 +456,8 @@ impl AtomaP2pNode { let network_info = self.swarm.network_info(); + extract_gossipsub_metrics(&self.swarm.behaviour_mut().gossipsub); + let peer_id_kv = KeyValue::new("peerId", peer_id.clone()); let peer_id_kv_slice = &[peer_id_kv]; @@ -470,10 +479,10 @@ impl AtomaP2pNode { })) => { match self.handle_gossipsub_message(&message.data, &message_id, &propagation_source).await { Ok(()) => { - + TOTAL_GOSSIPSUB_MESSAGES_FORWARDED.add(1, &[KeyValue::new("peerId", self.swarm.local_peer_id().to_base58())]); } Err(e) => { - + TOTAL_INVALID_GOSSIPSUB_MESSAGES_RECEIVED.add(1, &[KeyValue::new("peerId", self.swarm.local_peer_id().to_base58())]); error!( target = "atoma-p2p", event = "gossipsub_message_error", @@ -487,11 +496,13 @@ impl AtomaP2pNode { peer_id, topic, })) => { + // Record subscript metrics TOTAL_GOSSIPSUB_SUBSCRIPTIONS.add(1, &[KeyValue::new("topic", topic.to_string())]); metrics.record(&gossipsub::Event::Subscribed { peer_id, topic: topic.clone(), }); + debug!( target = "atoma-p2p", event = "gossipsub_subscribed", @@ -504,7 +515,13 @@ impl AtomaP2pNode { peer_id, topic, })) => { + // Record unsubscription metrics TOTAL_GOSSIPSUB_SUBSCRIPTIONS.add(-1, &[KeyValue::new("topic", topic.to_string())]); + metrics.record(&gossipsub::Event::Unsubscribed { + peer_id, + topic: topic.clone(), + }); + debug!( target = "atoma-p2p", event = "gossipsub_unsubscribed", @@ -514,12 +531,7 @@ impl AtomaP2pNode { ); } SwarmEvent::Behaviour(AtomaP2pBehaviourEvent::Mdns(mdns::Event::Discovered(discovered_peers))) => { - debug!( - target = "atoma-p2p", - event = "mdns_discovered", - num_discovered_peers = %discovered_peers.len(), - "Mdns discovered" - ); + let peer_count = discovered_peers.len() as u64; for (peer_id, multiaddr) in discovered_peers { debug!( target = "atoma-p2p", @@ -530,6 +542,8 @@ impl AtomaP2pNode { ); self.swarm.behaviour_mut().kademlia.add_address(&peer_id, multiaddr); } + // Record discovery metrics + TOTAL_MDNS_DISCOVERIES.add(peer_count, &[KeyValue::new("peerId", peer_id.clone())]); } SwarmEvent::Behaviour(AtomaP2pBehaviourEvent::Mdns(mdns::Event::Expired(expired_peers))) => { debug!( @@ -635,6 +649,15 @@ impl AtomaP2pNode { ); metrics.record(&identify_event); } + SwarmEvent::Behaviour(AtomaP2pBehaviourEvent::Kademlia(kad_event)) => { + tracing::debug!( + target = "atoma-p2p", + event = "kad", + kad_event = ?kad_event, + "Kad event" + ); + 
metrics.record(&kad_event); + } swarm_event => { tracing::debug!( target = "atoma-p2p", @@ -942,7 +965,32 @@ impl AtomaP2pNode { self.swarm .behaviour_mut() .gossipsub - .publish(topic, serialized_signed_node_message)?; + .publish(topic, serialized_signed_node_message) + .map_err(|e| { + error!( + target = "atoma-p2p", + event = "publish_metrics_error", + error = %e, + "Failed to publish metrics" + ); + TOTAL_FAILED_GOSSIPSUB_PUBLISHES.add( + 1, + &[KeyValue::new( + "peerId", + self.swarm.local_peer_id().to_base58(), + )], + ); + AtomaP2pNodeError::PublishError(e.to_string()) + })?; + + TOTAL_GOSSIPSUB_PUBLISHES.add( + 1, + &[KeyValue::new( + "peerId", + self.swarm.local_peer_id().to_base58(), + )], + ); + Ok(()) } } diff --git a/atoma-p2p/src/utils.rs b/atoma-p2p/src/utils.rs index 3cd88023..81f29258 100644 --- a/atoma-p2p/src/utils.rs +++ b/atoma-p2p/src/utils.rs @@ -5,6 +5,8 @@ use fastcrypto::{ traits::{ToFromBytes as FastCryptoToFromBytes, VerifyingKey}, }; use flume::Sender; +use libp2p::gossipsub; +use opentelemetry::KeyValue; use sui_sdk::types::{ base_types::SuiAddress, crypto::{PublicKey, Signature, SignatureScheme, SuiSignature, ToFromBytes}, @@ -14,7 +16,11 @@ use tracing::{error, instrument}; use url::Url; use crate::{ - errors::AtomaP2pNodeError, service::StateManagerEvent, types::NodeMessage, AtomaP2pEvent, + errors::AtomaP2pNodeError, + metrics::{GOSSIP_SCORE_HISTOGRAM, TOTAL_GOSSIPSUB_SUBSCRIPTIONS}, + service::StateManagerEvent, + types::NodeMessage, + AtomaP2pEvent, }; /// The threshold for considering a timestamp as expired @@ -337,3 +343,19 @@ pub async fn validate_signed_node_message( .await?; Ok(()) } + +#[instrument(level = "debug", skip_all)] +pub fn extract_gossipsub_metrics(gossipsub: &gossipsub::Behaviour) { + for topic in gossipsub.topics() { + #[allow(clippy::cast_possible_wrap)] + let peer_count = gossipsub.mesh_peers(topic).count() as i64; + TOTAL_GOSSIPSUB_SUBSCRIPTIONS.add(peer_count, &[KeyValue::new("topic", topic.to_string())]); + + // Process peer scores in the same iteration + gossipsub.mesh_peers(topic).for_each(|peer| { + if let Some(score) = gossipsub.peer_score(peer) { + GOSSIP_SCORE_HISTOGRAM.record(score, &[KeyValue::new("peerId", peer.to_string())]); + } + }); + } +} From 0000a5dcbe4124418e4735a291aa7aafb0cc929a Mon Sep 17 00:00:00 2001 From: chad Date: Sun, 23 Feb 2025 15:14:58 -0700 Subject: [PATCH 6/6] pr feedback --- atoma-p2p/src/metrics.rs | 189 ++++++++++++++++++++++++++++++++++++++- atoma-p2p/src/service.rs | 44 ++++++++- atoma-p2p/src/tests.rs | 3 +- 3 files changed, 228 insertions(+), 8 deletions(-) diff --git a/atoma-p2p/src/metrics.rs b/atoma-p2p/src/metrics.rs index 77ff0bd0..ff5d739b 100644 --- a/atoma-p2p/src/metrics.rs +++ b/atoma-p2p/src/metrics.rs @@ -131,6 +131,17 @@ pub enum NodeMetricsError { TryFromIntError(#[from] std::num::TryFromIntError), } +/// Counter metric that tracks the total number of dial attempts. +/// +/// This metric counts the number of dial attempts, +/// broken down by model type. This helps monitor usage patterns and load +/// across different image generation models. +/// +/// # Metric Details +/// - Name: `atoma_dials_attempted` +/// - Type: Counter +/// - Labels: `peer_id` +/// - Unit: requests (count) pub static TOTAL_DIALS_ATTEMPTED: Lazy> = Lazy::new(|| { GLOBAL_METER .u64_counter("total_dials") @@ -139,6 +150,17 @@ pub static TOTAL_DIALS_ATTEMPTED: Lazy> = Lazy::new(|| { .build() }); +/// Counter metric that tracks the total number of dial failures. 
+///
+/// This metric counts the number of failed dial attempts, labeled by the local
+/// peer ID. This helps monitor connectivity problems of the p2p node.
+///
+/// # Metric Details
+/// - Name: `total_dials_failed`
+/// - Type: Counter
+/// - Labels: `peer_id`
+/// - Unit: dials (count)
 pub static TOTAL_DIALS_FAILED: Lazy<Counter<u64>> = Lazy::new(|| {
     GLOBAL_METER
         .u64_counter("total_dials_failed")
@@ -147,6 +169,17 @@ pub static TOTAL_DIALS_FAILED: Lazy<Counter<u64>> = Lazy::new(|| {
         .build()
 });
+/// Gauge metric that tracks the total number of connections.
+///
+/// This metric tracks the number of established connections, labeled by the
+/// local peer ID. This helps monitor the overall connectivity of the p2p node.
+///
+/// # Metric Details
+/// - Name: `total_connections`
+/// - Type: Gauge
+/// - Labels: `peer_id`
+/// - Unit: connections (count)
 pub static TOTAL_CONNECTIONS: Lazy<Gauge<u64>> = Lazy::new(|| {
     GLOBAL_METER
         .u64_gauge("total_connections")
@@ -155,6 +188,17 @@ pub static TOTAL_CONNECTIONS: Lazy<Gauge<u64>> = Lazy::new(|| {
         .build()
 });
+/// Gauge metric that tracks the total number of peers connected.
+///
+/// This metric tracks the number of peers currently connected, labeled by the
+/// local peer ID. This helps monitor the health of the p2p mesh.
+///
+/// # Metric Details
+/// - Name: `peers_connected`
+/// - Type: Gauge
+/// - Labels: `peer_id`
+/// - Unit: peers (count)
 pub static PEERS_CONNECTED: Lazy<Gauge<i64>> = Lazy::new(|| {
     GLOBAL_METER
         .i64_gauge("peers_connected")
@@ -163,6 +207,17 @@ pub static PEERS_CONNECTED: Lazy<Gauge<i64>> = Lazy::new(|| {
         .build()
 });
+/// Up/down counter metric that tracks the number of active gossipsub subscriptions.
+///
+/// This metric tracks the number of active gossipsub subscriptions, labeled by
+/// topic. This helps monitor which topics the node participates in.
+///
+/// # Metric Details
+/// - Name: `total_gossipsub_subscriptions`
+/// - Type: `UpDownCounter`
+/// - Labels: `topic`
+/// - Unit: `subscriptions` (count)
 pub static TOTAL_GOSSIPSUB_SUBSCRIPTIONS: Lazy<UpDownCounter<i64>> = Lazy::new(| .build() });
+/// Counter metric that tracks the total number of invalid gossipsub messages received.
+///
+/// This metric counts the number of gossipsub messages that failed validation,
+/// labeled by the local peer ID. This helps monitor misbehaving peers and
+/// malformed traffic.
+///
+/// # Metric Details
+/// - Name: `total_invalid_gossipsub_messages_received`
+/// - Type: Counter
+/// - Labels: `peer_id`
+/// - Unit: messages (count)
 pub static TOTAL_INVALID_GOSSIPSUB_MESSAGES_RECEIVED: Lazy<Counter<u64>> = Lazy::new(|| {
     GLOBAL_METER
-        .u64_counter("total_valid_gossipsub_messages_received")
-        .with_description("The total number of valid gossipsub messages received")
+        .u64_counter("total_invalid_gossipsub_messages_received")
+        .with_description("The total number of invalid gossipsub messages received")
         .with_unit("messages")
         .build()
 });
+/// Counter metric that tracks the total number of gossipsub messages forwarded.
+///
+/// This metric counts the number of gossipsub messages that were successfully
+/// validated and forwarded, labeled by the local peer ID. This helps monitor
+/// the gossip traffic handled by the node.
+///
+/// # Metric Details
+/// - Name: `total_gossipsub_messages_forwarded`
+/// - Type: Counter
+/// - Labels: `peer_id`
+/// - Unit: messages (count)
 pub static TOTAL_GOSSIPSUB_MESSAGES_FORWARDED: Lazy<Counter<u64>> = Lazy::new(|| {
     GLOBAL_METER
         .u64_counter("total_gossipsub_messages_forwarded")
@@ -187,6 +264,17 @@ pub static TOTAL_GOSSIPSUB_MESSAGES_FORWARDED: Lazy<Counter<u64>> = Lazy::new(||
         .build()
 });
+/// Counter metric that tracks the total number of gossipsub publishes.
+///
+/// This metric counts the number of gossipsub messages published by the node,
+/// labeled by the local peer ID. This helps monitor how often the node shares
+/// its usage metrics on the network.
+///
+/// # Metric Details
+/// - Name: `total_gossipsub_publishes`
+/// - Type: Counter
+/// - Labels: `peer_id`
+/// - Unit: messages (count)
 pub static TOTAL_GOSSIPSUB_PUBLISHES: Lazy<Counter<u64>> = Lazy::new(|| {
     GLOBAL_METER
         .u64_counter("total_gossipsub_publishes")
@@ -195,6 +283,17 @@ pub static TOTAL_GOSSIPSUB_PUBLISHES: Lazy<Counter<u64>> = Lazy::new(|| {
         .build()
 });
+/// Counter metric that tracks the total number of failed gossipsub publishes.
+///
+/// This metric counts the number of gossipsub publish attempts that failed,
+/// labeled by the local peer ID. This helps monitor publishing problems on the
+/// p2p network.
+///
+/// # Metric Details
+/// - Name: `total_failed_gossipsub_messages`
+/// - Type: Counter
+/// - Labels: `peer_id`
+/// - Unit: messages (count)
 pub static TOTAL_FAILED_GOSSIPSUB_PUBLISHES: Lazy<Counter<u64>> = Lazy::new(|| {
     GLOBAL_METER
         .u64_counter("total_failed_gossipsub_messages")
@@ -203,6 +302,17 @@ pub static TOTAL_FAILED_GOSSIPSUB_PUBLISHES: Lazy<Counter<u64>> = Lazy::new(|| {
         .build()
 });
+/// Gauge metric that tracks the total number of incoming connections.
+///
+/// This metric tracks the number of established incoming connections, labeled
+/// by the local peer ID. This helps monitor the inbound connectivity of the
+/// p2p node.
+///
+/// # Metric Details
+/// - Name: `total_incoming_connections`
+/// - Type: Gauge
+/// - Labels: `peer_id`
+/// - Unit: connections (count)
 pub static TOTAL_INCOMING_CONNECTIONS: Lazy<Gauge<u64>> = Lazy::new(|| {
     GLOBAL_METER
         .u64_gauge("total_incoming_connections")
@@ -211,6 +321,17 @@ pub static TOTAL_INCOMING_CONNECTIONS: Lazy<Gauge<u64>> = Lazy::new(|| {
         .build()
 });
+/// Gauge metric that tracks the total number of outgoing connections.
+///
+/// This metric tracks the number of established outgoing connections, labeled
+/// by the local peer ID. This helps monitor the outbound connectivity of the
+/// p2p node.
+///
+/// # Metric Details
+/// - Name: `total_outgoing_connections`
+/// - Type: Gauge
+/// - Labels: `peer_id`
+/// - Unit: connections (count)
 pub static TOTAL_OUTGOING_CONNECTIONS: Lazy<Gauge<u64>> = Lazy::new(|| {
     GLOBAL_METER
         .u64_gauge("total_outgoing_connections")
@@ -219,6 +340,17 @@ pub static TOTAL_OUTGOING_CONNECTIONS: Lazy<Gauge<u64>> = Lazy::new(|| {
         .build()
 });
+/// Counter metric that tracks the total number of mDNS discoveries.
+///
+/// This metric counts the number of peers discovered via mDNS, labeled by the
+/// local peer ID. This helps monitor local peer discovery.
+///
+/// # Metric Details
+/// - Name: `total_mdns_discoveries`
+/// - Type: Counter
+/// - Labels: `peer_id`
+/// - Unit: discoveries (count)
 pub static TOTAL_MDNS_DISCOVERIES: Lazy<Counter<u64>> = Lazy::new(|| {
     GLOBAL_METER
         .u64_counter("total_mdns_discoveries")
@@ -227,6 +359,17 @@ pub static TOTAL_MDNS_DISCOVERIES: Lazy<Counter<u64>> = Lazy::new(|| {
         .build()
 });
+/// Gauge metric that tracks the total incoming stream bandwidth.
+///
+/// This metric tracks the total number of bytes received over the network,
+/// labeled by the local peer ID. This helps monitor the inbound bandwidth
+/// usage of the p2p node.
+///
+/// # Metric Details
+/// - Name: `total_stream_bandwidth`
+/// - Type: Gauge
+/// - Labels: `peer_id`
+/// - Unit: bytes (count)
 pub static TOTAL_STREAM_INCOMING_BANDWIDTH: Lazy<Gauge<u64>> = Lazy::new(|| {
     GLOBAL_METER
         .u64_gauge("total_stream_bandwidth")
@@ -235,6 +378,17 @@ pub static TOTAL_STREAM_INCOMING_BANDWIDTH: Lazy<Gauge<u64>> = Lazy::new(|| {
         .build()
 });
+/// Gauge metric that tracks the total outgoing stream bandwidth.
+///
+/// This metric tracks the total number of bytes transmitted over the network,
+/// labeled by the local peer ID. This helps monitor the outbound bandwidth
+/// usage of the p2p node.
+///
+/// # Metric Details
+/// - Name: `total_stream_outgoing_bandwidth`
+/// - Type: Gauge
+/// - Labels: `peer_id`
+/// - Unit: bytes (count)
 pub static TOTAL_STREAM_OUTGOING_BANDWIDTH: Lazy<Gauge<u64>> = Lazy::new(|| {
     GLOBAL_METER
         .u64_gauge("total_stream_outgoing_bandwidth")
@@ -243,6 +397,17 @@ pub static TOTAL_STREAM_OUTGOING_BANDWIDTH: Lazy<Gauge<u64>> = Lazy::new(|| {
         .build()
 });
+/// Histogram metric that tracks the distribution of gossipsub peer scores.
+///
+/// This metric records the gossipsub score of each mesh peer, labeled by peer
+/// ID. This helps monitor peer quality and detect misbehaving peers.
+///
+/// # Metric Details
+/// - Name: `gossip_score_histogram`
+/// - Type: Histogram
+/// - Labels: `peer_id`
+/// - Unit: score (count)
 pub static GOSSIP_SCORE_HISTOGRAM: Lazy<Histogram<f64>> = Lazy::new(|| {
     GLOBAL_METER
         .f64_histogram("gossip_score_histogram")
@@ -251,6 +416,25 @@ pub static GOSSIP_SCORE_HISTOGRAM: Lazy<Histogram<f64>> = Lazy::new(|| {
         .build()
 });
+/// Gauge metric that tracks the size of the Kademlia routing table.
+///
+/// This metric records the number of addresses known for a peer whenever a
+/// Kademlia routing update is received, labeled by peer ID. This helps monitor
+/// the health of peer discovery.
+///
+/// # Metric Details
+/// - Name: `kad_routing_table_size`
+/// - Type: Gauge
+/// - Labels: `peer_id`
+/// - Unit: size (count)
+pub static KAD_ROUTING_TABLE_SIZE: Lazy<Gauge<u64>> = Lazy::new(|| {
+    GLOBAL_METER
+        .u64_gauge("kad_routing_table_size")
+        .with_description("The size of the Kademlia routing table")
+        .with_unit("size")
+        .build()
+});
+
+/// Structure to store the network bandwidth metrics.
+///
+/// This data is collected from the system's network interfaces and is used to
+/// update the incoming and outgoing bandwidth gauges.
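+///
+/// # Example
+///
+/// A minimal sketch of how the service loop is expected to refresh these gauges
+/// on a fixed interval (the 15s value mirrors the service's metrics interval):
+///
+/// ```rust,ignore
+/// let mut network_metrics = NetworkMetrics::default();
+/// let mut interval = tokio::time::interval(std::time::Duration::from_secs(15));
+/// loop {
+///     interval.tick().await;
+///     network_metrics.update_metrics();
+/// }
+/// ```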
pub struct NetworkMetrics { networks: Networks, bytes_received: Gauge, @@ -281,6 +465,7 @@ impl Default for NetworkMetrics { } } +// Network metrics implementation impl NetworkMetrics { pub fn update_metrics(&mut self) { self.networks.refresh(true); diff --git a/atoma-p2p/src/service.rs b/atoma-p2p/src/service.rs index b44582da..66f0d4ca 100644 --- a/atoma-p2p/src/service.rs +++ b/atoma-p2p/src/service.rs @@ -1,8 +1,8 @@ use crate::errors::AtomaP2pNodeError; use crate::metrics::{ - PEERS_CONNECTED, TOTAL_FAILED_GOSSIPSUB_PUBLISHES, TOTAL_GOSSIPSUB_MESSAGES_FORWARDED, - TOTAL_GOSSIPSUB_PUBLISHES, TOTAL_GOSSIPSUB_SUBSCRIPTIONS, TOTAL_INCOMING_CONNECTIONS, - TOTAL_OUTGOING_CONNECTIONS, + KAD_ROUTING_TABLE_SIZE, PEERS_CONNECTED, TOTAL_FAILED_GOSSIPSUB_PUBLISHES, + TOTAL_GOSSIPSUB_MESSAGES_FORWARDED, TOTAL_GOSSIPSUB_PUBLISHES, TOTAL_GOSSIPSUB_SUBSCRIPTIONS, + TOTAL_INCOMING_CONNECTIONS, TOTAL_OUTGOING_CONNECTIONS, }; use crate::utils::{extract_gossipsub_metrics, validate_signed_node_message}; use crate::{ @@ -26,6 +26,7 @@ use libp2p::{ use opentelemetry::KeyValue; use std::hash::{DefaultHasher, Hash, Hasher}; use std::sync::Arc; +use std::time::Duration; use sui_keys::keystore::{AccountKeystore, FileBasedKeystore}; use tokio::{ sync::{mpsc::UnboundedReceiver, oneshot, watch}, @@ -36,6 +37,9 @@ use tracing::{debug, error, instrument}; /// The topic that the P2P network will use to gossip messages const METRICS_GOSPUBSUB_TOPIC: &str = "atoma-p2p-usage-metrics"; +/// The interval at which the metrics are updated +const METRICS_UPDATE_INTERVAL: Duration = Duration::from_secs(15); + pub type StateManagerEvent = (AtomaP2pEvent, Option>); /// Network behavior configuration for the P2P Atoma node, combining multiple libp2p protocols. @@ -444,7 +448,7 @@ impl AtomaP2pNode { mut shutdown_signal: watch::Receiver, ) -> Result<(), AtomaP2pNodeError> { // Create a metrics update interval - let mut metrics_interval = tokio::time::interval(std::time::Duration::from_secs(15)); + let mut metrics_interval = tokio::time::interval(METRICS_UPDATE_INTERVAL); let metrics = Metrics::new(&mut self.metrics_registry); let peer_id = self.swarm.local_peer_id().to_base58(); @@ -532,6 +536,12 @@ impl AtomaP2pNode { } SwarmEvent::Behaviour(AtomaP2pBehaviourEvent::Mdns(mdns::Event::Discovered(discovered_peers))) => { let peer_count = discovered_peers.len() as u64; + debug!( + target = "atoma-p2p", + event = "mdns_discovered", + peer_count = %peer_count, + "Mdns discovered peers" + ); for (peer_id, multiaddr) in discovered_peers { debug!( target = "atoma-p2p", @@ -563,6 +573,32 @@ impl AtomaP2pNode { self.swarm.behaviour_mut().kademlia.remove_address(&peer_id, &multiaddr); } } + SwarmEvent::Behaviour(AtomaP2pBehaviourEvent::Kademlia(kad::Event::RoutingUpdated { + peer, + is_new_peer, + addresses, + bucket_range, + old_peer, + })) => { + debug!( + target = "atoma-p2p", + event = "kademlia_routing_updated", + peer = %peer, + is_new_peer = %is_new_peer, + addresses = ?addresses, + bucket_range = ?bucket_range, + old_peer = ?old_peer, + "Kademlia routing updated" + ); + KAD_ROUTING_TABLE_SIZE.record(addresses.len() as u64, &[KeyValue::new("peerId", peer.to_base58())]); + metrics.record(&kad::Event::RoutingUpdated { + peer, + is_new_peer, + addresses, + bucket_range, + old_peer, + }); + } SwarmEvent::ConnectionEstablished { peer_id, num_established, diff --git a/atoma-p2p/src/tests.rs b/atoma-p2p/src/tests.rs index cd212e04..f3900fca 100644 --- a/atoma-p2p/src/tests.rs +++ b/atoma-p2p/src/tests.rs @@ -120,8 +120,7 @@ async fn 
test_validate_usage_metrics_message_success() { &tx, ) .await; - result.unwrap(); - // assert!(result.is_ok()); + assert!(result.is_ok()); } #[tokio::test]