Skip to content

Commit

Permalink
A0-2797: Add sync metrics (#1281)
Browse files Browse the repository at this point in the history
  • Loading branch information
lesniak43 authored Jul 10, 2023
1 parent 4c49b6d commit 52a35b0
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 3 deletions.
83 changes: 82 additions & 1 deletion finality-aleph/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{
use log::{trace, warn};
use lru::LruCache;
use parking_lot::Mutex;
use prometheus_endpoint::{register, Gauge, PrometheusError, Registry, U64};
use prometheus_endpoint::{register, Counter, Gauge, PrometheusError, Registry, U64};
use sc_service::Arc;

// How many entries (block hash + timestamp) we keep in memory per one checkpoint type.
Expand All @@ -26,6 +26,15 @@ struct Inner<H: Key> {
prev: HashMap<Checkpoint, Checkpoint>,
gauges: HashMap<Checkpoint, Gauge<U64>>,
starts: HashMap<Checkpoint, LruCache<H, Instant>>,
sync_broadcast_counter: Counter<U64>,
sync_send_request_for_counter: Counter<U64>,
sync_send_to_counter: Counter<U64>,
sync_handle_state_counter: Counter<U64>,
sync_handle_justifications_counter: Counter<U64>,
sync_handle_request_counter: Counter<U64>,
sync_handle_task_counter: Counter<U64>,
sync_handle_block_imported_counter: Counter<U64>,
sync_handle_block_finalized_counter: Counter<U64>,
}

impl<H: Key> Inner<H> {
Expand Down Expand Up @@ -124,6 +133,24 @@ impl<H: Key> Metrics<H> {
.iter()
.map(|k| (*k, LruCache::new(MAX_BLOCKS_PER_CHECKPOINT)))
.collect(),
sync_broadcast_counter: Counter::new("aleph_sync_broadcast", "no help")?,
sync_send_request_for_counter: Counter::new("aleph_sync_send_request_for", "no help")?,
sync_send_to_counter: Counter::new("aleph_sync_send_to", "no help")?,
sync_handle_state_counter: Counter::new("aleph_sync_handle_state", "no help")?,
sync_handle_justifications_counter: Counter::new(
"aleph_sync_handle_justifications",
"no help",
)?,
sync_handle_request_counter: Counter::new("aleph_sync_handle_request", "no help")?,
sync_handle_task_counter: Counter::new("aleph_sync_handle_task", "no help")?,
sync_handle_block_imported_counter: Counter::new(
"aleph_sync_handle_block_imported",
"no help",
)?,
sync_handle_block_finalized_counter: Counter::new(
"aleph_sync_handle_block_finalized",
"no help",
)?,
})));

Ok(Metrics { inner })
Expand All @@ -141,6 +168,60 @@ impl<H: Key> Metrics<H> {
.report_block(hash, checkpoint_time, checkpoint_type);
}
}

pub fn report_sync_broadcast(&self) {
if let Some(inner) = &self.inner {
inner.lock().sync_broadcast_counter.inc();
}
}

pub fn report_sync_send_request_for(&self) {
if let Some(inner) = &self.inner {
inner.lock().sync_send_request_for_counter.inc();
}
}

pub fn report_sync_send_to(&self) {
if let Some(inner) = &self.inner {
inner.lock().sync_send_to_counter.inc();
}
}

pub fn report_sync_handle_state(&self) {
if let Some(inner) = &self.inner {
inner.lock().sync_handle_state_counter.inc();
}
}

pub fn report_sync_handle_justifications(&self) {
if let Some(inner) = &self.inner {
inner.lock().sync_handle_justifications_counter.inc();
}
}

pub fn report_sync_handle_request(&self) {
if let Some(inner) = &self.inner {
inner.lock().sync_handle_request_counter.inc();
}
}

pub fn report_sync_handle_task(&self) {
if let Some(inner) = &self.inner {
inner.lock().sync_handle_task_counter.inc();
}
}

pub fn report_sync_handle_block_imported(&self) {
if let Some(inner) = &self.inner {
inner.lock().sync_handle_block_imported_counter.inc();
}
}

pub fn report_sync_handle_block_finalized(&self) {
if let Some(inner) = &self.inner {
inner.lock().sync_handle_block_finalized_counter.inc();
}
}
}

