From e61b5e2d172f782e953f351a81723713b150c57d Mon Sep 17 00:00:00 2001
From: Hansie Odendaal <39146854+hansieodendaal@users.noreply.github.com>
Date: Fri, 1 Nov 2024 09:22:36 +0200
Subject: [PATCH] feat: verify active base node peer connections and
 disconnect if stale (#6655)

Description
---
Added connection checking to the p2p services (`MonitorPeersService`). All active connections are pinged on a set (slowish) interval, 10 times slower than the _auto ping metadata interval_. Nodes that fail to answer with the corresponding pong in time on three consecutive iterations are disconnected. This helps keep the list of active connections (lazily) up to date.

**Edit:** Fixed an error in the liveness service where misbehaving ping peers were never disconnected. The liveness service and the monitor peers service work hand in hand: liveness selects 8 randomly connected peers to obtain metadata from and disconnects any of those that misbehave after 1 minute (2 x ping interval), while the monitor peers service assesses all connected peers at a much slower pace and disconnects non-responsive peers after 15 minutes (10 x 3 ping intervals).

Motivation and Context
---
See #6516

How Has This Been Tested?
---
Performed system-level testing. In the log below, 5 of 41 active outbound peer connections did not respond to pings. Peer `e19e1454a1e0519866297960ad` was disconnected because it failed to respond three times in a row:

```
2024-10-29 15:12:07.664466900 [minotari::base_node::monitor_peers] TRACE Found 5 of 41 outbound base node peer connections that did not respond to pings
2024-10-29 15:12:07.664619800 [minotari::base_node::monitor_peers] TRACE Peer e2fa82050c2f7579febafb7e08 stats - (iteration, connected, responsive) [(3, true, true), (4, true, true), (5, true, false)]
2024-10-29 15:12:07.664683300 [minotari::base_node::monitor_peers] DEBUG Disconnecting e19e1454a1e0519866297960ad as the peer is no longer responsive - (iteration, connected, responsive) [(2, true, true), (3, true, false), (4, true, false), (5, true, false)]
2024-10-29 15:12:07.665853300 [minotari::base_node::monitor_peers] TRACE Peer 6ea597117476676d5ddcb18153 stats - (iteration, connected, responsive) [(1, true, true), (2, true, true), (3, true, true), (4, true, true), (5, true, false)]
2024-10-29 15:12:07.665965500 [minotari::base_node::monitor_peers] TRACE Peer a671f812efe5ab14cbb3c1f9f4 stats - (iteration, connected, responsive) [(2, true, true), (3, true, true), (4, true, true), (5, true, false)]
2024-10-29 15:12:07.665997800 [minotari::base_node::monitor_peers] TRACE Peer e336b264e02f611cf4fbf51f22 stats - (iteration, connected, responsive) [(2, true, true), (3, true, true), (4, true, true), (5, true, false)]
```

What process can a PR reviewer use to test or verify this change?
---
- Code review
- System-level testing

Breaking Changes
---

- [x] None
- [ ] Requires data directory on base node to be deleted
- [ ] Requires hard fork
- [ ] Other - Please specify

---------

Co-authored-by: SW van Heerden
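Reviewer aside: the timing arithmetic described above, condensed into a self-contained sketch. This is plain Rust with illustrative constants, not code from this patch; the real interval comes from `base_node_config.metadata_auto_ping_interval`:

```rust
use std::time::Duration;

// Illustrative defaults; the base node reads the real value from config.
const AUTO_PING_INTERVAL: Duration = Duration::from_secs(30);
const MONITOR_ROUND_MULTIPLIER: u32 = 10; // monitor round = 10 x auto-ping interval
const MAX_CONSECUTIVE_FAILURES: usize = 3;

/// Mirrors the patch's cull rule: the last three monitor rounds were all
/// "connected but did not pong".
fn should_disconnect(history: &[(bool, bool)]) -> bool {
    history
        .iter()
        .rev()
        .take(MAX_CONSECUTIVE_FAILURES)
        .filter(|&&(connected, responsive)| connected && !responsive)
        .count() >=
        MAX_CONSECUTIVE_FAILURES
}

fn main() {
    let round = AUTO_PING_INTERVAL * MONITOR_ROUND_MULTIPLIER; // 5 minutes
    let cull_after = round * MAX_CONSECUTIVE_FAILURES as u32; // 15 minutes
    println!("monitor round: {round:?}, cull after: {cull_after:?}");

    // (connected, responsive) per monitor round, oldest first
    assert!(should_disconnect(&[(true, true), (true, false), (true, false), (true, false)]));
    assert!(!should_disconnect(&[(true, false), (true, false), (true, true)]));
}
```

With a 30 s auto-ping interval this reproduces the 5-minute monitor rounds and the 15-minute disconnect horizon quoted above.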
---
 applications/minotari_node/src/bootstrap.rs   |   8 +-
 .../src/commands/command/ping_peer.rs         |  49 +--
 applications/minotari_node/src/lib.rs         |   3 +-
 .../chain_metadata_service/service.rs         |  12 +-
 .../p2p/src/services/liveness/handle.rs       |  27 +-
 base_layer/p2p/src/services/liveness/mock.rs  |  12 +-
 base_layer/p2p/src/services/liveness/mod.rs   |   3 +-
 .../p2p/src/services/liveness/service.rs      |  78 +++--
 base_layer/p2p/src/services/liveness/state.rs |  26 +-
 base_layer/p2p/src/services/mod.rs            |   1 +
 .../p2p/src/services/monitor_peers/mod.rs     |  90 +++++
 .../p2p/src/services/monitor_peers/service.rs | 324 ++++++++++++++++++
 12 files changed, 554 insertions(+), 79 deletions(-)
 create mode 100644 base_layer/p2p/src/services/monitor_peers/mod.rs
 create mode 100644 base_layer/p2p/src/services/monitor_peers/service.rs

diff --git a/applications/minotari_node/src/bootstrap.rs b/applications/minotari_node/src/bootstrap.rs
index fc41b77cfd..6620e7bd07 100644
--- a/applications/minotari_node/src/bootstrap.rs
+++ b/applications/minotari_node/src/bootstrap.rs
@@ -59,7 +59,10 @@ use tari_p2p::{
     initialization,
     initialization::P2pInitializer,
     peer_seeds::SeedPeer,
-    services::liveness::{config::LivenessConfig, LivenessInitializer},
+    services::{
+        liveness::{config::LivenessConfig, LivenessInitializer},
+        monitor_peers::MonitorPeersInitializer,
+    },
     P2pConfig,
     TransportType,
 };
@@ -155,6 +158,9 @@ where B: BlockchainBackend + 'static
                 },
                 peer_message_subscriptions,
             ))
+            .add_initializer(MonitorPeersInitializer::new(
+                base_node_config.metadata_auto_ping_interval,
+            ))
            .add_initializer(ChainMetadataServiceInitializer)
             .add_initializer(BaseNodeStateMachineInitializer::new(
                 self.db.clone().into(),
diff --git a/applications/minotari_node/src/commands/command/ping_peer.rs b/applications/minotari_node/src/commands/command/ping_peer.rs
index ea37a8fdcc..55553756b6 100644
--- a/applications/minotari_node/src/commands/command/ping_peer.rs
+++ b/applications/minotari_node/src/commands/command/ping_peer.rs
@@ -45,34 +45,39 @@ impl HandleCommand<Args> for CommandContext {
 }
 
 impl CommandContext {
-    /// Function to process the dial-peer command
+    /// Function to process the ping-peer command
     pub async fn ping_peer(&mut self, dest_node_id: NodeId) -> Result<(), Error> {
-        println!("🏓 Pinging peer...");
         let mut liveness_events = self.liveness.get_event_stream();
         let mut liveness = self.liveness.clone();
         task::spawn(async move {
-            if let Err(e) = liveness.send_ping(dest_node_id.clone()).await {
-                println!("🏓 Ping failed to send to {}: {}", dest_node_id, e);
-                return;
-            }
-            loop {
-                match liveness_events.recv().await {
-                    Ok(event) => {
-                        if let LivenessEvent::ReceivedPong(pong) = &*event {
-                            if pong.node_id == dest_node_id {
-                                println!(
-                                    "🏓️ Pong received, round-trip-time is {:.2?}!",
-                                    pong.latency.unwrap_or_default()
-                                );
-                                break;
-                            }
+            match liveness.send_ping(dest_node_id.clone()).await {
+                Ok(nonce) => {
+                    println!("🏓 Pinging peer {} with nonce {} ...", dest_node_id, nonce);
+                    loop {
+                        match liveness_events.recv().await {
+                            Ok(event) => {
+                                if let LivenessEvent::ReceivedPong(pong) = &*event {
+                                    if pong.node_id == dest_node_id && pong.nonce == nonce {
+                                        println!(
+                                            "🏓️ Pong: peer {} responded with nonce {}, round-trip-time is {:.2?}!",
+                                            pong.node_id,
+                                            pong.nonce,
+                                            pong.latency.unwrap_or_default()
+                                        );
+                                        break;
+                                    }
+                                }
+                            },
+                            Err(RecvError::Closed) => {
+                                break;
+                            },
+                            Err(RecvError::Lagged(_)) => {},
                         }
-                    },
-                    Err(RecvError::Closed) => {
-                        break;
-                    },
-                    Err(RecvError::Lagged(_)) => {},
-                }
+                    }
+                },
+                Err(e) => {
+                    println!("🏓 Ping failed to send to {}: {}", dest_node_id, e);
+                },
             }
         });
         Ok(())
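For reviewers who want to drive the new API directly: a minimal sketch of the nonce-matching pattern the reworked `ping_peer` command uses, written against the `tari_p2p`/`tari_comms` types as modified by this patch. The helper name `ping_and_await_pong` is illustrative, not part of the change:

```rust
use std::time::Duration;

use tari_comms::peer_manager::NodeId;
use tari_p2p::services::liveness::{LivenessEvent, LivenessHandle};
use tokio::sync::broadcast::error::RecvError;

/// Sends one ping and waits for the pong carrying the same nonce, returning
/// the measured latency. Subscribes before sending so the pong cannot be missed.
async fn ping_and_await_pong(liveness: &mut LivenessHandle, dest: NodeId) -> Option<Duration> {
    let mut events = liveness.get_event_stream();
    // `send_ping` now returns the nonce of the ping it sent ...
    let nonce = liveness.send_ping(dest.clone()).await.ok()?;
    loop {
        match events.recv().await {
            // ... so pongs from other ping rounds can be ignored
            Ok(event) => {
                if let LivenessEvent::ReceivedPong(pong) = &*event {
                    if pong.node_id == dest && pong.nonce == nonce {
                        return pong.latency;
                    }
                }
            },
            Err(RecvError::Closed) => return None,
            Err(RecvError::Lagged(_)) => {},
        }
    }
}
```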
diff --git a/applications/minotari_node/src/lib.rs b/applications/minotari_node/src/lib.rs
index 99ebebf22b..2c40ac2101 100644
--- a/applications/minotari_node/src/lib.rs
+++ b/applications/minotari_node/src/lib.rs
@@ -37,6 +37,7 @@ mod grpc_method;
 mod metrics;
 mod recovery;
 mod utils;
+
 use std::{process, sync::Arc};
 
 use commands::{cli_loop::CliLoop, command::CommandContext};
@@ -151,7 +152,7 @@ pub async fn run_base_node_with_cli(
     }
 
     // Run, node, run!
-    let context = CommandContext::new(&ctx, shutdown);
+    let context = CommandContext::new(&ctx, shutdown.clone());
     let main_loop = CliLoop::new(context, cli.watch, cli.non_interactive_mode);
     if cli.non_interactive_mode {
         println!("Node started in non-interactive mode (pid = {})", process::id());
diff --git a/base_layer/core/src/base_node/chain_metadata_service/service.rs b/base_layer/core/src/base_node/chain_metadata_service/service.rs
index 58d8768ac2..ebe656e571 100644
--- a/base_layer/core/src/base_node/chain_metadata_service/service.rs
+++ b/base_layer/core/src/base_node/chain_metadata_service/service.rs
@@ -141,10 +141,6 @@ impl ChainMetadataService {
         match event {
             // Received a ping, check if it contains ChainMetadata
             LivenessEvent::ReceivedPing(event) => {
-                debug!(
-                    target: LOG_TARGET,
-                    "Received ping from neighbouring node '{}'.", event.node_id
-                );
                 self.number_of_rounds_no_pings = 0;
                 if event.metadata.has(MetadataKey::ChainMetadata) {
                     self.send_chain_metadata_to_event_publisher(event).await?;
@@ -152,11 +148,6 @@ impl ChainMetadataService {
             },
             // Received a pong, check if our neighbour sent it and it contains ChainMetadata
             LivenessEvent::ReceivedPong(event) => {
-                trace!(
-                    target: LOG_TARGET,
-                    "Received pong from neighbouring node '{}'.",
-                    event.node_id
-                );
                 self.number_of_rounds_no_pings = 0;
                 if event.metadata.has(MetadataKey::ChainMetadata) {
                     self.send_chain_metadata_to_event_publisher(event).await?;
@@ -325,6 +316,7 @@ mod test {
             metadata,
             node_id: node_id.clone(),
             latency: None,
+            nonce: 0,
         };
 
         let sample_event = LivenessEvent::ReceivedPong(Box::new(pong_event));
@@ -347,6 +339,7 @@ mod test {
             metadata,
             node_id,
             latency: None,
+            nonce: 0,
         };
 
         let sample_event = LivenessEvent::ReceivedPong(Box::new(pong_event));
@@ -365,6 +358,7 @@ mod test {
             metadata,
             node_id,
             latency: None,
+            nonce: 0,
         };
 
         let sample_event = LivenessEvent::ReceivedPong(Box::new(pong_event));
diff --git a/base_layer/p2p/src/services/liveness/handle.rs b/base_layer/p2p/src/services/liveness/handle.rs
index 2eabd07c9d..96696c528f 100644
--- a/base_layer/p2p/src/services/liveness/handle.rs
+++ b/base_layer/p2p/src/services/liveness/handle.rs
@@ -35,6 +35,8 @@ use crate::proto::liveness::MetadataKey;
 pub enum LivenessRequest {
     /// Send a ping to the given node ID
     SendPing(NodeId),
+    /// Ping a list of peers
+    SendPings(Vec<NodeId>),
     /// Retrieve the total number of pings received
     GetPingCount,
     /// Retrieve the total number of pongs received
@@ -55,7 +57,7 @@ pub enum LivenessRequest {
 #[derive(Debug)]
 pub enum LivenessResponse {
     /// Indicates that the request succeeded
-    Ok,
+    Ok(Option<Vec<u64>>),
     /// Used to return a counter value from `GetPingCount` and `GetPongCount`
     Count(usize),
     /// Response for GetAvgLatency and GetNetworkAvgLatency
@@ -84,14 +86,17 @@ pub struct PingPongEvent {
     pub latency: Option<Duration>,
     /// Metadata of the corresponding node
     pub metadata: Metadata,
+    /// The nonce of the ping/pong message, for clients that want to match pings with pongs
+    pub nonce: u64,
 }
 
 impl PingPongEvent {
-    pub fn new(node_id: NodeId, latency: Option<Duration>, metadata: Metadata) -> Self {
+    pub fn new(node_id: NodeId, latency: Option<Duration>, metadata: Metadata, nonce: u64) -> Self {
         Self {
             node_id,
             latency,
             metadata,
+            nonce,
         }
     }
 }
@@ -122,9 +127,17 @@ impl LivenessHandle {
     }
 
     /// Send a ping to a given node ID
-    pub async fn send_ping(&mut self, node_id: NodeId) -> Result<(), LivenessError> {
+    pub async fn send_ping(&mut self, node_id: NodeId) -> Result<u64, LivenessError> {
         match self.handle.call(LivenessRequest::SendPing(node_id)).await?? {
-            LivenessResponse::Ok => Ok(()),
+            LivenessResponse::Ok(Some(nonces)) => Ok(nonces[0]),
+            _ => Err(LivenessError::UnexpectedApiResponse),
+        }
+    }
+
+    /// Send pings to a list of peers
+    pub async fn send_pings(&mut self, node_ids: Vec<NodeId>) -> Result<Vec<u64>, LivenessError> {
+        match self.handle.call(LivenessRequest::SendPings(node_ids)).await?? {
+            LivenessResponse::Ok(Some(nonces)) => Ok(nonces),
             _ => Err(LivenessError::UnexpectedApiResponse),
         }
     }
@@ -152,7 +165,7 @@ impl LivenessHandle {
             .call(LivenessRequest::SetMetadataEntry(key, value))
             .await??
         {
-            LivenessResponse::Ok => Ok(()),
+            LivenessResponse::Ok(_) => Ok(()),
             _ => Err(LivenessError::UnexpectedApiResponse),
         }
     }
@@ -160,7 +173,7 @@ impl LivenessHandle {
 
     /// Add a monitored peer to the basic config if not present
     pub async fn check_add_monitored_peer(&mut self, node_id: NodeId) -> Result<(), LivenessError> {
         match self.handle.call(LivenessRequest::AddMonitoredPeer(node_id)).await?? {
-            LivenessResponse::Ok => Ok(()),
+            LivenessResponse::Ok(_) => Ok(()),
             _ => Err(LivenessError::UnexpectedApiResponse),
         }
     }
@@ -172,7 +185,7 @@ impl LivenessHandle {
             .call(LivenessRequest::RemoveMonitoredPeer(node_id))
             .await??
         {
-            LivenessResponse::Ok => Ok(()),
+            LivenessResponse::Ok(_) => Ok(()),
             _ => Err(LivenessError::UnexpectedApiResponse),
         }
     }
diff --git a/base_layer/p2p/src/services/liveness/mock.rs b/base_layer/p2p/src/services/liveness/mock.rs
index 652531a17f..9b765fe5d3 100644
--- a/base_layer/p2p/src/services/liveness/mock.rs
+++ b/base_layer/p2p/src/services/liveness/mock.rs
@@ -125,7 +125,11 @@ impl LivenessMock {
         self.mock_state.add_request_call(req.clone());
         match req {
             SendPing(_) => {
-                reply.send(Ok(LivenessResponse::Ok)).unwrap();
+                reply.send(Ok(LivenessResponse::Ok(Some(vec![0])))).unwrap();
+            },
+            SendPings(node_ids) => {
+                let nonces: Vec<u64> = (0..node_ids.len() as u64).collect();
+                reply.send(Ok(LivenessResponse::Ok(Some(nonces)))).unwrap();
             },
             GetPingCount => {
                 reply.send(Ok(LivenessResponse::Count(1))).unwrap();
@@ -140,13 +144,13 @@ impl LivenessMock {
                 reply.send(Ok(LivenessResponse::AvgLatency(None))).unwrap();
             },
             SetMetadataEntry(_, _) => {
-                reply.send(Ok(LivenessResponse::Ok)).unwrap();
+                reply.send(Ok(LivenessResponse::Ok(None))).unwrap();
             },
             AddMonitoredPeer(_) => {
-                reply.send(Ok(LivenessResponse::Ok)).unwrap();
+                reply.send(Ok(LivenessResponse::Ok(None))).unwrap();
             },
             RemoveMonitoredPeer(_) => {
-                reply.send(Ok(LivenessResponse::Ok)).unwrap();
+                reply.send(Ok(LivenessResponse::Ok(None))).unwrap();
             },
         }
     }
diff --git a/base_layer/p2p/src/services/liveness/mod.rs b/base_layer/p2p/src/services/liveness/mod.rs
index 53031bbe7a..e7842002db 100644
--- a/base_layer/p2p/src/services/liveness/mod.rs
+++ b/base_layer/p2p/src/services/liveness/mod.rs
@@ -52,6 +52,7 @@ pub use handle::{
 
 mod message;
 mod service;
+pub use service::MAX_INFLIGHT_TTL;
 
 mod state;
 pub use state::Metadata;
@@ -87,7 +88,7 @@ const LOG_TARGET: &str = "p2p::services::liveness";
 
 /// Initializer for the Liveness service handle and service future.
 pub struct LivenessInitializer {
-    config: Option<LivenessConfig>,
+    pub(crate) config: Option<LivenessConfig>,
     inbound_message_subscription_factory: Arc<SubscriptionFactory<Arc<PeerMessage>>>,
 }
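The batch variant has the same shape as the single-peer call. A short sketch of how a caller (such as the monitor peers service further down) can pair each peer with the nonce of the ping sent to it; this leans on `send_pings` returning one nonce per node ID, in order, which is also what the mock above assumes. The helper name `ping_all` is illustrative:

```rust
use tari_comms::peer_manager::NodeId;
use tari_p2p::services::liveness::{LivenessError, LivenessHandle};

/// Sketch: ping a batch of peers and remember which nonce belongs to whom.
async fn ping_all(
    liveness: &mut LivenessHandle,
    peers: Vec<NodeId>,
) -> Result<Vec<(NodeId, u64)>, LivenessError> {
    let nonces = liveness.send_pings(peers.clone()).await?;
    // One nonce per node ID, returned in the same order as the input list
    Ok(peers.into_iter().zip(nonces).collect())
}
```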
diff --git a/base_layer/p2p/src/services/liveness/service.rs b/base_layer/p2p/src/services/liveness/service.rs
index e7407b4190..1be0bb0015 100644
--- a/base_layer/p2p/src/services/liveness/service.rs
+++ b/base_layer/p2p/src/services/liveness/service.rs
@@ -20,7 +20,11 @@
 // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
 // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
-use std::{iter, sync::Arc, time::Instant};
+use std::{
+    iter,
+    sync::Arc,
+    time::{Duration, Instant},
+};
 
 use futures::{future::Either, pin_mut, stream::StreamExt, Stream};
 use log::*;
@@ -55,6 +59,8 @@ use crate::{
     tari_message::TariMessageType,
 };
 
+pub const MAX_INFLIGHT_TTL: Duration = Duration::from_secs(30);
+
 /// Service responsible for testing Liveness of Peers.
 pub struct LivenessService<THandleStream, TPingStream> {
     config: LivenessConfig,
@@ -131,9 +137,7 @@ where
                     warn!(target: LOG_TARGET, "Error when pinging peers: {}", err);
                 }
                 if self.config.max_allowed_ping_failures > 0 {
-                    if let Err(err) = self.disconnect_failed_peers().await {
-                        error!(target: LOG_TARGET, "Error occurred while disconnecting failed peers: {}", err);
-                    }
+                    self.disconnect_failed_peers().await;
                 }
             },
 
@@ -184,7 +188,7 @@ where
                 self.send_pong(ping_pong_msg.nonce, public_key).await?;
                 self.state.inc_pongs_sent();
 
-                debug!(
+                trace!(
                     target: LOG_TARGET,
                     "Received ping from peer '{}' with useragent '{}' (Trace: {})",
                     node_id.short_str(),
@@ -192,7 +196,7 @@ where
                     message_tag,
                 );
 
-                let ping_event = PingPongEvent::new(node_id, None, ping_pong_msg.metadata.into());
+                let ping_event = PingPongEvent::new(node_id, None, ping_pong_msg.metadata.into(), ping_pong_msg.nonce);
                 self.publish_event(LivenessEvent::ReceivedPing(Box::new(ping_event)));
             },
             PingPong::Pong => {
@@ -208,7 +212,7 @@ where
                 }
 
                 let maybe_latency = self.state.record_pong(ping_pong_msg.nonce, &node_id);
-                debug!(
+                trace!(
                     target: LOG_TARGET,
                     "Received pong from peer '{}' with useragent '{}'. {} (Trace: {})",
                     node_id.short_str(),
@@ -219,7 +223,12 @@ where
                     message_tag,
                 );
 
-                let pong_event = PingPongEvent::new(node_id.clone(), maybe_latency, ping_pong_msg.metadata.into());
+                let pong_event = PingPongEvent::new(
+                    node_id.clone(),
+                    maybe_latency,
+                    ping_pong_msg.metadata.into(),
+                    ping_pong_msg.nonce,
+                );
                 self.publish_event(LivenessEvent::ReceivedPong(Box::new(pong_event)));
 
                 if let Some(address) = source_peer.last_address_used() {
@@ -232,9 +241,14 @@ where
         Ok(())
     }
 
-    async fn send_ping(&mut self, node_id: NodeId) -> Result<(), LivenessError> {
+    async fn send_ping(&mut self, node_id: NodeId) -> Result<u64, LivenessError> {
         let msg = PingPongMessage::ping_with_metadata(self.state.metadata().clone());
-        self.state.add_inflight_ping(msg.nonce, node_id.clone());
+        let nonce = msg.nonce;
+        self.state.add_inflight_ping(
+            nonce,
+            node_id.clone(),
+            self.config.auto_ping_interval.unwrap_or(MAX_INFLIGHT_TTL),
+        );
 
         debug!(target: LOG_TARGET, "Sending ping to peer '{}'", node_id.short_str(),);
         self.outbound_messaging
@@ -246,7 +260,7 @@ where
             .await
             .map_err(Into::<LivenessError>::into)?;
 
-        Ok(())
+        Ok(nonce)
     }
 
     async fn send_pong(&mut self, nonce: u64, dest: CommsPublicKey) -> Result<(), LivenessError> {
@@ -267,9 +281,17 @@ where
         use LivenessRequest::*;
         match request {
             SendPing(node_id) => {
-                self.send_ping(node_id).await?;
+                let nonce = self.send_ping(node_id).await?;
                 self.state.inc_pings_sent();
-                Ok(LivenessResponse::Ok)
+                Ok(LivenessResponse::Ok(Some(vec![nonce])))
+            },
+            SendPings(node_ids) => {
+                let mut nonces = Vec::with_capacity(node_ids.len());
+                for node_id in node_ids {
+                    nonces.push(self.send_ping(node_id).await?);
+                    self.state.inc_pings_sent();
+                }
+                Ok(LivenessResponse::Ok(Some(nonces)))
             },
             GetPingCount => {
                 let ping_count = self.get_ping_count();
@@ -289,21 +311,21 @@ where
             },
             SetMetadataEntry(key, value) => {
                 self.state.set_metadata_entry(key, value);
-                Ok(LivenessResponse::Ok)
+                Ok(LivenessResponse::Ok(None))
             },
             AddMonitoredPeer(node_id) => {
                 let node_id_exists = { self.monitored_peers.read().await.iter().any(|val| val == &node_id) };
                 if !node_id_exists {
                     self.monitored_peers.write().await.push(node_id.clone());
                 }
-                Ok(LivenessResponse::Ok)
+                Ok(LivenessResponse::Ok(None))
             },
             RemoveMonitoredPeer(node_id) => {
                 let node_id_exists = { self.monitored_peers.read().await.iter().position(|val| *val == node_id) };
                 if let Some(pos) = node_id_exists {
                     self.monitored_peers.write().await.swap_remove(pos);
                 }
-                Ok(LivenessResponse::Ok)
+                Ok(LivenessResponse::Ok(None))
             },
         }
     }
@@ -333,7 +355,11 @@ where
 
         for peer in selected_peers {
             let msg = PingPongMessage::ping_with_metadata(self.state.metadata().clone());
-            self.state.add_inflight_ping(msg.nonce, peer.clone());
+            self.state.add_inflight_ping(
+                msg.nonce,
+                peer.clone(),
+                self.config.auto_ping_interval.unwrap_or(MAX_INFLIGHT_TTL),
+            );
             self.outbound_messaging
                 .send_direct_node_id(
                     peer,
@@ -348,24 +374,31 @@ where
         Ok(())
     }
 
-    async fn disconnect_failed_peers(&mut self) -> Result<(), LivenessError> {
+    async fn disconnect_failed_peers(&mut self) {
         let max_allowed_ping_failures = self.config.max_allowed_ping_failures;
+        let mut node_ids = Vec::new();
         for node_id in self
             .state
             .failed_pings_iter()
             .filter(|(_, n)| **n > max_allowed_ping_failures)
             .map(|(node_id, _)| node_id)
         {
-            if let Some(mut conn) = self.connectivity.get_connection(node_id.clone()).await? {
+            if let Ok(Some(mut conn)) = self.connectivity.get_connection(node_id.clone()).await {
                 debug!(
                     target: LOG_TARGET,
                     "Disconnecting peer {} that failed {} rounds of pings", node_id, max_allowed_ping_failures
                 );
-                conn.disconnect(Minimized::No).await?;
+                match conn.disconnect(Minimized::No).await {
+                    Ok(_) => {
+                        node_ids.push(node_id.clone());
+                    },
+                    Err(err) => {
+                        warn!(target: LOG_TARGET, "Failed to disconnect peer {} ({})", node_id, err);
+                    },
+                }
             }
         }
-        self.state.clear_failed_pings();
-        Ok(())
+        self.state.clear_failed_pings(&node_ids);
     }
 
     fn publish_event(&mut self, event: LivenessEvent) {
@@ -613,6 +646,7 @@ mod test {
         state.add_inflight_ping(
             msg.inner.as_ref().map(|i| i.nonce).unwrap(),
             msg.source_peer.node_id.clone(),
+            MAX_INFLIGHT_TTL,
        );
 
         // A stream which emits an inflight pong message and an unexpected one
diff --git a/base_layer/p2p/src/services/liveness/state.rs b/base_layer/p2p/src/services/liveness/state.rs
index 0c29d20d31..849f2a328c 100644
--- a/base_layer/p2p/src/services/liveness/state.rs
+++ b/base_layer/p2p/src/services/liveness/state.rs
@@ -33,7 +33,6 @@ use super::LOG_TARGET;
 use crate::proto::liveness::MetadataKey;
 
 const LATENCY_SAMPLE_WINDOW_SIZE: usize = 25;
-const MAX_INFLIGHT_TTL: Duration = Duration::from_secs(40);
 
 /// Represents metadata in a ping/pong message.
 #[derive(Clone, Debug, Default, PartialEq, Eq)]
@@ -136,17 +135,17 @@ impl LivenessState {
     }
 
     /// Adds a ping to the inflight ping list, while noting the current time that a ping was sent.
-    pub fn add_inflight_ping(&mut self, nonce: u64, node_id: NodeId) {
+    pub fn add_inflight_ping(&mut self, nonce: u64, node_id: NodeId, max_inflight_ttl: Duration) {
         self.inflight_pings.insert(nonce, (node_id, Instant::now()));
-        self.clear_stale_inflight_pings();
+        self.clear_stale_inflight_pings(max_inflight_ttl);
     }
 
     /// Clears inflight ping requests which have not responded and adds them to failed_ping counter
-    fn clear_stale_inflight_pings(&mut self) {
+    fn clear_stale_inflight_pings(&mut self, max_inflight_ttl: Duration) {
         let (inflight, expired) = self
             .inflight_pings
             .drain()
-            .partition(|(_, (_, time))| time.elapsed() <= MAX_INFLIGHT_TTL);
+            .partition(|(_, (_, time))| time.elapsed() <= max_inflight_ttl);
 
         self.inflight_pings = inflight;
 
@@ -221,8 +220,10 @@ impl LivenessState {
         self.failed_pings.iter()
     }
 
-    pub fn clear_failed_pings(&mut self) {
-        self.failed_pings.clear();
+    pub fn clear_failed_pings(&mut self, node_ids: &[NodeId]) {
+        for node_id in node_ids {
+            self.failed_pings.remove(node_id);
+        }
     }
 }
 
@@ -265,6 +266,7 @@ impl AverageLatency {
 #[cfg(test)]
 mod test {
     use super::*;
+    use crate::services::liveness::service::MAX_INFLIGHT_TTL;
 
     #[test]
     fn new() {
@@ -322,7 +324,7 @@ mod test {
         let mut state = LivenessState::new();
 
         let node_id = NodeId::default();
-        state.add_inflight_ping(123, node_id.clone());
+        state.add_inflight_ping(123, node_id.clone(), MAX_INFLIGHT_TTL);
 
         let latency = state.record_pong(123, &node_id).unwrap();
         assert!(latency < Duration::from_millis(50));
@@ -340,10 +342,10 @@ mod test {
         let mut state = LivenessState::new();
 
         let peer1 = NodeId::default();
-        state.add_inflight_ping(1, peer1.clone());
+        state.add_inflight_ping(1, peer1.clone(), MAX_INFLIGHT_TTL);
         let peer2 = NodeId::from_public_key(&Default::default());
-        state.add_inflight_ping(2, peer2.clone());
-        state.add_inflight_ping(3, peer2.clone());
+        state.add_inflight_ping(2, peer2.clone(), MAX_INFLIGHT_TTL);
+        state.add_inflight_ping(3, peer2.clone(), MAX_INFLIGHT_TTL);
 
         assert!(!state.failed_pings.contains_key(&peer1));
         assert!(!state.failed_pings.contains_key(&peer2));
@@ -354,7 +356,7 @@ mod test {
             *time = Instant::now() - (MAX_INFLIGHT_TTL + Duration::from_secs(1));
         }
 
-        state.clear_stale_inflight_pings();
+        state.clear_stale_inflight_pings(MAX_INFLIGHT_TTL);
         let n = state.failed_pings.get(&peer1).unwrap();
         assert_eq!(*n, 1);
         let n = state.failed_pings.get(&peer2).unwrap();
diff --git a/base_layer/p2p/src/services/mod.rs b/base_layer/p2p/src/services/mod.rs
index 95da571d62..2514d17730 100644
--- a/base_layer/p2p/src/services/mod.rs
+++ b/base_layer/p2p/src/services/mod.rs
@@ -21,4 +21,5 @@
 // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
 pub mod liveness;
+pub mod monitor_peers;
 pub mod utils;
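Before the new service itself, a standalone model (toy types, not the patch's) of the inflight-ping bookkeeping that the `state.rs` changes above parameterise: pings older than the caller-supplied TTL are drained into a per-peer failure count, with the TTL now passed in instead of the old hard-coded 40 s module constant:

```rust
use std::{
    collections::HashMap,
    time::{Duration, Instant},
};

/// Toy model: nonce -> (peer name, time sent), plus per-peer failure counts.
struct InflightModel {
    pings: HashMap<u64, (String, Instant)>,
    failed: HashMap<String, usize>,
}

impl InflightModel {
    /// Same partition step as `clear_stale_inflight_pings`: keep fresh pings,
    /// count each expired one as a failure against its peer.
    fn clear_stale(&mut self, max_inflight_ttl: Duration) {
        let (inflight, expired): (HashMap<_, _>, HashMap<_, _>) = self
            .pings
            .drain()
            .partition(|(_, (_, sent))| sent.elapsed() <= max_inflight_ttl);
        self.pings = inflight;
        for (_, (peer, _)) in expired {
            *self.failed.entry(peer).or_insert(0) += 1;
        }
    }
}
```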
diff --git a/base_layer/p2p/src/services/monitor_peers/mod.rs b/base_layer/p2p/src/services/monitor_peers/mod.rs
new file mode 100644
index 0000000000..d09dd925ca
--- /dev/null
+++ b/base_layer/p2p/src/services/monitor_peers/mod.rs
@@ -0,0 +1,90 @@
+// Copyright 2022, The Tari Project
+//
+// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
+// following conditions are met:
+//
+// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
+// disclaimer.
+//
+// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
+// following disclaimer in the documentation and/or other materials provided with the distribution.
+//
+// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
+// products derived from this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
+// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
+// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+mod service;
+
+use std::{cmp::max, time::Duration};
+
+use log::debug;
+use tari_comms::{async_trait, connectivity::ConnectivityRequester};
+use tari_service_framework::{ServiceInitializationError, ServiceInitializer, ServiceInitializerContext};
+
+use crate::services::{
+    liveness::{LivenessHandle, MAX_INFLIGHT_TTL},
+    monitor_peers::service::MonitorPeersService,
+};
+
+const LOG_TARGET: &str = "p2p::services::monitor_peers";
+
+/// Initializer for the MonitorPeers service handle and service future.
+pub struct MonitorPeersInitializer {
+    auto_ping_interval: Option<Duration>,
+}
+
+impl MonitorPeersInitializer {
+    /// Create a new MonitorPeersInitializer with the configured auto ping interval
+    pub fn new(auto_ping_interval: Duration) -> Self {
+        Self {
+            auto_ping_interval: Some(auto_ping_interval),
+        }
+    }
+}
+
+impl Default for MonitorPeersInitializer {
+    fn default() -> Self {
+        Self {
+            auto_ping_interval: Some(MAX_INFLIGHT_TTL),
+        }
+    }
+}
+
+#[async_trait]
+impl ServiceInitializer for MonitorPeersInitializer {
+    async fn initialize(&mut self, context: ServiceInitializerContext) -> Result<(), ServiceInitializationError> {
+        debug!(target: LOG_TARGET, "Initializing Peer Monitoring Service");
+
+        let auto_ping_interval = max(
+            self.auto_ping_interval
+                .take()
+                .expect("Monitor peers service initialized more than once."),
+            MAX_INFLIGHT_TTL,
+        );
+
+        // Spawn the MonitorPeers service on the executor
+        context.spawn_when_ready(move |handles| async move {
+            let liveness = handles.expect_handle::<LivenessHandle>();
+            let connectivity = handles.expect_handle::<ConnectivityRequester>();
+
+            let service = MonitorPeersService::new(
+                connectivity,
+                liveness,
+                handles.get_shutdown_signal(),
+                auto_ping_interval,
+            );
+            service.run().await;
+            debug!(target: LOG_TARGET, "Monitor peers service has shut down");
+        });
+
+        debug!(target: LOG_TARGET, "Monitor peers service initialized");
+        Ok(())
+    }
+}
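`MonitorPeersService` below keeps a fixed-size history of `Stats` per peer. A toy demo of the `push_pop` sliding-window behaviour it relies on (plain std, runnable as-is): once the window is full, the oldest round is evicted first, so the cull rule only ever sees the most recent rounds:

```rust
use std::collections::VecDeque;

// Same eviction rule as PeerLiveness::push_pop in the new service below.
fn push_pop<T>(window: &mut VecDeque<T>, item: T, max_size: usize) {
    if window.len() == max_size {
        window.pop_front();
    }
    window.push_back(item);
}

fn main() {
    let mut window = VecDeque::with_capacity(3);
    for round in 1..=5u64 {
        push_pop(&mut window, round, 3);
    }
    assert_eq!(window, [3, 4, 5]); // only the three most recent rounds survive
}
```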
diff --git a/base_layer/p2p/src/services/monitor_peers/service.rs b/base_layer/p2p/src/services/monitor_peers/service.rs
new file mode 100644
index 0000000000..fceb157458
--- /dev/null
+++ b/base_layer/p2p/src/services/monitor_peers/service.rs
@@ -0,0 +1,324 @@
+// Copyright 2022, The Tari Project
+//
+// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
+// following conditions are met:
+//
+// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
+// disclaimer.
+//
+// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
+// following disclaimer in the documentation and/or other materials provided with the distribution.
+//
+// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
+// products derived from this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
+// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
+// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+use std::collections::{HashMap, VecDeque};
+
+use futures::pin_mut;
+use log::*;
+use tari_comms::{
+    connection_manager::ConnectionDirection,
+    connectivity::ConnectivityRequester,
+    peer_manager::NodeId,
+    Minimized,
+    PeerConnection,
+};
+use tari_shutdown::ShutdownSignal;
+use tokio::{
+    sync::broadcast::error::RecvError,
+    time::{self, Duration},
+};
+
+use crate::services::{
+    liveness::{LivenessEvent, LivenessHandle},
+    monitor_peers::LOG_TARGET,
+};
+
+struct PeerLiveness<T, const MAX_SIZE: usize> {
+    vec: VecDeque<T>,
+}
+
+impl<T, const MAX_SIZE: usize> PeerLiveness<T, MAX_SIZE> {
+    pub fn new() -> Self {
+        Self {
+            vec: VecDeque::with_capacity(MAX_SIZE),
+        }
+    }
+
+    pub fn push_pop(&mut self, item: T) {
+        if self.vec.len() == MAX_SIZE {
+            self.vec.pop_front();
+        }
+        self.vec.push_back(item);
+    }
+
+    pub fn iter(&self) -> std::collections::vec_deque::Iter<'_, T> {
+        self.vec.iter()
+    }
+}
+
+struct Stats {
+    connected: bool,
+    responsive: bool,
+    loop_count: u64,
+}
+
+struct PeerPingPong {
+    expected_nonce: u64,
+    received_nonce: Option<u64>,
+    node_id: NodeId,
+}
+
+pub struct MonitorPeersService {
+    comms: ConnectivityRequester,
+    liveness_handle: LivenessHandle,
+    shutdown_signal: ShutdownSignal,
+    auto_ping_interval: Duration,
+}
+
+impl MonitorPeersService {
+    pub fn new(
+        comms: ConnectivityRequester,
+        liveness_handle: LivenessHandle,
+        shutdown_signal: ShutdownSignal,
+        auto_ping_interval: Duration,
+    ) -> Self {
+        Self {
+            comms,
+            liveness_handle,
+            shutdown_signal,
+            auto_ping_interval,
+        }
+    }
+
+    /// Monitor the liveness of outbound peer connections and disconnect those that do not respond to pings
+    /// consecutively. The intent of the interval timer is to be significantly longer than the rate at which
+    /// metadata is requested from peers.
+    #[allow(clippy::too_many_lines)]
+    pub async fn run(mut self) {
+        let mut interval_timer = time::interval(self.auto_ping_interval * 10);
+        let liveness_events = self.liveness_handle.get_event_stream();
+        pin_mut!(liveness_events);
+
+        let mut peer_liveness_stats: HashMap<NodeId, PeerLiveness<Stats, 5>> = HashMap::new();
+
+        let mut loop_count = 0u64;
+        loop {
+            loop_count += 1;
+            tokio::select! {
+                biased;
+                _ = self.shutdown_signal.wait() => {
+                    break;
+                }
+
+                _ = interval_timer.tick() => {
+                    trace!(target: LOG_TARGET, "Starting monitor peers round (iter {})", loop_count);
+                    let active_connections = match self.comms.get_active_connections().await {
+                        Ok(val) => val,
+                        Err(e) => {
+                            warn!(target: LOG_TARGET, "Failed to get active connections ({})", e);
+                            continue;
+                        },
+                    };
+                    let mut active_peer_connections = active_connections
+                        .iter()
+                        .filter(|p| p.peer_features().is_node() && p.direction() == ConnectionDirection::Outbound)
+                        .cloned()
+                        .collect::<Vec<_>>();
+                    if active_peer_connections.is_empty() {
+                        trace!(target: LOG_TARGET, "No active connections found");
+                        continue;
+                    }
+                    let active_peer_node_ids = active_peer_connections
+                        .iter()
+                        .map(|p| p.peer_node_id().clone())
+                        .collect::<Vec<_>>();
+
+                    let known_peer_connections = peer_liveness_stats.keys().cloned().collect::<Vec<_>>();
+                    for peer_id in &known_peer_connections {
+                        if !active_peer_node_ids.contains(peer_id) {
+                            // Prior connections not connected now are considered inactive and unresponsive
+                            peer_liveness_stats
+                                .entry(peer_id.clone())
+                                .and_modify(|item| item.push_pop(
+                                    Stats {connected: false, responsive: false, loop_count}
+                                ));
+                        }
+                    }
+                    for peer_id in &active_peer_node_ids {
+                        if !known_peer_connections.contains(peer_id) {
+                            // New connections are considered active and responsive
+                            peer_liveness_stats.insert(peer_id.clone(), PeerLiveness::new());
+                        }
+                    }
+
+                    let mut peer_ping_pongs = match self.liveness_handle
+                        .send_pings(active_peer_node_ids.clone())
+                        .await
+                    {
+                        Ok(nonces) => active_peer_node_ids
+                            .iter()
+                            .zip(nonces.iter())
+                            .map(|(node_id, &nonce)| PeerPingPong {
+                                expected_nonce: nonce,
+                                received_nonce: None,
+                                node_id: node_id.clone(),
+                            })
+                            .collect::<Vec<_>>(),
+                        Err(e) => {
+                            warn!(target: LOG_TARGET, "Failed to send pings to peers ({})", e);
+                            continue;
+                        },
+                    };
+
+                    // Only listen for the expected pongs from the peers (ignore any other pongs)
+                    let timeout_timer = time::sleep(self.auto_ping_interval);
+                    tokio::pin!(timeout_timer);
+                    loop {
+                        tokio::select! {
+                            biased;
+                            _ = self.shutdown_signal.wait() => {
+                                break;
+                            }
+
+                            event = liveness_events.recv() => {
+                                let event_str = format!("{:?}", event);
+                                match event {
+                                    Ok(arc_event) => {
+                                        if let LivenessEvent::ReceivedPong(pong) = &*arc_event {
+                                            if let Some(ping_pong) = peer_ping_pongs.iter_mut().find(|p| p.expected_nonce == pong.nonce) {
+                                                ping_pong.received_nonce = Some(pong.nonce);
+                                            }
+                                            if peer_ping_pongs.iter().all(|p| p.received_nonce.is_some()) {
+                                                break;
+                                            }
+                                        }
+                                    },
+                                    Err(RecvError::Closed) => {
+                                        return;
+                                    },
+                                    Err(ref e) => {
+                                        debug!(
+                                            target: LOG_TARGET,
+                                            "Liveness event error: {:?} ({})",
+                                            event_str, e.to_string()
+                                        );
+                                    },
+                                }
+                            },
+
+                            _ = &mut timeout_timer => {
+                                trace!(
+                                    target: LOG_TARGET,
+                                    "Timed out waiting for pongs, received {} of {} (iter {})",
+                                    peer_ping_pongs.iter().filter(|p| p.received_nonce.is_some()).count(),
+                                    peer_ping_pongs.len(),
+                                    loop_count
+                                );
+                                break;
+                            },
+                        }
+                    }
+
+                    // Compare nonces and close connections for peers that did not respond multiple times
+                    update_stats_and_cull_unresponsive_connections(
+                        &peer_ping_pongs,
+                        &mut active_peer_connections,
+                        &mut peer_liveness_stats,
+                        loop_count
+                    ).await;
+                },
+            }
+        }
+    }
+}
+
+async fn update_stats_and_cull_unresponsive_connections(
+    peer_ping_pongs: &[PeerPingPong],
+    active_peer_connections: &mut [PeerConnection],
+    peer_liveness_stats: &mut HashMap<NodeId, PeerLiveness<Stats, 5>>,
+    loop_count: u64,
+) {
+    let received_nonces_count = peer_ping_pongs.iter().filter(|p| p.received_nonce.is_some()).count();
+    if received_nonces_count != peer_ping_pongs.len() {
+        trace!(
+            target: LOG_TARGET,
+            "Found {} of {} outbound base node peer connections that did not respond to pings",
+            peer_ping_pongs.len().saturating_sub(received_nonces_count), active_peer_connections.len()
+        );
+    }
+
+    let mut disconnect_peers = Vec::new();
+    for &mut ref peer in active_peer_connections.iter_mut() {
+        if let Some(ping_pong) = peer_ping_pongs.iter().find(|p| &p.node_id == peer.peer_node_id()) {
+            if ping_pong.received_nonce.is_some() {
+                peer_liveness_stats
+                    .entry(peer.peer_node_id().clone())
+                    .and_modify(|item| {
+                        item.push_pop(Stats {
+                            connected: true,
+                            responsive: true,
+                            loop_count,
+                        })
+                    });
+            } else {
+                peer_liveness_stats
+                    .entry(peer.peer_node_id().clone())
+                    .and_modify(|item| {
+                        item.push_pop(Stats {
+                            connected: true,
+                            responsive: false,
+                            loop_count,
+                        })
+                    });
+                if let Some(stats) = peer_liveness_stats.get(peer.peer_node_id()) {
+                    // Evaluate the last 3 entries in the stats
+                    if stats
+                        .iter()
+                        .rev()
+                        .take(3)
+                        .filter(|s| s.connected && !s.responsive)
+                        .count() >=
+                        3
+                    {
+                        disconnect_peers.push(peer.clone());
+                    } else {
+                        trace!(
+                            target: LOG_TARGET,
+                            "Peer {} stats - (iter, conn, resp) {:?}",
+                            peer.peer_node_id(),
+                            stats.iter().map(|s| (s.loop_count, s.connected, s.responsive)).collect::<Vec<_>>(),
+                        );
+                    }
+                }
+            }
+        }
+    }
+
+    for peer in disconnect_peers {
+        if let Some(stats) = peer_liveness_stats.get(peer.peer_node_id()) {
+            debug!(
+                target: LOG_TARGET,
+                "Disconnecting {} as the peer is no longer responsive - (iter, conn, resp) {:?}",
+                peer.peer_node_id(),
+                stats.iter().map(|s| (s.loop_count, s.connected, s.responsive)).collect::<Vec<_>>(),
+            );
+            if let Err(e) = peer.clone().disconnect(Minimized::No).await {
+                warn!(
+                    target: LOG_TARGET,
+                    "Error while attempting to disconnect peer {}: {}", peer.peer_node_id(), e
+                );
+            }
+            peer_liveness_stats.remove(peer.peer_node_id());
+            trace!(target: LOG_TARGET, "Disconnected {} (iter, {})", peer.peer_node_id(), loop_count);
+        }
+    }
+}