diff --git a/finality-aleph/src/metrics.rs b/finality-aleph/src/metrics.rs
index 8c9d34b7..4dfa2e8f 100644
--- a/finality-aleph/src/metrics.rs
+++ b/finality-aleph/src/metrics.rs
@@ -8,7 +8,7 @@ use std::{
 use log::{trace, warn};
 use lru::LruCache;
 use parking_lot::Mutex;
-use prometheus_endpoint::{register, Gauge, PrometheusError, Registry, U64};
+use prometheus_endpoint::{register, Counter, Gauge, PrometheusError, Registry, U64};
 use sc_service::Arc;
 
 // How many entries (block hash + timestamp) we keep in memory per one checkpoint type.
@@ -26,6 +26,15 @@ struct Inner {
     prev: HashMap,
     gauges: HashMap>,
     starts: HashMap>,
+    sync_broadcast_counter: Counter,
+    sync_send_request_for_counter: Counter,
+    sync_send_to_counter: Counter,
+    sync_handle_state_counter: Counter,
+    sync_handle_justifications_counter: Counter,
+    sync_handle_request_counter: Counter,
+    sync_handle_task_counter: Counter,
+    sync_handle_block_imported_counter: Counter,
+    sync_handle_block_finalized_counter: Counter,
 }
 
 impl Inner {
@@ -124,6 +133,24 @@ impl Metrics {
                 .iter()
                 .map(|k| (*k, LruCache::new(MAX_BLOCKS_PER_CHECKPOINT)))
                 .collect(),
+            sync_broadcast_counter: Counter::new("aleph_sync_broadcast", "no help")?,
+            sync_send_request_for_counter: Counter::new("aleph_sync_send_request_for", "no help")?,
+            sync_send_to_counter: Counter::new("aleph_sync_send_to", "no help")?,
+            sync_handle_state_counter: Counter::new("aleph_sync_handle_state", "no help")?,
+            sync_handle_justifications_counter: Counter::new(
+                "aleph_sync_handle_justifications",
+                "no help",
+            )?,
+            sync_handle_request_counter: Counter::new("aleph_sync_handle_request", "no help")?,
+            sync_handle_task_counter: Counter::new("aleph_sync_handle_task", "no help")?,
+            sync_handle_block_imported_counter: Counter::new(
+                "aleph_sync_handle_block_imported",
+                "no help",
+            )?,
+            sync_handle_block_finalized_counter: Counter::new(
+                "aleph_sync_handle_block_finalized",
+                "no help",
+            )?,
         })));
 
         Ok(Metrics { inner })
@@ -141,6 +168,60 @@ impl Metrics {
             .report_block(hash, checkpoint_time, checkpoint_type);
         }
     }
+
+    pub fn report_sync_broadcast(&self) {
+        if let Some(inner) = &self.inner {
+            inner.lock().sync_broadcast_counter.inc();
+        }
+    }
+
+    pub fn report_sync_send_request_for(&self) {
+        if let Some(inner) = &self.inner {
+            inner.lock().sync_send_request_for_counter.inc();
+        }
+    }
+
+    pub fn report_sync_send_to(&self) {
+        if let Some(inner) = &self.inner {
+            inner.lock().sync_send_to_counter.inc();
+        }
+    }
+
+    pub fn report_sync_handle_state(&self) {
+        if let Some(inner) = &self.inner {
+            inner.lock().sync_handle_state_counter.inc();
+        }
+    }
+
+    pub fn report_sync_handle_justifications(&self) {
+        if let Some(inner) = &self.inner {
+            inner.lock().sync_handle_justifications_counter.inc();
+        }
+    }
+
+    pub fn report_sync_handle_request(&self) {
+        if let Some(inner) = &self.inner {
+            inner.lock().sync_handle_request_counter.inc();
+        }
+    }
+
+    pub fn report_sync_handle_task(&self) {
+        if let Some(inner) = &self.inner {
+            inner.lock().sync_handle_task_counter.inc();
+        }
+    }
+
+    pub fn report_sync_handle_block_imported(&self) {
+        if let Some(inner) = &self.inner {
+            inner.lock().sync_handle_block_imported_counter.inc();
+        }
+    }
+
+    pub fn report_sync_handle_block_finalized(&self) {
+        if let Some(inner) = &self.inner {
+            inner.lock().sync_handle_block_finalized_counter.inc();
+        }
+    }
 }
 
 #[cfg(test)]
diff --git a/finality-aleph/src/nodes.rs b/finality-aleph/src/nodes.rs
index 705d0887..e9705bef 100644
--- a/finality-aleph/src/nodes.rs
+++ b/finality-aleph/src/nodes.rs
@@ -158,6 +158,7 @@ pub async fn run_validator_node(
         finalizer,
         session_period,
         justification_rx,
+        metrics.clone(),
     ) {
         Ok(x) => x,
         Err(e) => panic!("Failed to initialize Sync service: {}", e),
diff --git a/finality-aleph/src/sync/service.rs b/finality-aleph/src/sync/service.rs
index a9eecc67..37c70342 100644
--- a/finality-aleph/src/sync/service.rs
+++ b/finality-aleph/src/sync/service.rs
@@ -5,6 +5,7 @@ use log::{debug, error, trace, warn};
 use tokio::time::{interval_at, Instant};
 
 use crate::{
+    metrics::Key,
     network::GossipNetwork,
     sync::{
         data::{
@@ -17,7 +18,7 @@ use crate::{
         BlockIdFor, BlockIdentifier, ChainStatus, ChainStatusNotification, ChainStatusNotifier,
         Finalizer, Header, Justification, JustificationSubmissions, Verifier, LOG_TARGET,
     },
-    SessionPeriod,
+    Metrics, SessionPeriod,
 };
 
 const BROADCAST_COOLDOWN: Duration = Duration::from_millis(200);
@@ -32,6 +33,7 @@ pub struct Service<
     CS: ChainStatus,
     V: Verifier,
     F: Finalizer,
+    H: Key,
 > {
     network: VersionWrapper,
     handler: Handler,
@@ -40,6 +42,7 @@ pub struct Service<
     chain_events: CE,
     justifications_from_user: mpsc::UnboundedReceiver,
     additional_justifications_from_user: mpsc::UnboundedReceiver,
+    metrics: Metrics,
 }
 
 impl JustificationSubmissions for mpsc::UnboundedSender {
@@ -57,10 +60,12 @@ impl<
         CS: ChainStatus,
         V: Verifier,
         F: Finalizer,
-    > Service
+        H: Key,
+    > Service
 {
     /// Create a new service using the provided network for communication. Also returns an
     /// interface for submitting additional justifications.
+    #[allow(clippy::too_many_arguments)]
     pub fn new(
         network: N,
         chain_events: CE,
@@ -69,6 +74,7 @@ impl<
         finalizer: F,
         period: SessionPeriod,
         additional_justifications_from_user: mpsc::UnboundedReceiver,
+        metrics: Metrics,
     ) -> Result<(Self, impl JustificationSubmissions + Clone), HandlerError> {
         let network = VersionWrapper::new(network);
         let handler = Handler::new(chain_status, verifier, finalizer, period)?;
@@ -84,6 +90,7 @@ impl<
                 chain_events,
                 justifications_from_user,
                 additional_justifications_from_user,
+                metrics,
             },
             justifications_for_sync,
         ))
@@ -113,6 +120,7 @@ impl<
             }
         };
         trace!(target: LOG_TARGET, "Broadcasting state: {:?}", state);
+        self.metrics.report_sync_broadcast();
         let data = NetworkData::StateBroadcast(state);
         if let Err(e) = self.network.broadcast(data) {
             warn!(target: LOG_TARGET, "Error sending broadcast: {}.", e);
@@ -137,6 +145,7 @@ impl<
         };
         let request = Request::new(block_id, branch_knowledge, state);
         trace!(target: LOG_TARGET, "Sending a request: {:?}", request);
+        self.metrics.report_sync_send_request_for();
         let data = NetworkData::Request(request);
         if let Err(e) = self.network.send_to_random(data, peers) {
             warn!(target: LOG_TARGET, "Error sending request: {}.", e);
@@ -144,6 +153,13 @@ impl<
     }
 
     fn send_to(&mut self, data: NetworkData, peer: N::PeerId) {
+        trace!(
+            target: LOG_TARGET,
+            "Sending data {:?} to peer {:?}",
+            data,
+            peer
+        );
+        self.metrics.report_sync_send_to();
         if let Err(e) = self.network.send_to(data, peer) {
             warn!(target: LOG_TARGET, "Error sending response: {}.", e);
         }
@@ -165,6 +181,7 @@ impl<
             state,
             peer
         );
+        self.metrics.report_sync_handle_state();
         match self.handler.handle_state(state, peer.clone()) {
             Ok(action) => self.perform_sync_action(action, peer),
             Err(e) => warn!(
@@ -184,6 +201,7 @@ impl<
             "Handling {:?} justifications.",
             justifications.len()
         );
+        self.metrics.report_sync_handle_justifications();
         let mut previous_block_id = None;
         for justification in justifications {
             let maybe_block_id = match self
@@ -235,6 +253,7 @@ impl<
             request,
             peer
         );
+        self.metrics.report_sync_handle_request();
         match self.handler.handle_request(request) {
             Ok(action) => self.perform_sync_action(action, peer),
             Err(e) => {
@@ -271,6 +290,7 @@ impl<
 
     fn handle_task(&mut self, block_id: BlockIdFor) {
         trace!(target: LOG_TARGET, "Handling a task for {:?}.", block_id);
+        self.metrics.report_sync_handle_task();
         use Interest::*;
         match self.handler.block_state(&block_id) {
             HighestJustified {
@@ -300,6 +320,7 @@ impl<
         match event {
             BlockImported(header) => {
                 trace!(target: LOG_TARGET, "Handling a new imported block.");
+                self.metrics.report_sync_handle_block_imported();
                 if let Err(e) = self.handler.block_imported(header) {
                     error!(
                         target: LOG_TARGET,
@@ -309,6 +330,7 @@ impl<
             }
             BlockFinalized(_) => {
                 trace!(target: LOG_TARGET, "Handling a new finalized block.");
+                self.metrics.report_sync_handle_block_finalized();
                 if self.broadcast_ticker.try_tick() {
                     self.broadcast();
                 }
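
The hunks above only show the new counters being constructed with Counter::new; registering them with the node's Prometheus Registry (the register helper that the updated import list brings in) is not visible in this excerpt. Below is a minimal sketch of how a Counter<U64> is typically created and exposed through substrate-prometheus-endpoint, assuming a Registry is available; the helper name is illustrative and not part of this patch.

    use prometheus_endpoint::{register, Counter, PrometheusError, Registry, U64};

    // Illustrative helper (not from this patch): build a Counter<U64> and register it with the
    // node's registry so it can be scraped from the node's Prometheus endpoint, mirroring how
    // the existing gauges in metrics.rs are exposed.
    fn new_registered_counter(
        name: &str,
        registry: &Registry,
    ) -> Result<Counter<U64>, PrometheusError> {
        register(Counter::new(name, "no help")?, registry)
    }

Once exposed, per-event rates can be inspected with a query such as rate(aleph_sync_broadcast[5m]) (shown for illustration); the metric names follow the aleph_sync_* prefixes introduced in metrics.rs.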