Skip to content

Commit

Permalink
Measure time it takes to send messages in gossip network (#1282)
Browse files Browse the repository at this point in the history
# Description

Measure how long sending a message over the substrate network takes and
expose the result in Prometheus.

## Type of change

- New feature (non-breaking change which adds functionality)

# Checklist:

---------

Co-authored-by: timorl <timorl@disroot.org>
  • Loading branch information
timorl and timorl authored Jul 10, 2023
1 parent 52a35b0 commit 10cacea
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 37 deletions.
8 changes: 4 additions & 4 deletions finality-aleph/src/aggregation/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Module to glue the legacy and current versions of the aggregator.
use std::{fmt::Debug, hash::Hash, marker::PhantomData, time::Instant};
use std::{marker::PhantomData, time::Instant};

use current_aleph_aggregator::NetworkError as CurrentNetworkError;
use legacy_aleph_aggregator::NetworkError as LegacyNetworkError;
Expand All @@ -9,7 +9,7 @@ use sp_runtime::traits::Block;
use crate::{
abft::SignatureSet,
crypto::Signature,
metrics::Checkpoint,
metrics::{Checkpoint, Key},
mpsc,
network::{
data::{Network, SendError},
Expand Down Expand Up @@ -175,13 +175,13 @@ impl<D: Data, N: Network<D>> NetworkWrapper<D, N> {
}
}

impl<H: Debug + Hash + Eq + Debug + Copy> legacy_aleph_aggregator::Metrics<H> for Metrics<H> {
impl<H: Key> legacy_aleph_aggregator::Metrics<H> for Metrics<H> {
    /// Forwards to `report_block` for `h`, using the current instant and the
    /// `Checkpoint::Aggregating` checkpoint.
    fn report_aggregation_complete(&mut self, h: H) {
        let finished_at = Instant::now();
        self.report_block(h, finished_at, Checkpoint::Aggregating);
    }
}

impl<H: Debug + Hash + Eq + Debug + Copy> current_aleph_aggregator::Metrics<H> for Metrics<H> {
impl<H: Key> current_aleph_aggregator::Metrics<H> for Metrics<H> {
fn report_aggregation_complete(&mut self, h: H) {
self.report_block(h, Instant::now(), Checkpoint::Aggregating);
}
Expand Down
104 changes: 86 additions & 18 deletions finality-aleph/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,22 @@ use std::{
use log::{trace, warn};
use lru::LruCache;
use parking_lot::Mutex;
use prometheus_endpoint::{register, Counter, Gauge, PrometheusError, Registry, U64};
use prometheus_endpoint::{
exponential_buckets, prometheus::HistogramTimer, register, Counter, Gauge, Histogram,
HistogramOpts, Opts, PrometheusError, Registry, U64,
};
use sc_service::Arc;

use crate::network::Protocol;

// How many entries (block hash + timestamp) we keep in memory per one checkpoint type.
// Each entry takes 32B (Hash) + 16B (Instant), so a limit of 5000 gives ~234kB (per checkpoint).
// Notice that some issues like finalization stall may lead to incomplete metrics
// (e.g. when the gap between checkpoints for a block grows over `MAX_BLOCKS_PER_CHECKPOINT`).
const MAX_BLOCKS_PER_CHECKPOINT: usize = 5000;

pub trait Key: Hash + Eq + Debug + Copy {}
impl<T: Hash + Eq + Debug + Copy> Key for T {}
/// Shorthand for the bounds required of the type parameter `H` of `Metrics`:
/// hashable, comparable for equality, debug-printable, `Copy`, `Send` and
/// free of non-`'static` borrows.
pub trait Key: Hash + Eq + Debug + Copy + Send + 'static {}
// Blanket impl: every type that meets the bounds above is a `Key` automatically.
impl<T: Hash + Eq + Debug + Copy + Send + 'static> Key for T {}

const LOG_TARGET: &str = "aleph-metrics";

Expand All @@ -35,6 +40,8 @@ struct Inner<H: Key> {
sync_handle_task_counter: Counter<U64>,
sync_handle_block_imported_counter: Counter<U64>,
sync_handle_block_finalized_counter: Counter<U64>,

network_send_times: HashMap<Protocol, Histogram>,
}

impl<H: Key> Inner<H> {
Expand Down Expand Up @@ -80,6 +87,10 @@ impl<H: Key> Inner<H> {
}
}
}

/// Starts a timer on the histogram stored for `protocol` in
/// `network_send_times`.
///
/// # Panics
///
/// Panics if the map has no entry for `protocol` (plain map indexing).
fn start_sending_in(&self, protocol: Protocol) -> HistogramTimer {
    let histogram = &self.network_send_times[&protocol];
    histogram.start_timer()
}
}

#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)]
Expand All @@ -92,6 +103,15 @@ pub(crate) enum Checkpoint {
Finalized,
}

fn protocol_name(protocol: Protocol) -> String {
use Protocol::*;
match protocol {
Authentication => "authentication",
BlockSync => "block_sync",
}
.to_string()
}

#[derive(Clone)]
pub struct Metrics<H: Key> {
inner: Option<Arc<Mutex<Inner<H>>>>,
Expand Down Expand Up @@ -126,31 +146,73 @@ impl<H: Key> Metrics<H> {
);
}

use Protocol::*;
let mut network_send_times = HashMap::new();
for key in [Authentication, BlockSync] {
network_send_times.insert(
key,
register(
Histogram::with_opts(HistogramOpts {
common_opts: Opts {
namespace: "gossip_network".to_string(),
subsystem: protocol_name(key),
name: "send_duration".to_string(),
help: "How long did it take for substrate to send a message."
.to_string(),
const_labels: Default::default(),
variable_labels: Default::default(),
},
buckets: exponential_buckets(0.001, 1.26, 30)?,
})?,
registry,
)?,
);
}

let inner = Some(Arc::new(Mutex::new(Inner {
prev,
gauges,
starts: keys
.iter()
.map(|k| (*k, LruCache::new(MAX_BLOCKS_PER_CHECKPOINT)))
.collect(),
sync_broadcast_counter: Counter::new("aleph_sync_broadcast", "no help")?,
sync_send_request_for_counter: Counter::new("aleph_sync_send_request_for", "no help")?,
sync_send_to_counter: Counter::new("aleph_sync_send_to", "no help")?,
sync_handle_state_counter: Counter::new("aleph_sync_handle_state", "no help")?,
sync_handle_justifications_counter: Counter::new(
"aleph_sync_handle_justifications",
"no help",
sync_broadcast_counter: register(
Counter::new("aleph_sync_broadcast", "no help")?,
registry,
)?,
sync_send_request_for_counter: register(
Counter::new("aleph_sync_send_request_for", "no help")?,
registry,
)?,
sync_send_to_counter: register(
Counter::new("aleph_sync_send_to", "no help")?,
registry,
)?,
sync_handle_state_counter: register(
Counter::new("aleph_sync_handle_state", "no help")?,
registry,
)?,
sync_handle_justifications_counter: register(
Counter::new("aleph_sync_handle_justifications", "no help")?,
registry,
)?,
sync_handle_request_counter: Counter::new("aleph_sync_handle_request", "no help")?,
sync_handle_task_counter: Counter::new("aleph_sync_handle_task", "no help")?,
sync_handle_block_imported_counter: Counter::new(
"aleph_sync_handle_block_imported",
"no help",
sync_handle_request_counter: register(
Counter::new("aleph_sync_handle_request", "no help")?,
registry,
)?,
sync_handle_block_finalized_counter: Counter::new(
"aleph_sync_handle_block_finalized",
"no help",
sync_handle_task_counter: register(
Counter::new("aleph_sync_handle_task", "no help")?,
registry,
)?,
sync_handle_block_imported_counter: register(
Counter::new("aleph_sync_handle_block_imported", "no help")?,
registry,
)?,
sync_handle_block_finalized_counter: register(
Counter::new("aleph_sync_handle_block_finalized", "no help")?,
registry,
)?,
network_send_times,
})));

Ok(Metrics { inner })
Expand Down Expand Up @@ -222,6 +284,12 @@ impl<H: Key> Metrics<H> {
inner.lock().sync_handle_block_finalized_counter.inc();
}
}

/// Starts timing a send on `protocol`.
///
/// Returns `None` when this `Metrics` holds no inner state; otherwise returns
/// the timer produced by the inner state's `start_sending_in`.
pub fn start_sending_in(&self, protocol: Protocol) -> Option<HistogramTimer> {
    let inner = self.inner.as_ref()?;
    // The lock guard is a temporary of this statement, so it is released
    // as soon as the timer has been created.
    let timer = inner.lock().start_sending_in(protocol);
    Some(timer)
}
}

#[cfg(test)]
Expand Down
42 changes: 29 additions & 13 deletions finality-aleph/src/network/gossip/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@ use tokio::time;
const QUEUE_SIZE_WARNING: i64 = 1_000;

use crate::{
metrics::Key,
network::{
gossip::{Event, EventStream, Network, NetworkSender, Protocol, RawNetwork},
Data,
},
SpawnHandle, STATUS_REPORT_INTERVAL,
Metrics, SpawnHandle, STATUS_REPORT_INTERVAL,
};

enum Command<D: Data, P: Clone + Debug + Eq + Hash + Send + 'static> {
Expand All @@ -34,7 +35,7 @@ enum Command<D: Data, P: Clone + Debug + Eq + Hash + Send + 'static> {
/// 1. Messages are forwarded to the user.
/// 2. Various forms of (dis)connecting, keeping track of all currently connected nodes.
/// 3. Outgoing messages, sending them out, using 1.2. to broadcast.
pub struct Service<N: RawNetwork, AD: Data, BSD: Data> {
pub struct Service<N: RawNetwork, AD: Data, BSD: Data, H: Key> {
network: N,
messages_from_authentication_user: mpsc::UnboundedReceiver<Command<AD, N::PeerId>>,
messages_from_block_sync_user: mpsc::UnboundedReceiver<Command<BSD, N::PeerId>>,
Expand All @@ -45,6 +46,7 @@ pub struct Service<N: RawNetwork, AD: Data, BSD: Data> {
block_sync_connected_peers: HashSet<N::PeerId>,
block_sync_peer_senders: HashMap<N::PeerId, TracingUnboundedSender<BSD>>,
spawn_handle: SpawnHandle,
metrics: Metrics<H>,
}

struct ServiceInterface<D: Data, P: Clone + Debug + Eq + Hash + Send + 'static> {
Expand Down Expand Up @@ -110,12 +112,13 @@ enum SendError {
SendingFailed,
}

impl<N: RawNetwork, AD: Data, BSD: Data> Service<N, AD, BSD> {
impl<N: RawNetwork, AD: Data, BSD: Data, H: Key> Service<N, AD, BSD, H> {
pub fn new(
network: N,
spawn_handle: SpawnHandle,
metrics: Metrics<H>,
) -> (
Service<N, AD, BSD>,
Self,
impl Network<AD, Error = Error, PeerId = N::PeerId>,
impl Network<BSD, Error = Error, PeerId = N::PeerId>,
) {
Expand All @@ -137,6 +140,7 @@ impl<N: RawNetwork, AD: Data, BSD: Data> Service<N, AD, BSD> {
authentication_peer_senders: HashMap::new(),
block_sync_connected_peers: HashSet::new(),
block_sync_peer_senders: HashMap::new(),
metrics,
},
ServiceInterface {
messages_from_service: messages_from_authentication_service,
Expand Down Expand Up @@ -170,6 +174,7 @@ impl<N: RawNetwork, AD: Data, BSD: Data> Service<N, AD, BSD> {
protocol: Protocol,
) -> impl Future<Output = ()> + Send + 'static {
let network = self.network.clone();
let metrics = self.metrics.clone();
async move {
let mut sender = None;
loop {
Expand All @@ -185,10 +190,14 @@ impl<N: RawNetwork, AD: Data, BSD: Data> Service<N, AD, BSD> {
}
}
};
let maybe_timer = metrics.start_sending_in(protocol);
if let Err(e) = s.send(data.encode()).await {
debug!(target: "aleph-network", "Failed sending data to peer. Dropping sender and message: {}", e);
sender = None;
}
if let Some(timer) = maybe_timer {
timer.observe_duration();
}
} else {
debug!(target: "aleph-network", "Sender was dropped for peer {:?}. Peer sender exiting.", peer_id);
return;
Expand Down Expand Up @@ -447,21 +456,25 @@ mod tests {
use tokio::runtime::Handle;

use super::{Error, SendError, Service};
use crate::network::{
gossip::{
mock::{MockEvent, MockRawNetwork, MockSenderError},
Network,
use crate::{
metrics::Metrics,
network::{
gossip::{
mock::{MockEvent, MockRawNetwork, MockSenderError},
Network,
},
mock::MockData,
Protocol,
},
mock::MockData,
Protocol,
testing::mocks::THash,
};

const PROTOCOL: Protocol = Protocol::Authentication;

pub struct TestData {
pub network: MockRawNetwork,
gossip_network: Box<dyn Network<MockData, Error = Error, PeerId = MockPublicKey>>,
pub service: Service<MockRawNetwork, MockData, MockData>,
pub service: Service<MockRawNetwork, MockData, MockData, THash>,
// `TaskManager` can't be dropped for `SpawnTaskHandle` to work
_task_manager: TaskManager,
// If we drop the sync network, the underlying network service dies, stopping the whole
Expand All @@ -478,8 +491,11 @@ mod tests {

// Prepare service
let network = MockRawNetwork::new(event_stream_oneshot_tx);
let (service, gossip_network, other_network) =
Service::new(network.clone(), task_manager.spawn_handle().into());
let (service, gossip_network, other_network) = Service::new(
network.clone(),
task_manager.spawn_handle().into(),
Metrics::<THash>::noop(),
);
let gossip_network = Box::new(gossip_network);
let other_network = Box::new(other_network);

Expand Down
1 change: 1 addition & 0 deletions finality-aleph/src/nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ pub async fn run_validator_node<B, H, C, BE, SC>(
let (gossip_network_service, authentication_network, block_sync_network) = GossipService::new(
SubstrateNetwork::new(network.clone(), protocol_naming),
spawn_handle.clone(),
metrics.clone(),
);
let gossip_network_task = async move { gossip_network_service.run().await };

Expand Down
9 changes: 7 additions & 2 deletions finality-aleph/src/testing/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ use crate::{
},
GossipError, GossipNetwork, GossipService, MockEvent, MockRawNetwork, Protocol,
},
MillisecsPerBlock, NodeIndex, Recipient, SessionId, SessionPeriod,
testing::mocks::THash,
Metrics, MillisecsPerBlock, NodeIndex, Recipient, SessionId, SessionPeriod,
};

const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
Expand Down Expand Up @@ -101,7 +102,11 @@ async fn prepare_one_session_test_data() -> TestData {
let validator_network = MockCliqueNetwork::new();

let (gossip_service, gossip_network, sync_network) =
GossipService::<_, _, MockData>::new(network.clone(), task_manager.spawn_handle().into());
GossipService::<_, _, MockData, THash>::new(
network.clone(),
task_manager.spawn_handle().into(),
Metrics::noop(),
);

let (connection_manager_service, session_manager) = ConnectionManager::new(
authorities[0].address(),
Expand Down

0 comments on commit 10cacea

Please sign in to comment.