#[cfg(test)]
Expand Down
1 change: 1 addition & 0 deletions finality-aleph/src/nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ pub async fn run_validator_node<B, H, C, BE, SC>(
finalizer,
session_period,
justification_rx,
metrics.clone(),
) {
Ok(x) => x,
Err(e) => panic!("Failed to initialize Sync service: {}", e),
Expand Down
26 changes: 24 additions & 2 deletions finality-aleph/src/sync/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use log::{debug, error, trace, warn};
use tokio::time::{interval_at, Instant};

use crate::{
metrics::Key,
network::GossipNetwork,
sync::{
data::{
Expand All @@ -17,7 +18,7 @@ use crate::{
BlockIdFor, BlockIdentifier, ChainStatus, ChainStatusNotification, ChainStatusNotifier,
Finalizer, Header, Justification, JustificationSubmissions, Verifier, LOG_TARGET,
},
SessionPeriod,
Metrics, SessionPeriod,
};

const BROADCAST_COOLDOWN: Duration = Duration::from_millis(200);
Expand All @@ -32,6 +33,7 @@ pub struct Service<
CS: ChainStatus<J>,
V: Verifier<J>,
F: Finalizer<J>,
H: Key,
> {
network: VersionWrapper<J, N>,
handler: Handler<N::PeerId, J, CS, V, F>,
Expand All @@ -40,6 +42,7 @@ pub struct Service<
chain_events: CE,
justifications_from_user: mpsc::UnboundedReceiver<J::Unverified>,
additional_justifications_from_user: mpsc::UnboundedReceiver<J::Unverified>,
metrics: Metrics<H>,
}

impl<J: Justification> JustificationSubmissions<J> for mpsc::UnboundedSender<J::Unverified> {
Expand All @@ -57,10 +60,12 @@ impl<
CS: ChainStatus<J>,
V: Verifier<J>,
F: Finalizer<J>,
> Service<J, N, CE, CS, V, F>
H: Key,
> Service<J, N, CE, CS, V, F, H>
{
/// Create a new service using the provided network for communication. Also returns an
/// interface for submitting additional justifications.
#[allow(clippy::too_many_arguments)]
pub fn new(
network: N,
chain_events: CE,
Expand All @@ -69,6 +74,7 @@ impl<
finalizer: F,
period: SessionPeriod,
additional_justifications_from_user: mpsc::UnboundedReceiver<J::Unverified>,
metrics: Metrics<H>,
) -> Result<(Self, impl JustificationSubmissions<J> + Clone), HandlerError<J, CS, V, F>> {
let network = VersionWrapper::new(network);
let handler = Handler::new(chain_status, verifier, finalizer, period)?;
Expand All @@ -84,6 +90,7 @@ impl<
chain_events,
justifications_from_user,
additional_justifications_from_user,
metrics,
},
justifications_for_sync,
))
Expand Down Expand Up @@ -113,6 +120,7 @@ impl<
}
};
trace!(target: LOG_TARGET, "Broadcasting state: {:?}", state);
self.metrics.report_sync_broadcast();
let data = NetworkData::StateBroadcast(state);
if let Err(e) = self.network.broadcast(data) {
warn!(target: LOG_TARGET, "Error sending broadcast: {}.", e);
Expand All @@ -137,13 +145,21 @@ impl<
};
let request = Request::new(block_id, branch_knowledge, state);
trace!(target: LOG_TARGET, "Sending a request: {:?}", request);
self.metrics.report_sync_send_request_for();
let data = NetworkData::Request(request);
if let Err(e) = self.network.send_to_random(data, peers) {
warn!(target: LOG_TARGET, "Error sending request: {}.", e);
}
}

fn send_to(&mut self, data: NetworkData<J>, peer: N::PeerId) {
trace!(
target: LOG_TARGET,
"Sending data {:?} to peer {:?}",
data,
peer
);
self.metrics.report_sync_send_to();
if let Err(e) = self.network.send_to(data, peer) {
warn!(target: LOG_TARGET, "Error sending response: {}.", e);
}
Expand All @@ -165,6 +181,7 @@ impl<
state,
peer
);
self.metrics.report_sync_handle_state();
match self.handler.handle_state(state, peer.clone()) {
Ok(action) => self.perform_sync_action(action, peer),
Err(e) => warn!(
Expand All @@ -184,6 +201,7 @@ impl<
"Handling {:?} justifications.",
justifications.len()
);
self.metrics.report_sync_handle_justifications();
let mut previous_block_id = None;
for justification in justifications {
let maybe_block_id = match self
Expand Down Expand Up @@ -235,6 +253,7 @@ impl<
request,
peer
);
self.metrics.report_sync_handle_request();
match self.handler.handle_request(request) {
Ok(action) => self.perform_sync_action(action, peer),
Err(e) => {
Expand Down Expand Up @@ -271,6 +290,7 @@ impl<

fn handle_task(&mut self, block_id: BlockIdFor<J>) {
trace!(target: LOG_TARGET, "Handling a task for {:?}.", block_id);
self.metrics.report_sync_handle_task();
use Interest::*;
match self.handler.block_state(&block_id) {
HighestJustified {
Expand Down Expand Up @@ -300,6 +320,7 @@ impl<
match event {
BlockImported(header) => {
trace!(target: LOG_TARGET, "Handling a new imported block.");
self.metrics.report_sync_handle_block_imported();
if let Err(e) = self.handler.block_imported(header) {
error!(
target: LOG_TARGET,
Expand All @@ -309,6 +330,7 @@ impl<
}
BlockFinalized(_) => {
trace!(target: LOG_TARGET, "Handling a new finalized block.");
self.metrics.report_sync_handle_block_finalized();
if self.broadcast_ticker.try_tick() {
self.broadcast();
}
Expand Down

0 comments on commit 52a35b0

Please sign in to comment.