From 57b06465bd1a4cefa31bff562a2a3797b34ad4cf Mon Sep 17 00:00:00 2001 From: Hansie Odendaal Date: Wed, 17 Apr 2024 07:41:59 +0200 Subject: [PATCH 1/5] Fix peer db stats - Fixed peer db stats to only update if the specific public address was successfully connected to. This alleviates issues where the node would try to connect to a peer's public address that has a quality score > 0 where it should be 0. - Updated stats with ping-pong. - Added a peer sync delay config option to let a preferred connection get a head start in trying to connect. - Discarded all peer identity claims without valid signatures immediately when they are received. - Reduced the minimum number of sync peers a wallet wants to connect to from 50 to 8. --- .../minotari_app_grpc/proto/network.proto | 6 +- .../minotari_app_grpc/src/conversions/peer.rs | 5 +- .../src/commands/command/get_peer.rs | 2 +- base_layer/p2p/src/services/liveness/error.rs | 9 +- base_layer/p2p/src/services/liveness/mod.rs | 4 +- .../p2p/src/services/liveness/service.rs | 42 ++++++++- common/config/presets/c_base_node_c.toml | 6 ++ common/config/presets/d_console_wallet.toml | 8 +- comms/core/src/connection_manager/common.rs | 19 ++-- comms/core/src/connection_manager/listener.rs | 4 +- comms/core/src/lib.rs | 1 - .../src/net_address/multiaddr_with_stats.rs | 86 ++++++++++++------- .../net_address/mutliaddresses_with_stats.rs | 46 +++++++--- comms/core/src/peer_manager/manager.rs | 41 +++++---- comms/core/src/peer_manager/peer.rs | 9 ++ comms/dht/src/connectivity/mod.rs | 44 +++++++--- comms/dht/src/discovery/service.rs | 1 + comms/dht/src/inbound/dht_handler/task.rs | 1 + comms/dht/src/network_discovery/config.rs | 6 ++ comms/dht/src/network_discovery/on_connect.rs | 2 +- comms/dht/src/peer_validator.rs | 27 +++++- comms/dht/src/rpc/service.rs | 22 ++++- 22 files changed, 293 insertions(+), 98 deletions(-) diff --git a/applications/minotari_app_grpc/proto/network.proto b/applications/minotari_app_grpc/proto/network.proto index 62f6c57cf8..fba0614749 100644 --- a/applications/minotari_app_grpc/proto/network.proto +++ b/applications/minotari_app_grpc/proto/network.proto @@ -70,7 +70,11 @@ message Address{ bytes address =1; string last_seen = 2; uint32 connection_attempts = 3; - uint64 avg_latency = 5; + AverageLatency avg_latency = 5; +} + +message AverageLatency { + uint64 latency = 1; } message ListConnectedPeersResponse { diff --git a/applications/minotari_app_grpc/src/conversions/peer.rs b/applications/minotari_app_grpc/src/conversions/peer.rs index 5329e32e0f..8523ec6575 100644 --- a/applications/minotari_app_grpc/src/conversions/peer.rs +++ b/applications/minotari_app_grpc/src/conversions/peer.rs @@ -72,7 +72,10 @@ impl From for grpc::Address { None => String::new(), }; let connection_attempts = address_with_stats.connection_attempts(); - let avg_latency = address_with_stats.avg_latency().as_secs(); + let avg_latency = address_with_stats + .avg_latency() + .map(|val| grpc::AverageLatency { latency: val.as_secs() }); + Self { address, last_seen, diff --git a/applications/minotari_node/src/commands/command/get_peer.rs b/applications/minotari_node/src/commands/command/get_peer.rs index f9242244f9..391f75c0a7 100644 --- a/applications/minotari_node/src/commands/command/get_peer.rs +++ b/applications/minotari_node/src/commands/command/get_peer.rs @@ -88,7 +88,7 @@ impl CommandContext { println!("Addresses:"); peer.addresses.addresses().iter().for_each(|a| { println!( - "- {} Score: {} - Source: {} Latency: {:?} - Last Seen: {} - Last Failure:{}", + 
"- {} Score: {:?} - Source: {} Latency: {:?} - Last Seen: {} - Last Failure:{}", a.address(), a.quality_score(), a.source(), diff --git a/base_layer/p2p/src/services/liveness/error.rs b/base_layer/p2p/src/services/liveness/error.rs index 4b6973ef14..07e7a99547 100644 --- a/base_layer/p2p/src/services/liveness/error.rs +++ b/base_layer/p2p/src/services/liveness/error.rs @@ -20,7 +20,12 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -use tari_comms::{connectivity::ConnectivityError, message::MessageError, PeerConnectionError}; +use tari_comms::{ + connectivity::ConnectivityError, + message::MessageError, + peer_manager::PeerManagerError, + PeerConnectionError, +}; use tari_comms_dht::{outbound::DhtOutboundError, DhtActorError}; use tari_service_framework::reply_channel::TransportChannelError; use thiserror::Error; @@ -53,4 +58,6 @@ pub enum LivenessError { NodeIdDoesNotExist, #[error("PingPongDecodeError: {0}")] PingPongDecodeError(#[from] prost::DecodeError), + #[error("Peer not found: `{0}`")] + PeerNotFoundError(#[from] PeerManagerError), } diff --git a/base_layer/p2p/src/services/liveness/mod.rs b/base_layer/p2p/src/services/liveness/mod.rs index 4faa04e9c2..53031bbe7a 100644 --- a/base_layer/p2p/src/services/liveness/mod.rs +++ b/base_layer/p2p/src/services/liveness/mod.rs @@ -63,7 +63,7 @@ use std::sync::Arc; use futures::{Stream, StreamExt}; use log::*; -use tari_comms::connectivity::ConnectivityRequester; +use tari_comms::{connectivity::ConnectivityRequester, PeerManager}; use tari_comms_dht::Dht; use tari_service_framework::{ async_trait, @@ -136,6 +136,7 @@ impl ServiceInitializer for LivenessInitializer { let dht = handles.expect_handle::(); let connectivity = handles.expect_handle::(); let outbound_messages = dht.outbound_requester(); + let peer_manager = handles.expect_handle::>(); let service = LivenessService::new( config, @@ -146,6 +147,7 @@ impl ServiceInitializer for LivenessInitializer { outbound_messages, publisher, handles.get_shutdown_signal(), + peer_manager, ); service.run().await; debug!(target: LOG_TARGET, "Liveness service has shut down"); diff --git a/base_layer/p2p/src/services/liveness/service.rs b/base_layer/p2p/src/services/liveness/service.rs index 29724b946a..dc4797b60a 100644 --- a/base_layer/p2p/src/services/liveness/service.rs +++ b/base_layer/p2p/src/services/liveness/service.rs @@ -28,6 +28,7 @@ use tari_comms::{ connectivity::{ConnectivityRequester, ConnectivitySelection}, peer_manager::NodeId, types::CommsPublicKey, + PeerManager, }; use tari_comms_dht::{ domain_message::OutboundDomainMessage, @@ -64,6 +65,7 @@ pub struct LivenessService { event_publisher: LivenessEventSender, shutdown_signal: ShutdownSignal, monitored_peers: Arc>>, + peer_manager: Arc, } impl LivenessService @@ -80,6 +82,7 @@ where outbound_messaging: OutboundMessageRequester, event_publisher: LivenessEventSender, shutdown_signal: ShutdownSignal, + peer_manager: Arc, ) -> Self { Self { request_rx: Some(request_rx), @@ -91,6 +94,7 @@ where shutdown_signal, config: config.clone(), monitored_peers: Arc::new(RwLock::new(config.monitored_peers)), + peer_manager, } } @@ -157,8 +161,8 @@ where inner: ping_pong_msg, .. 
} = msg; - let node_id = source_peer.node_id; - let public_key = source_peer.public_key; + let node_id = source_peer.node_id.clone(); + let public_key = source_peer.public_key.clone(); let message_tag = dht_header.message_tag; let ping_pong_msg = match ping_pong_msg { Ok(p) => p, @@ -214,8 +218,14 @@ where message_tag, ); - let pong_event = PingPongEvent::new(node_id, maybe_latency, ping_pong_msg.metadata.into()); + let pong_event = PingPongEvent::new(node_id.clone(), maybe_latency, ping_pong_msg.metadata.into()); self.publish_event(LivenessEvent::ReceivedPong(Box::new(pong_event))); + + if let Some(address) = source_peer.last_address_used() { + self.peer_manager + .update_peer_address_latency_and_last_seen(&public_key, &address, maybe_latency) + .await?; + } }, } Ok(()) @@ -386,6 +396,7 @@ mod test { net_address::MultiaddressesWithStats, peer_manager::{NodeId, Peer, PeerFeatures, PeerFlags}, test_utils::mocks::create_connectivity_mock, + types::CommsDatabase, }; use tari_comms_dht::{ envelope::{DhtMessageHeader, DhtMessageType}, @@ -395,6 +406,8 @@ mod test { use tari_crypto::keys::PublicKey; use tari_service_framework::reply_channel; use tari_shutdown::Shutdown; + use tari_storage::lmdb_store::{LMDBBuilder, LMDBConfig}; + use tari_test_utils::{paths::create_temporary_data_path, random}; use tokio::{ sync::{broadcast, mpsc, oneshot}, task, @@ -406,6 +419,24 @@ mod test { services::liveness::{handle::LivenessHandle, state::Metadata}, }; + pub fn build_peer_manager() -> Arc { + let database_name = random::string(8); + let path = create_temporary_data_path(); + let datastore = LMDBBuilder::new() + .set_path(path.to_str().unwrap()) + .set_env_config(LMDBConfig::default()) + .set_max_number_of_databases(1) + .add_database(&database_name, lmdb_zero::db::CREATE) + .build() + .unwrap(); + + let peer_database = datastore.get_handle(&database_name).unwrap(); + + PeerManager::new(CommsDatabase::new(Arc::new(peer_database)), None) + .map(Arc::new) + .unwrap() + } + #[tokio::test] async fn get_ping_pong_count() { let mut state = LivenessState::new(); @@ -436,6 +467,7 @@ mod test { outbound_messaging, publisher, shutdown.to_signal(), + build_peer_manager(), ); // Run the service @@ -471,6 +503,7 @@ mod test { outbound_messaging, publisher, shutdown.to_signal(), + build_peer_manager(), ); // Run the LivenessService @@ -544,6 +577,7 @@ mod test { let (publisher, _) = broadcast::channel(200); let shutdown = Shutdown::new(); + let service = LivenessService::new( Default::default(), stream::empty(), @@ -553,6 +587,7 @@ mod test { outbound_messaging, publisher, shutdown.to_signal(), + build_peer_manager(), ); task::spawn(service.run()); @@ -595,6 +630,7 @@ mod test { outbound_messaging, publisher.clone(), shutdown.to_signal(), + build_peer_manager(), ); task::spawn(service.run()); diff --git a/common/config/presets/c_base_node_c.toml b/common/config/presets/c_base_node_c.toml index 259caa461c..318b594e9a 100644 --- a/common/config/presets/c_base_node_c.toml +++ b/common/config/presets/c_base_node_c.toml @@ -288,6 +288,12 @@ database_url = "data/base_node/dht.db" # The maximum number of sync peer to select for each round. The selection strategy varies depending on the current state. # Default: 5 #network_discovery.max_sync_peers = 5 +# The maximum number of peers we allow per round of sync. (Default: 500) +#max_peers_to_sync_per_round = 500 +# Initial refresh sync peers delay period if more than one peer needs to be synced, handy when a wallet needs to +# give preference to its base node peer connection. 
(Default: 15) +#[serde(with = "serializers::seconds")] +#initial_peer_sync_dalay = 15 # Length of time to ban a peer if the peer misbehaves at the DHT-level. Default: 6 hrs #ban_duration = 21_600 # 6 * 60 * 60 diff --git a/common/config/presets/d_console_wallet.toml b/common/config/presets/d_console_wallet.toml index d54d355e8d..d5870d665b 100644 --- a/common/config/presets/d_console_wallet.toml +++ b/common/config/presets/d_console_wallet.toml @@ -323,7 +323,7 @@ database_url = "data/wallet/dht.db" #network_discovery.enabled = true # A threshold for the minimum number of peers this node should ideally be aware of. If below this threshold a # more "aggressive" strategy is employed. Default: 50 -#network_discovery.min_desired_peers = 50 +network_discovery.min_desired_peers = 8 # The period to wait once the number of rounds given by `idle_after_num_rounds` has completed. Default: 30 mins #network_discovery.idle_period = 1_800 # 30 * 60 # The minimum number of network discovery rounds to perform before idling (going to sleep). If there are less @@ -334,6 +334,12 @@ database_url = "data/wallet/dht.db" # The maximum number of sync peer to select for each round. The selection strategy varies depending on the current state. # Default: 5 #network_discovery.max_sync_peers = 5 +# The maximum number of peers we allow per round of sync. (Default: 500) +#max_peers_to_sync_per_round = 500 +# Initial refresh sync peers delay period if more than one peer needs to be synced, handy when a wallet needs to +# give preference to its base node peer connection. (Default: 15) +#[serde(with = "serializers::seconds")] +#initial_peer_sync_dalay = 15 # Length of time to ban a peer if the peer misbehaves at the DHT-level. Default: 6 hrs #ban_duration = 21_600 # 6 * 60 * 60 diff --git a/comms/core/src/connection_manager/common.rs b/comms/core/src/connection_manager/common.rs index eedcb607a2..0a95a51d8f 100644 --- a/comms/core/src/connection_manager/common.rs +++ b/comms/core/src/connection_manager/common.rs @@ -190,6 +190,7 @@ pub(super) fn create_or_update_peer_from_validated_peer_identity( known_peer: Option, authenticated_public_key: CommsPublicKey, peer_identity: &ValidatedPeerIdentityExchange, + latency: Duration, ) -> Peer { let peer_node_id = NodeId::from_public_key(&authenticated_public_key); @@ -206,8 +207,9 @@ pub(super) fn create_or_update_peer_from_validated_peer_identity( peer_identity_claim: peer_identity.claim.clone(), }); + // For inbound connections we cannot distinguish between the peer's addresses, so we mark all as seen peer.addresses - .mark_all_addresses_as_last_seen_now(&peer_identity.claim.addresses); + .mark_all_addresses_as_last_seen_now_with_latency(&peer_identity.claim.addresses, latency); peer.features = peer_identity.claim.features; peer.supported_protocols = peer_identity.metadata.supported_protocols.clone(); @@ -221,15 +223,18 @@ pub(super) fn create_or_update_peer_from_validated_peer_identity( "Peer '{}' does not exist in peer list. 
Adding.", peer_node_id.short_str() ); + let mut addresses = MultiaddressesWithStats::from_addresses_with_source( + peer_identity.claim.addresses.clone(), + &PeerAddressSource::FromPeerConnection { + peer_identity_claim: peer_identity.claim.clone(), + }, + ); + // For inbound connections we cannot distinguish between the peer's addresses, so we mark all as seen + addresses.mark_all_addresses_as_last_seen_now_with_latency(&peer_identity.claim.addresses, latency); Peer::new( authenticated_public_key, peer_node_id, - MultiaddressesWithStats::from_addresses_with_source( - peer_identity.claim.addresses.clone(), - &PeerAddressSource::FromPeerConnection { - peer_identity_claim: peer_identity.claim.clone(), - }, - ), + addresses, PeerFlags::empty(), peer_identity.peer_features(), peer_identity.supported_protocols().to_vec(), diff --git a/comms/core/src/connection_manager/listener.rs b/comms/core/src/connection_manager/listener.rs index c777742ea0..18c1170f49 100644 --- a/comms/core/src/connection_manager/listener.rs +++ b/comms/core/src/connection_manager/listener.rs @@ -359,11 +359,12 @@ where let authenticated_public_key = noise_socket .get_remote_public_key() .ok_or(ConnectionManagerError::InvalidStaticPublicKey)?; + let latency = timer.elapsed(); debug!( target: LOG_TARGET, "Noise socket upgrade completed in {:.2?} with public key '{}'", - timer.elapsed(), + latency, authenticated_public_key ); @@ -399,6 +400,7 @@ where known_peer, authenticated_public_key, &valid_peer_identity, + latency, ); let muxer = Yamux::upgrade_connection(noise_socket, CONNECTION_DIRECTION) diff --git a/comms/core/src/lib.rs b/comms/core/src/lib.rs index abdbb0e849..b9c493a09a 100644 --- a/comms/core/src/lib.rs +++ b/comms/core/src/lib.rs @@ -21,7 +21,6 @@ pub mod connectivity; pub mod peer_manager; pub use peer_manager::{NodeIdentity, OrNotFound, PeerManager}; - pub mod framing; mod multiplexing; diff --git a/comms/core/src/net_address/multiaddr_with_stats.rs b/comms/core/src/net_address/multiaddr_with_stats.rs index b357168fd6..b71e39ecfe 100644 --- a/comms/core/src/net_address/multiaddr_with_stats.rs +++ b/comms/core/src/net_address/multiaddr_with_stats.rs @@ -12,27 +12,29 @@ use std::{ }; use chrono::{NaiveDateTime, Utc}; +use log::trace; use multiaddr::Multiaddr; use serde::{Deserialize, Serialize}; use crate::{peer_manager::PeerIdentityClaim, types::CommsPublicKey}; +const LOG_TARGET: &str = "comms::net_address::multiaddr_with_stats"; + const MAX_LATENCY_SAMPLE_COUNT: u32 = 100; const MAX_INITIAL_DIAL_TIME_SAMPLE_COUNT: u32 = 100; -const HIGH_QUALITY_SCORE: i32 = 1000; #[derive(Debug, Eq, Clone, Deserialize, Serialize)] pub struct MultiaddrWithStats { address: Multiaddr, last_seen: Option, connection_attempts: u32, - avg_initial_dial_time: Duration, + avg_initial_dial_time: Option, initial_dial_time_sample_count: u32, - avg_latency: Duration, + avg_latency: Option, latency_sample_count: u32, last_attempted: Option, last_failed_reason: Option, - quality_score: i32, + quality_score: Option, source: PeerAddressSource, } @@ -43,13 +45,13 @@ impl MultiaddrWithStats { address, last_seen: None, connection_attempts: 0, - avg_initial_dial_time: Duration::from_secs(0), + avg_initial_dial_time: None, initial_dial_time_sample_count: 0, - avg_latency: Duration::from_millis(0), + avg_latency: None, latency_sample_count: 0, last_attempted: None, last_failed_reason: None, - quality_score: 0, + quality_score: None, source, }; addr.update_quality_score(); @@ -58,6 +60,15 @@ impl MultiaddrWithStats { pub fn merge(&mut self, other: 
&Self) {
         if self.address == other.address {
+            trace!(
+                target: LOG_TARGET, "merge: '{}, {:?}, {:?}' and '{}, {:?}, {:?}'",
+                self.address.to_string(),
+                self.last_seen,
+                self.quality_score,
+                other.address.to_string(),
+                other.last_seen,
+                other.quality_score
+            );
             self.last_seen = cmp::max(other.last_seen, self.last_seen);
             self.connection_attempts = cmp::max(self.connection_attempts, other.connection_attempts);
             match self.latency_sample_count.cmp(&other.latency_sample_count) {
@@ -122,9 +133,14 @@ impl MultiaddrWithStats {
 
     pub fn update_latency(&mut self, latency_measurement: Duration) {
         self.last_seen = Some(Utc::now().naive_utc());
-        self.avg_latency = ((self.avg_latency.saturating_mul(self.latency_sample_count))
+        self.avg_latency = Some(
+            ((self
+                .avg_latency
+                .unwrap_or_default()
+                .saturating_mul(self.latency_sample_count))
             .saturating_add(latency_measurement)) /
-            (self.latency_sample_count + 1);
+                (self.latency_sample_count + 1),
+        );
         if self.latency_sample_count < MAX_LATENCY_SAMPLE_COUNT {
             self.latency_sample_count += 1;
         }
@@ -135,9 +151,11 @@ impl MultiaddrWithStats {
 
     pub fn update_initial_dial_time(&mut self, initial_dial_time: Duration) {
         self.last_seen = Some(Utc::now().naive_utc());
-        self.avg_initial_dial_time = ((self.avg_initial_dial_time * self.initial_dial_time_sample_count) +
-            initial_dial_time) /
-            (self.initial_dial_time_sample_count + 1);
+        self.avg_initial_dial_time = Some(
+            ((self.avg_initial_dial_time.unwrap_or_default() * self.initial_dial_time_sample_count) +
+                initial_dial_time) /
+                (self.initial_dial_time_sample_count + 1),
+        );
         if self.initial_dial_time_sample_count < MAX_INITIAL_DIAL_TIME_SAMPLE_COUNT {
             self.initial_dial_time_sample_count += 1;
         }
@@ -146,6 +164,10 @@ impl MultiaddrWithStats {
 
     /// Mark that a successful interaction occurred with this address
     pub fn mark_last_seen_now(&mut self) -> &mut Self {
+        trace!(
+            target: LOG_TARGET, "mark_last_seen_now: from {}, address '{}', previous {:?}",
+            self.source, self.address.to_string(), self.last_seen
+        );
         self.last_seen = Some(Utc::now().naive_utc());
         self.last_failed_reason = None;
         self.reset_connection_attempts();
@@ -184,27 +206,27 @@ impl MultiaddrWithStats {
         self.clone().address
     }
 
-    fn calculate_quality_score(&self) -> i32 {
+    fn calculate_quality_score(&self) -> Option<i32> {
         // If we have never seen or attempted the peer, we start with a high score to ensure that
         if self.last_seen.is_none() && self.last_attempted.is_none() {
-            return HIGH_QUALITY_SCORE;
+            return None;
         }
 
         let mut score_self = 0;
 
-        if self.avg_latency.as_millis() == 0 {
-            score_self += 100;
-        } else {
+        if let Some(val) = self.avg_latency {
             // explicitly truncate the latency to avoid casting problems
-            let avg_latency_millis = i32::try_from(self.avg_latency.as_millis()).unwrap_or(i32::MAX);
+            let avg_latency_millis = i32::try_from(val.as_millis()).unwrap_or(i32::MAX);
             score_self += cmp::max(0, 100i32.saturating_sub(avg_latency_millis / 100));
+        } else {
+            score_self += 100;
         }
 
         let last_seen_seconds: i32 = self
             .last_seen
             .map(|x| Utc::now().naive_utc() - x)
             .map(|x| x.num_seconds())
-            .unwrap_or(0)
+            .unwrap_or(i64::MAX / 2)
             .try_into()
             .unwrap_or(i32::MAX);
         score_self += cmp::max(0, 100i32.saturating_sub(last_seen_seconds));
@@ -213,7 +235,7 @@ impl MultiaddrWithStats {
             score_self -= 100;
         }
 
-        score_self
+        Some(score_self)
     }
 
     fn update_quality_score(&mut self) {
@@ -232,7 +254,7 @@ impl MultiaddrWithStats {
         self.connection_attempts
     }
 
-    pub fn avg_initial_dial_time(&self) -> Duration {
+    pub fn avg_initial_dial_time(&self) -> Option<Duration> {
        self.avg_initial_dial_time
     }
 
@@ -240,7 +262,7 @@ impl MultiaddrWithStats {
         self.initial_dial_time_sample_count
     }
 
-    pub fn avg_latency(&self) -> Duration {
+    pub fn avg_latency(&self) -> Option<Duration> {
         self.avg_latency
     }
 
@@ -256,7 +278,7 @@ impl MultiaddrWithStats {
         self.last_failed_reason.as_deref()
     }
 
-    pub fn quality_score(&self) -> i32 {
+    pub fn quality_score(&self) -> Option<i32> {
         self.quality_score
     }
 }
@@ -388,13 +410,13 @@ mod test {
         let latency_measurement3 = Duration::from_millis(60);
         let latency_measurement4 = Duration::from_millis(140);
         net_address_with_stats.update_latency(latency_measurement1);
-        assert_eq!(net_address_with_stats.avg_latency, latency_measurement1);
+        assert_eq!(net_address_with_stats.avg_latency.unwrap(), latency_measurement1);
         net_address_with_stats.update_latency(latency_measurement2);
-        assert_eq!(net_address_with_stats.avg_latency, Duration::from_millis(150));
+        assert_eq!(net_address_with_stats.avg_latency.unwrap(), Duration::from_millis(150));
         net_address_with_stats.update_latency(latency_measurement3);
-        assert_eq!(net_address_with_stats.avg_latency, Duration::from_millis(120));
+        assert_eq!(net_address_with_stats.avg_latency.unwrap(), Duration::from_millis(120));
         net_address_with_stats.update_latency(latency_measurement4);
-        assert_eq!(net_address_with_stats.avg_latency, Duration::from_millis(125));
+        assert_eq!(net_address_with_stats.avg_latency.unwrap(), Duration::from_millis(125));
     }
 
     #[test]
@@ -425,16 +447,16 @@ mod test {
     fn test_calculate_quality_score() {
         let address = "/ip4/123.0.0.123/tcp/8000".parse().unwrap();
         let mut address = MultiaddrWithStats::new(address, PeerAddressSource::Config);
-        assert_eq!(address.quality_score, 1000);
+        assert_eq!(address.quality_score, None);
         address.mark_last_seen_now();
-        assert!(address.quality_score > 100);
+        assert!(address.quality_score.unwrap() > 100);
         address.mark_failed_connection_attempt("Testing".to_string());
-        assert!(address.quality_score <= 100);
+        assert!(address.quality_score.unwrap() <= 100);
         let another_addr = "/ip4/1.0.0.1/tcp/8000".parse().unwrap();
         let another_addr = MultiaddrWithStats::new(another_addr, PeerAddressSource::Config);
-        assert_eq!(another_addr.quality_score, 1000);
+        assert_eq!(another_addr.quality_score, None);
 
-        assert_eq!(another_addr.cmp(&address), Ordering::Greater);
+        assert_eq!(another_addr.cmp(&address), Ordering::Less);
     }
 }
diff --git a/comms/core/src/net_address/mutliaddresses_with_stats.rs b/comms/core/src/net_address/mutliaddresses_with_stats.rs
index 9d7c9d9e4b..5a2b01da82 100644
--- a/comms/core/src/net_address/mutliaddresses_with_stats.rs
+++ b/comms/core/src/net_address/mutliaddresses_with_stats.rs
@@ -9,10 +9,14 @@ use std::{
 };
 
 use chrono::NaiveDateTime;
+use log::trace;
 use multiaddr::Multiaddr;
 use serde::{Deserialize, Serialize};
 
 use crate::net_address::{multiaddr_with_stats::PeerAddressSource, MultiaddrWithStats};
+
+const LOG_TARGET: &str = "comms::net_address::multiaddresses_with_stats";
+
 const MAX_ADDRESSES: usize = 10;
 
 /// This struct is used to store a set of different net addresses such as IPv4, IPv6, Tor or I2P for a single peer.
@@ -178,15 +182,24 @@ impl MultiaddressesWithStats {
         }
     }
 
-    /// Mark all addresses as seen. Returns true if all addresses are contained in this instance, otherwise false
-    pub fn mark_all_addresses_as_last_seen_now(&mut self, addresses: &[Multiaddr]) -> bool {
+    /// Mark all addresses as seen with latency. 
Returns true if all addresses are contained in this instance, otherwise
+    /// false
+    pub fn mark_all_addresses_as_last_seen_now_with_latency(
+        &mut self,
+        addresses: &[Multiaddr],
+        latency_measurement: Duration,
+    ) -> bool {
         let mut all_exist = true;
         for address in addresses {
             match self.find_address_mut(address) {
                 Some(addr) => {
                     addr.mark_last_seen_now().mark_last_attempted_now();
+                    addr.update_latency(latency_measurement);
+                },
+                None => {
+                    trace!(target: LOG_TARGET, "Peer address '{}' not in claim, stats not updated", address);
+                    all_exist = false
                 },
-                None => all_exist = false,
             }
         }
         self.sort_addresses();
@@ -204,7 +217,10 @@ impl MultiaddressesWithStats {
                 self.sort_addresses();
                 true
             },
-            None => false,
+            None => {
+                trace!(target: LOG_TARGET, "Peer address '{}' not in claim, stats not updated", address);
+                false
+            },
         }
     }
 
@@ -238,7 +254,8 @@ impl MultiaddressesWithStats {
 
     /// Sort the addresses with the greatest quality score first
     fn sort_addresses(&mut self) {
-        self.addresses.sort_by_key(|addr| cmp::Reverse(addr.quality_score()));
+        self.addresses
+            .sort_by_key(|addr| cmp::Reverse(addr.quality_score().unwrap_or_default()));
         self.addresses.truncate(MAX_ADDRESSES)
     }
 }
@@ -354,15 +371,20 @@ mod test {
             .mark_last_attempted_now();
         assert_eq!(
             net_addresses.find_address_mut(&net_address1).unwrap().quality_score(),
-            200
+            Some(200)
         );
-        let other: MultiaddressesWithStats = MultiaddressesWithStats::from_addresses_with_source(
+        let address_12: MultiaddressesWithStats = MultiaddressesWithStats::from_addresses_with_source(
             vec![net_address12.clone()],
             &PeerAddressSource::Config,
         );
-        net_addresses.merge(&other);
-        assert!(!net_addresses.contains(&net_address1));
-        assert!(net_addresses.contains(&net_address12));
+        net_addresses.merge(&address_12);
+        println!("net_address12: {:?}", address_12.addresses[0]);
+        println!(
+            "net_address1: {:?}",
+            net_addresses.find_address_mut(&net_address1).unwrap()
+        );
+        assert!(net_addresses.contains(&net_address1));
+        assert!(!net_addresses.contains(&net_address12));
     }
 
     #[test]
@@ -447,9 +469,9 @@ mod test {
         assert!(net_addresses.mark_failed_connection_attempt(&net_address3, "error".to_string()));
         assert!(net_addresses.mark_failed_connection_attempt(&net_address1, "error".to_string()));
-        assert_eq!(net_addresses.addresses[0].connection_attempts(), 1);
+        assert_eq!(net_addresses.addresses[0].connection_attempts(), 2);
         assert_eq!(net_addresses.addresses[1].connection_attempts(), 1);
-        assert_eq!(net_addresses.addresses[2].connection_attempts(), 2);
+        assert_eq!(net_addresses.addresses[2].connection_attempts(), 1);
         net_addresses.reset_connection_attempts();
         assert_eq!(net_addresses.addresses[0].connection_attempts(), 0);
         assert_eq!(net_addresses.addresses[1].connection_attempts(), 0);
diff --git a/comms/core/src/peer_manager/manager.rs b/comms/core/src/peer_manager/manager.rs
index a383fdf437..c4c962c7d0 100644
--- a/comms/core/src/peer_manager/manager.rs
+++ b/comms/core/src/peer_manager/manager.rs
@@ -192,6 +192,26 @@ impl PeerManager {
         }
     }
 
+    pub async fn update_peer_address_latency_and_last_seen(
+        &self,
+        pubkey: &CommsPublicKey,
+        address: &Multiaddr,
+        latency: Option<Duration>,
+    ) -> Result<(), PeerManagerError> {
+        match self.find_by_public_key(pubkey).await {
+            Ok(Some(mut peer)) => {
+                if let Some(val) = latency {
+                    peer.addresses.update_latency(address, val);
+                }
+                peer.addresses.mark_last_seen_now(address);
+                self.add_peer(peer.clone()).await?;
+                Ok(())
+            },
+            Ok(None) => Err(PeerManagerError::PeerNotFoundError),
+            Err(err) => Err(err),
+        }
+    }
+
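// A minimal usage sketch (not one of this patch's hunks): how a caller such as the
// liveness service is expected to drive this new helper after a pong, mirroring the
// call site added in base_layer/p2p/src/services/liveness/service.rs earlier in this
// patch; `peer_manager`, `source_peer`, `public_key` and `maybe_latency` are assumed
// to be in scope as they are at that call site.
//
//     if let Some(address) = source_peer.last_address_used() {
//         // Only the address that was actually used for the connection gets its
//         // latency and last-seen stats updated, so unreachable public addresses
//         // no longer inherit a quality score from a successful connection.
//         peer_manager
//             .update_peer_address_latency_and_last_seen(&public_key, &address, maybe_latency)
//             .await?;
//     }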
     /// Get a peer matching the given node ID
     pub async fn direct_identity_node_id(&self, node_id: &NodeId) -> Result<Option<Peer>, PeerManagerError> {
         match self.peer_storage.read().await.direct_identity_node_id(node_id) {
@@ -238,27 +258,6 @@ impl PeerManager {
             .closest_peers(node_id, n, excluded_peers, features)
     }
 
-    pub async fn mark_last_seen(&self, node_id: &NodeId, addr: &Multiaddr) -> Result<(), PeerManagerError> {
-        let mut lock = self.peer_storage.write().await;
-        let mut peer = lock
-            .find_by_node_id(node_id)?
-            .ok_or(PeerManagerError::PeerNotFoundError)?;
-        let source = peer
-            .addresses
-            .iter()
-            .find(|a| a.address() == addr)
-            .map(|a| a.source().clone())
-            .ok_or_else(|| PeerManagerError::AddressNotFoundError {
-                address: addr.clone(),
-                node_id: node_id.clone(),
-            })?;
-        // if we have an address, update it
-        peer.addresses.add_address(addr, &source);
-        peer.addresses.mark_last_seen_now(addr);
-        lock.add_peer(peer)?;
-        Ok(())
-    }
-
     /// Fetch n random peers
     pub async fn random_peers(&self, n: usize, excluded: &[NodeId]) -> Result<Vec<Peer>, PeerManagerError> {
         // Send to a random set of peers of size n that are Communication Nodes
diff --git a/comms/core/src/peer_manager/peer.rs b/comms/core/src/peer_manager/peer.rs
index 3ec13afe13..f4786d57ce 100644
--- a/comms/core/src/peer_manager/peer.rs
+++ b/comms/core/src/peer_manager/peer.rs
@@ -168,6 +168,15 @@ impl Peer {
             .and_then(|a| a.last_attempted())
     }
 
+    /// The last address used to connect to the peer
+    pub fn last_address_used(&self) -> Option<Multiaddr> {
+        self.addresses
+            .addresses()
+            .iter()
+            .max_by_key(|a| a.last_attempted())
+            .map(|a| a.address().clone())
+    }
+
     /// Returns true if the peer is marked as offline
     pub fn is_offline(&self) -> bool {
         self.addresses.offline_at().is_some()
diff --git a/comms/dht/src/connectivity/mod.rs b/comms/dht/src/connectivity/mod.rs
index eb2b3dbdc0..b7c7f94996 100644
--- a/comms/dht/src/connectivity/mod.rs
+++ b/comms/dht/src/connectivity/mod.rs
@@ -93,6 +93,7 @@ pub(crate) struct DhtConnectivity {
     metrics_collector: MetricsCollectorHandle,
     cooldown_in_effect: Option<Instant>,
     shutdown_signal: ShutdownSignal,
+    first_round_completed: bool,
 }
 
 impl DhtConnectivity {
@@ -121,6 +122,7 @@ impl DhtConnectivity {
             dht_events,
             cooldown_in_effect: None,
             shutdown_signal,
+            first_round_completed: false,
         }
     }
 
@@ -289,7 +291,7 @@ impl DhtConnectivity {
         match event {
             DhtEvent::NetworkDiscoveryPeersAdded(info) => {
                 if info.num_new_peers > 0 {
-                    self.refresh_peer_pools().await?;
+                    self.refresh_peer_pools_with_initial_delay().await?;
                 }
             },
             _ => {},
@@ -323,7 +325,8 @@ impl DhtConnectivity {
         Ok(())
     }
 
-    async fn refresh_peer_pools(&mut self) -> Result<(), DhtConnectivityError> {
+    async fn refresh_peer_pools_with_initial_delay(&mut self) -> Result<(), DhtConnectivityError> {
+        self.handle_initial_delay().await;
         info!(
             target: LOG_TARGET,
             "Reinitializing neighbour pool. 
(size={})", @@ -336,6 +339,27 @@ impl DhtConnectivity { Ok(()) } + async fn request_many_dials_with_initial_delay>( + &mut self, + peers: I, + ) -> Result<(), DhtConnectivityError> { + self.handle_initial_delay().await; + self.connectivity.request_many_dials(peers).await?; + Ok(()) + } + + async fn handle_initial_delay(&mut self) { + if !self.first_round_completed { + self.first_round_completed = true; + tokio::time::sleep(self.config.network_discovery.initial_peer_sync_dalay).await; + debug!( + target: LOG_TARGET, + "Delayed {:.0?} before refreshing the neighbour pool for the first time", + self.config.network_discovery.initial_peer_sync_dalay + ); + } + } + async fn refresh_neighbour_pool_if_required(&mut self) -> Result<(), DhtConnectivityError> { if self.num_connected_neighbours() < self.config.num_neighbouring_nodes { self.refresh_neighbour_pool().await?; @@ -400,7 +424,7 @@ impl DhtConnectivity { }); if !new_neighbours.is_empty() { - self.connectivity.request_many_dials(new_neighbours).await?; + self.request_many_dials_with_initial_delay(new_neighbours).await?; } self.redial_neighbours_as_required().await?; @@ -427,7 +451,7 @@ impl DhtConnectivity { "Redialling {} disconnected peer(s)", to_redial.len() ); - self.connectivity.request_many_dials(to_redial).await?; + self.request_many_dials_with_initial_delay(to_redial).await?; } Ok(()) @@ -482,7 +506,7 @@ impl DhtConnectivity { difference.iter().for_each(|peer| { self.remove_connection_handle(peer); }); - self.connectivity.request_many_dials(random_peers).await?; + self.request_many_dials_with_initial_delay(random_peers).await?; self.random_pool_last_refresh = Some(Instant::now()); Ok(()) @@ -589,10 +613,10 @@ impl DhtConnectivity { debug!(target: LOG_TARGET, "Pool peer {} disconnected. Redialling...", node_id); // Attempt to reestablish the lost connection to the pool peer. If reconnection fails, // it is replaced with another peer (replace_pool_peer via PeerConnectFailed) - self.connectivity.request_many_dials([node_id]).await?; + self.request_many_dials_with_initial_delay([node_id]).await?; }, ConnectivityStateOnline(n) => { - self.refresh_peer_pools().await?; + self.refresh_peer_pools_with_initial_delay().await?; if self.config.auto_join && self.should_send_join() { debug!( target: LOG_TARGET, @@ -608,7 +632,7 @@ impl DhtConnectivity { }, ConnectivityStateOffline => { debug!(target: LOG_TARGET, "Node is OFFLINE"); - self.refresh_peer_pools().await?; + self.refresh_peer_pools_with_initial_delay().await?; }, _ => {}, } @@ -637,7 +661,7 @@ impl DhtConnectivity { self.random_pool.swap_remove(pos); } self.random_pool.push(new_peer.clone()); - self.connectivity.request_many_dials([new_peer]).await?; + self.request_many_dials_with_initial_delay([new_peer]).await?; }, None => { debug!( @@ -671,7 +695,7 @@ impl DhtConnectivity { self.neighbours.remove(pos); } self.insert_neighbour(node_id.clone()); - self.connectivity.request_many_dials([node_id]).await?; + self.request_many_dials_with_initial_delay([node_id]).await?; }, None => { info!( diff --git a/comms/dht/src/discovery/service.rs b/comms/dht/src/discovery/service.rs index f946327af4..ae0a4a77ec 100644 --- a/comms/dht/src/discovery/service.rs +++ b/comms/dht/src/discovery/service.rs @@ -282,6 +282,7 @@ impl DhtDiscoveryService { ) -> Result { match result { Ok(peer) => Ok(peer), + Err(err @ DhtPeerValidatorError::NewAndExistingMismatch { .. }) => Err(err), Err(err @ DhtPeerValidatorError::IdentityTooManyClaims { .. 
}) | Err(err @ DhtPeerValidatorError::ValidatorError(_)) => { self.dht.ban_peer(public_key.clone(), OffenceSeverity::High, &err).await; diff --git a/comms/dht/src/inbound/dht_handler/task.rs b/comms/dht/src/inbound/dht_handler/task.rs index 99671f1288..691639f9f0 100644 --- a/comms/dht/src/inbound/dht_handler/task.rs +++ b/comms/dht/src/inbound/dht_handler/task.rs @@ -464,6 +464,7 @@ where S: Service Err(err) => { match &err { DhtInboundError::PeerValidatorError(err) => match err { + DhtPeerValidatorError::NewAndExistingMismatch { .. } => {}, err @ DhtPeerValidatorError::ValidatorError(_) | err @ DhtPeerValidatorError::IdentityTooManyClaims { .. } => { self.dht diff --git a/comms/dht/src/network_discovery/config.rs b/comms/dht/src/network_discovery/config.rs index a6c932ea25..dcebcba40e 100644 --- a/comms/dht/src/network_discovery/config.rs +++ b/comms/dht/src/network_discovery/config.rs @@ -53,6 +53,11 @@ pub struct NetworkDiscoveryConfig { /// The maximum number of peers we allow per round of sync. /// Default: 500 pub max_peers_to_sync_per_round: u32, + /// Initial refresh sync peers delay period if more than one peer needs to be synced, handy when a specific + /// connection needs preference. + /// Default: 15 + #[serde(with = "serializers::seconds")] + pub initial_peer_sync_dalay: Duration, } impl Default for NetworkDiscoveryConfig { @@ -65,6 +70,7 @@ impl Default for NetworkDiscoveryConfig { on_failure_idle_period: Duration::from_secs(5), max_sync_peers: 5, max_peers_to_sync_per_round: 500, + initial_peer_sync_dalay: Duration::from_secs(15), } } } diff --git a/comms/dht/src/network_discovery/on_connect.rs b/comms/dht/src/network_discovery/on_connect.rs index d0ac25cb66..49ff209afc 100644 --- a/comms/dht/src/network_discovery/on_connect.rs +++ b/comms/dht/src/network_discovery/on_connect.rs @@ -121,7 +121,7 @@ impl OnConnect { StateEvent::Shutdown } - async fn sync_peers(&self, mut conn: PeerConnection) -> Result<(), NetworkDiscoveryError> { + async fn sync_peers(&mut self, mut conn: PeerConnection) -> Result<(), NetworkDiscoveryError> { let mut client = conn.connect_rpc::().await?; let peer_stream = client .get_peers(GetPeersRequest { diff --git a/comms/dht/src/peer_validator.rs b/comms/dht/src/peer_validator.rs index a3d65bc0a5..770915a578 100644 --- a/comms/dht/src/peer_validator.rs +++ b/comms/dht/src/peer_validator.rs @@ -27,10 +27,11 @@ use tari_comms::{ peer_validator, peer_validator::{find_most_recent_claim, PeerValidatorError}, }; +use tari_utilities::hex::Hex; use crate::{rpc::UnvalidatedPeerInfo, DhtConfig}; -const _LOG_TARGET: &str = "dht::network_discovery::peer_validator"; +const LOG_TARGET: &str = "dht::network_discovery::peer_validator"; /// Validation errors for peers shared on the network #[derive(Debug, thiserror::Error)] @@ -39,6 +40,8 @@ pub enum DhtPeerValidatorError { ValidatorError(#[from] PeerValidatorError), #[error("Peer provided too many claims: expected max {max} but got {length}")] IdentityTooManyClaims { length: usize, max: usize }, + #[error("Optional existing peer does not match new peer: existing '{existing}', new '{new}'")] + NewAndExistingMismatch { existing: String, new: String }, } /// Validator for Peers @@ -73,9 +76,23 @@ impl<'a> PeerValidator<'a> { }); } + if let Some(existing) = &existing_peer { + if existing.public_key != new_peer.public_key { + return Err(DhtPeerValidatorError::NewAndExistingMismatch { + existing: format!("'{}' / '{}'", existing.node_id.to_hex(), existing.public_key.to_hex()), + new: format!( + "'{}' / '{}'", + 
NodeId::from_public_key(&new_peer.public_key), + new_peer.public_key.to_hex() + ), + }); + } + } + let most_recent_claim = find_most_recent_claim(&new_peer.claims).expect("new_peer.claims is not empty"); let node_id = NodeId::from_public_key(&new_peer.public_key); + let node_id_hex = node_id.to_hex(); let mut peer = existing_peer.unwrap_or_else(|| { Peer::new( @@ -98,7 +115,13 @@ impl<'a> PeerValidator<'a> { peer.update_addresses(&claim.addresses, &PeerAddressSource::FromDiscovery { peer_identity_claim: claim.clone(), }); - peer.addresses.mark_all_addresses_as_last_seen_now(&claim.addresses); + trace!( + target: LOG_TARGET, + "Peer '{}' / '{}' added with address(es) from claim: {:?}", + node_id_hex, + new_peer.public_key.to_hex(), + claim.addresses + ); } Ok(peer) diff --git a/comms/dht/src/rpc/service.rs b/comms/dht/src/rpc/service.rs index e0ac04f21d..380d15e1da 100644 --- a/comms/dht/src/rpc/service.rs +++ b/comms/dht/src/rpc/service.rs @@ -29,7 +29,7 @@ use tari_comms::{ utils, PeerManager, }; -use tari_utilities::ByteArray; +use tari_utilities::{hex::Hex, ByteArray}; use tokio::{sync::mpsc, task}; use crate::{ @@ -68,9 +68,27 @@ impl DhtRpcServiceImpl { let iter = peers .into_iter() .filter_map(|peer| { - let peer_info = + let mut peer_info = UnvalidatedPeerInfo::from_peer_limited_claims(peer, max_claims, max_addresses_per_claim); + // Filter out all identity claims with invalid signatures + let count = peer_info.claims.len(); + let peer_public_key = peer_info.public_key.clone(); + peer_info.claims.retain(|claim| { + claim + .signature + .is_valid(&peer_public_key, claim.features, claim.addresses.as_slice()) + }); + if count != peer_info.claims.len() { + warn!( + target: LOG_TARGET, + "Peer `{}` provided {} claims but only {} were valid", + peer_info.public_key.to_hex(), + count, + peer_info.claims.len() + ); + } + if peer_info.claims.is_empty() { None } else { From b8ad59ddf0b91aa40a2aed186104b76a15f93389 Mon Sep 17 00:00:00 2001 From: Hansie Odendaal Date: Thu, 18 Apr 2024 18:05:22 +0200 Subject: [PATCH 2/5] review comments --- applications/minotari_node/src/bootstrap.rs | 2 - .../src/commands/command/status.rs | 10 ++-- .../contacts/src/chat_client/src/config.rs | 1 + base_layer/contacts/tests/contacts_service.rs | 2 +- base_layer/core/tests/helpers/nodes.rs | 1 + base_layer/p2p/src/config.rs | 4 +- base_layer/p2p/src/initialization.rs | 2 +- base_layer/wallet/src/config.rs | 2 +- base_layer/wallet/tests/other/mod.rs | 4 +- base_layer/wallet_ffi/src/lib.rs | 8 ++- common/config/presets/c_base_node_c.toml | 10 ++-- common/config/presets/d_console_wallet.toml | 12 ++--- comms/core/src/builder/comms_node.rs | 26 ++++------ comms/core/src/builder/mod.rs | 5 +- comms/core/src/connection_manager/listener.rs | 6 +-- comms/core/src/connection_manager/manager.rs | 6 +-- comms/core/src/connection_manager/mod.rs | 6 +-- .../{liveness.rs => self_liveness.rs} | 44 ++++++++-------- comms/dht/src/config.rs | 1 + comms/dht/src/connectivity/mod.rs | 51 ++++++------------- comms/dht/src/network_discovery/config.rs | 11 ++-- .../dht/src/network_discovery/initializing.rs | 15 +++++- .../src/network_discovery/state_machine.rs | 7 ++- comms/dht/src/network_discovery/test.rs | 3 ++ 24 files changed, 118 insertions(+), 121 deletions(-) rename comms/core/src/connection_manager/{liveness.rs => self_liveness.rs} (82%) diff --git a/applications/minotari_node/src/bootstrap.rs b/applications/minotari_node/src/bootstrap.rs index 9bf7ea1125..229ecd25b9 100644 --- a/applications/minotari_node/src/bootstrap.rs +++ 
b/applications/minotari_node/src/bootstrap.rs @@ -24,7 +24,6 @@ use std::{ cmp, str::FromStr, sync::{Arc, RwLock}, - time::Duration, }; use log::*; @@ -122,7 +121,6 @@ where B: BlockchainBackend + 'static let tor_identity = load_from_json(&base_node_config.tor_identity_file) .map_err(|e| ExitError::new(ExitCode::ConfigError, e))?; p2p_config.transport.tor.identity = tor_identity; - p2p_config.listener_liveness_check_interval = Some(Duration::from_secs(15)); let mut handles = StackBuilder::new(self.interrupt_signal) .add_initializer(P2pInitializer::new( diff --git a/applications/minotari_node/src/commands/command/status.rs b/applications/minotari_node/src/commands/command/status.rs index f4d101a104..67b867ddf0 100644 --- a/applications/minotari_node/src/commands/command/status.rs +++ b/applications/minotari_node/src/commands/command/status.rs @@ -27,7 +27,7 @@ use async_trait::async_trait; use chrono::{DateTime, NaiveDateTime, Utc}; use clap::Parser; use minotari_app_utilities::consts; -use tari_comms::connection_manager::LivenessStatus; +use tari_comms::connection_manager::SelfLivenessStatus; use tokio::time; use super::{CommandContext, HandleCommand}; @@ -127,14 +127,14 @@ impl CommandContext { ); match self.comms.liveness_status() { - LivenessStatus::Disabled => {}, - LivenessStatus::Checking => { + SelfLivenessStatus::Disabled => {}, + SelfLivenessStatus::Checking => { status_line.add("⏳️️"); }, - LivenessStatus::Unreachable => { + SelfLivenessStatus::Unreachable => { status_line.add("️🔌"); }, - LivenessStatus::Live(latency) => { + SelfLivenessStatus::Live(latency) => { status_line.add(format!("⚡️ {:.2?}", latency)); }, } diff --git a/base_layer/contacts/src/chat_client/src/config.rs b/base_layer/contacts/src/chat_client/src/config.rs index 3e0507365c..0c19f7d9ee 100644 --- a/base_layer/contacts/src/chat_client/src/config.rs +++ b/base_layer/contacts/src/chat_client/src/config.rs @@ -168,6 +168,7 @@ impl ChatClientConfig { database_url: DbConnectionUrl::file("data/chat_client/dht.sqlite"), network_discovery: NetworkDiscoveryConfig { enabled: true, + initial_peer_sync_delay: None, ..NetworkDiscoveryConfig::default() }, saf: SafConfig { diff --git a/base_layer/contacts/tests/contacts_service.rs b/base_layer/contacts/tests/contacts_service.rs index 215d0d7456..f9971430e6 100644 --- a/base_layer/contacts/tests/contacts_service.rs +++ b/base_layer/contacts/tests/contacts_service.rs @@ -96,7 +96,7 @@ pub fn setup_contacts_service( user_agent: "tari/test-contacts-service".to_string(), rpc_max_simultaneous_sessions: 0, rpc_max_sessions_per_peer: 0, - listener_liveness_check_interval: None, + listener_self_liveness_check_interval: None, }; let peer_message_subscription_factory = Arc::new(subscription_factory); let shutdown = Shutdown::new(); diff --git a/base_layer/core/tests/helpers/nodes.rs b/base_layer/core/tests/helpers/nodes.rs index c22e65ac18..c33586432d 100644 --- a/base_layer/core/tests/helpers/nodes.rs +++ b/base_layer/core/tests/helpers/nodes.rs @@ -357,6 +357,7 @@ async fn setup_base_node_services( let handles = StackBuilder::new(shutdown.to_signal()) .add_initializer(RegisterHandle::new(dht)) .add_initializer(RegisterHandle::new(comms.connectivity())) + .add_initializer(RegisterHandle::new(comms.peer_manager())) .add_initializer(LivenessInitializer::new( liveness_service_config, Arc::clone(&subscription_factory), diff --git a/base_layer/p2p/src/config.rs b/base_layer/p2p/src/config.rs index 734ac7632e..91d38c3eaa 100644 --- a/base_layer/p2p/src/config.rs +++ 
b/base_layer/p2p/src/config.rs @@ -112,7 +112,7 @@ pub struct P2pConfig { pub listener_liveness_max_sessions: usize, /// If Some, enables periodic socket-level liveness checks #[serde(with = "serializers::optional_seconds")] - pub listener_liveness_check_interval: Option, + pub listener_self_liveness_check_interval: Option, /// CIDR for addresses allowed to enter into liveness check mode on the listener. pub listener_liveness_allowlist_cidrs: StringList, /// User agent string for this node @@ -146,7 +146,7 @@ impl Default for P2pConfig { }, allow_test_addresses: false, listener_liveness_max_sessions: 0, - listener_liveness_check_interval: None, + listener_self_liveness_check_interval: None, listener_liveness_allowlist_cidrs: StringList::default(), user_agent: String::new(), auxiliary_tcp_listener_address: None, diff --git a/base_layer/p2p/src/initialization.rs b/base_layer/p2p/src/initialization.rs index d4cf62ca97..4d3ba863d1 100644 --- a/base_layer/p2p/src/initialization.rs +++ b/base_layer/p2p/src/initialization.rs @@ -559,7 +559,7 @@ impl ServiceInitializer for P2pInitializer { network_byte: self.network.as_byte(), user_agent: config.user_agent.clone(), }) - .set_liveness_check(config.listener_liveness_check_interval); + .set_self_liveness_check(config.listener_self_liveness_check_interval); if config.allow_test_addresses || config.dht.peer_validator_config.allow_test_addresses { // The default is false, so ensure that both settings are true in this case diff --git a/base_layer/wallet/src/config.rs b/base_layer/wallet/src/config.rs index 3d804cb5f5..368739bb69 100644 --- a/base_layer/wallet/src/config.rs +++ b/base_layer/wallet/src/config.rs @@ -131,7 +131,7 @@ impl Default for WalletConfig { fn default() -> Self { let p2p = P2pConfig { datastore_path: PathBuf::from("peer_db/wallet"), - listener_liveness_check_interval: None, + listener_self_liveness_check_interval: None, ..Default::default() }; Self { diff --git a/base_layer/wallet/tests/other/mod.rs b/base_layer/wallet/tests/other/mod.rs index 31eb36e322..592af4c345 100644 --- a/base_layer/wallet/tests/other/mod.rs +++ b/base_layer/wallet/tests/other/mod.rs @@ -154,7 +154,7 @@ async fn create_wallet( auxiliary_tcp_listener_address: None, rpc_max_simultaneous_sessions: 0, rpc_max_sessions_per_peer: 0, - listener_liveness_check_interval: None, + listener_self_liveness_check_interval: None, }; let sql_database_path = comms_config @@ -692,7 +692,7 @@ async fn test_import_utxo() { auxiliary_tcp_listener_address: None, rpc_max_simultaneous_sessions: 0, rpc_max_sessions_per_peer: 0, - listener_liveness_check_interval: None, + listener_self_liveness_check_interval: None, }; let config = WalletConfig { p2p: comms_config, diff --git a/base_layer/wallet_ffi/src/lib.rs b/base_layer/wallet_ffi/src/lib.rs index ef7cdff9de..c723d649a7 100644 --- a/base_layer/wallet_ffi/src/lib.rs +++ b/base_layer/wallet_ffi/src/lib.rs @@ -131,7 +131,7 @@ use tari_comms::{ transports::MemoryTransport, types::CommsPublicKey, }; -use tari_comms_dht::{store_forward::SafConfig, DbConnectionUrl, DhtConfig}; +use tari_comms_dht::{store_forward::SafConfig, DbConnectionUrl, DhtConfig, NetworkDiscoveryConfig}; use tari_contacts::contacts_service::{handle::ContactsServiceHandle, types::Contact}; use tari_core::{ borsh::FromBytes, @@ -4858,6 +4858,10 @@ pub unsafe extern "C" fn comms_config_create( auto_request: true, ..Default::default() }, + network_discovery: NetworkDiscoveryConfig { + initial_peer_sync_delay: Some(Duration::from_secs(25)), + ..Default::default() + }, 
..Default::default() }, allow_test_addresses: true, @@ -4866,7 +4870,7 @@ pub unsafe extern "C" fn comms_config_create( user_agent: format!("tari/mobile_wallet/{}", env!("CARGO_PKG_VERSION")), rpc_max_simultaneous_sessions: 0, rpc_max_sessions_per_peer: 0, - listener_liveness_check_interval: None, + listener_self_liveness_check_interval: None, }; Box::into_raw(Box::new(config)) diff --git a/common/config/presets/c_base_node_c.toml b/common/config/presets/c_base_node_c.toml index 318b594e9a..f0754403f0 100644 --- a/common/config/presets/c_base_node_c.toml +++ b/common/config/presets/c_base_node_c.toml @@ -148,7 +148,7 @@ track_reorgs = true # CIDR for addresses allowed to enter into liveness check mode on the listener. #listener_liveness_allowlist_cidrs = [] # Enables periodic socket-level liveness checks. Default: Disabled -listener_liveness_check_interval = 15 +listener_self_liveness_check_interval = 15 # User agent string for this node #user_agent = "" @@ -289,11 +289,9 @@ database_url = "data/base_node/dht.db" # Default: 5 #network_discovery.max_sync_peers = 5 # The maximum number of peers we allow per round of sync. (Default: 500) -#max_peers_to_sync_per_round = 500 -# Initial refresh sync peers delay period if more than one peer needs to be synced, handy when a wallet needs to -# give preference to its base node peer connection. (Default: 15) -#[serde(with = "serializers::seconds")] -#initial_peer_sync_dalay = 15 +#network_discovery.max_peers_to_sync_per_round = 500 +# Initial refresh sync peers delay period, when a configured connection needs preference. (Default: Disabled) +#network_discovery.initial_peer_sync_delay = 0 # Length of time to ban a peer if the peer misbehaves at the DHT-level. Default: 6 hrs #ban_duration = 21_600 # 6 * 60 * 60 diff --git a/common/config/presets/d_console_wallet.toml b/common/config/presets/d_console_wallet.toml index d5870d665b..7d076434bf 100644 --- a/common/config/presets/d_console_wallet.toml +++ b/common/config/presets/d_console_wallet.toml @@ -198,7 +198,7 @@ event_channel_size = 3500 # CIDR for addresses allowed to enter into liveness check mode on the listener. #listener_liveness_allowlist_cidrs = [] # Enables periodic socket-level liveness checks. Default: Disabled -# listener_liveness_check_interval = 15 +# listener_self_liveness_check_interval = 15 # User agent string for this node #user_agent = "" @@ -206,6 +206,8 @@ event_channel_size = 3500 # The maximum simultaneous comms RPC sessions allowed (default value = 100). Setting this to -1 will allow unlimited # sessions. #rpc_max_simultaneous_sessions = 100 +# The maximum comms RPC sessions allowed per peer (default value = 10). +#rpc_max_sessions_per_peer = 10 [wallet.p2p.transport] # -------------- Transport configuration -------------- @@ -335,11 +337,9 @@ network_discovery.min_desired_peers = 8 # Default: 5 #network_discovery.max_sync_peers = 5 # The maximum number of peers we allow per round of sync. (Default: 500) -#max_peers_to_sync_per_round = 500 -# Initial refresh sync peers delay period if more than one peer needs to be synced, handy when a wallet needs to -# give preference to its base node peer connection. (Default: 15) -#[serde(with = "serializers::seconds")] -#initial_peer_sync_dalay = 15 +#network_discovery.max_peers_to_sync_per_round = 500 +# Initial refresh sync peers delay period, when a configured connection needs preference. (Default: Disabled) +network_discovery.initial_peer_sync_delay = 15 # Length of time to ban a peer if the peer misbehaves at the DHT-level. 
Default: 6 hrs #ban_duration = 21_600 # 6 * 60 * 60 diff --git a/comms/core/src/builder/comms_node.rs b/comms/core/src/builder/comms_node.rs index b9bd002a98..39252642d5 100644 --- a/comms/core/src/builder/comms_node.rs +++ b/comms/core/src/builder/comms_node.rs @@ -20,7 +20,7 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -use std::{iter, sync::Arc, time::Duration}; +use std::{iter, sync::Arc}; use log::*; use tari_shutdown::ShutdownSignal; @@ -36,8 +36,8 @@ use crate::{ ConnectionManagerEvent, ConnectionManagerRequest, ConnectionManagerRequester, - LivenessCheck, - LivenessStatus, + SelfLivenessCheck, + SelfLivenessStatus, }, connectivity::{ConnectivityEventRx, ConnectivityManager, ConnectivityRequest, ConnectivityRequester}, multiaddr::Multiaddr, @@ -125,12 +125,6 @@ impl UnspawnedCommsNode { self } - /// Set to true to enable self liveness checking for the configured public address - pub fn set_liveness_check(mut self, interval: Option) -> Self { - self.builder = self.builder.set_liveness_check(interval); - self - } - /// Spawn a new node using the specified [Transport](crate::transports::Transport). #[allow(clippy::too_many_lines)] pub async fn spawn_with_transport(self, transport: TTransport) -> Result @@ -228,12 +222,14 @@ impl UnspawnedCommsNode { // Spawn liveness check now that we have the final address let public_addresses = node_identity.public_addresses(); let liveness_watch = if public_addresses.is_empty() { - watch::channel(LivenessStatus::Disabled).1 + watch::channel(SelfLivenessStatus::Disabled).1 } else { connection_manager_config - .liveness_self_check_interval - .map(|interval| LivenessCheck::spawn(transport, public_addresses, interval, shutdown_signal.clone())) - .unwrap_or_else(|| watch::channel(LivenessStatus::Disabled).1) + .self_liveness_self_check_interval + .map(|interval| { + SelfLivenessCheck::spawn(transport, public_addresses, interval, shutdown_signal.clone()) + }) + .unwrap_or_else(|| watch::channel(SelfLivenessStatus::Disabled).1) }; Ok(CommsNode { @@ -285,7 +281,7 @@ pub struct CommsNode { /// Shared PeerManager instance peer_manager: Arc, /// Current liveness status - liveness_watch: watch::Receiver, + liveness_watch: watch::Receiver, /// The 'reciprocal' shutdown signals for each comms service complete_signals: Vec, } @@ -321,7 +317,7 @@ impl CommsNode { } /// Returns the current liveness status - pub fn liveness_status(&self) -> LivenessStatus { + pub fn liveness_status(&self) -> SelfLivenessStatus { *self.liveness_watch.borrow() } diff --git a/comms/core/src/builder/mod.rs b/comms/core/src/builder/mod.rs index 4048905377..26f4cc503a 100644 --- a/comms/core/src/builder/mod.rs +++ b/comms/core/src/builder/mod.rs @@ -125,7 +125,6 @@ pub struct CommsBuilder { hidden_service_ctl: Option, connection_manager_config: ConnectionManagerConfig, connectivity_config: ConnectivityConfig, - shutdown_signal: Option, } @@ -288,8 +287,8 @@ impl CommsBuilder { } /// Enable and set interval for self-liveness checks, or None to disable it (default) - pub fn set_liveness_check(mut self, check_interval: Option) -> Self { - self.connection_manager_config.liveness_self_check_interval = check_interval; + pub fn set_self_liveness_check(mut self, check_interval: Option) -> Self { + self.connection_manager_config.self_liveness_self_check_interval = check_interval; self } diff --git a/comms/core/src/connection_manager/listener.rs 
b/comms/core/src/connection_manager/listener.rs index 18c1170f49..0631692101 100644 --- a/comms/core/src/connection_manager/listener.rs +++ b/comms/core/src/connection_manager/listener.rs @@ -55,7 +55,7 @@ use crate::connection_manager::metrics; use crate::{ bounded_executor::BoundedExecutor, connection_manager::{ - liveness::LivenessSession, + self_liveness::SelfLivenessSession, wire_mode::{WireMode, LIVENESS_WIRE_MODE}, }, multiaddr::Multiaddr, @@ -221,7 +221,7 @@ where shutdown_signal: ShutdownSignal, ) { permit.fetch_sub(1, Ordering::SeqCst); - let liveness = LivenessSession::new(socket); + let liveness = SelfLivenessSession::new(socket); debug!(target: LOG_TARGET, "Started liveness session"); tokio::spawn(async move { future::select(liveness.run(), shutdown_signal).await; @@ -296,7 +296,7 @@ where let _result = socket.shutdown().await; }, Ok(WireMode::Liveness) => { - if config.liveness_self_check_interval.is_some() || + if config.self_liveness_self_check_interval.is_some() || (liveness_session_count.load(Ordering::SeqCst) > 0 && Self::is_address_in_liveness_cidr_range(&peer_addr, &config.liveness_cidr_allowlist)) { diff --git a/comms/core/src/connection_manager/manager.rs b/comms/core/src/connection_manager/manager.rs index 42b64d4338..82b174b87e 100644 --- a/comms/core/src/connection_manager/manager.rs +++ b/comms/core/src/connection_manager/manager.rs @@ -124,7 +124,7 @@ pub struct ConnectionManagerConfig { /// CIDR blocks that allowlist liveness checks. Default: Localhost only (127.0.0.1/32) pub liveness_cidr_allowlist: Vec, /// Interval to perform self-liveness ping-pong tests. Default: None/disabled - pub liveness_self_check_interval: Option, + pub self_liveness_self_check_interval: Option, /// If set, an additional TCP-only p2p listener will be started. This is useful for local wallet connections. 
diff --git a/comms/core/src/connection_manager/manager.rs b/comms/core/src/connection_manager/manager.rs
index 42b64d4338..82b174b87e 100644
--- a/comms/core/src/connection_manager/manager.rs
+++ b/comms/core/src/connection_manager/manager.rs
@@ -124,7 +124,7 @@ pub struct ConnectionManagerConfig {
     /// CIDR blocks that allowlist liveness checks. Default: Localhost only (127.0.0.1/32)
     pub liveness_cidr_allowlist: Vec<cidr::AnyIpCidr>,
     /// Interval to perform self-liveness ping-pong tests. Default: None/disabled
-    pub liveness_self_check_interval: Option<Duration>,
+    pub self_liveness_self_check_interval: Option<Duration>,
     /// If set, an additional TCP-only p2p listener will be started. This is useful for local wallet connections.
     /// Default: None (disabled)
     pub auxiliary_tcp_listener_address: Option<Multiaddr>,
@@ -147,7 +147,7 @@ impl Default for ConnectionManagerConfig {
             liveness_max_sessions: 1,
             time_to_first_byte: Duration::from_secs(6),
             liveness_cidr_allowlist: vec![cidr::AnyIpCidr::V4("127.0.0.1/32".parse().unwrap())],
-            liveness_self_check_interval: None,
+            self_liveness_self_check_interval: None,
             auxiliary_tcp_listener_address: None,
             peer_validation_config: PeerValidatorConfig::default(),
             noise_handshake_recv_timeout: Duration::from_secs(6),
@@ -229,7 +229,7 @@ where
             info!(target: LOG_TARGET, "Starting auxiliary listener on {}", addr);
             let aux_config = ConnectionManagerConfig {
                 // Disable liveness checks on the auxiliary listener
-                liveness_self_check_interval: None,
+                self_liveness_self_check_interval: None,
                 ..config.clone()
             };
             PeerListener::new(
diff --git a/comms/core/src/connection_manager/mod.rs b/comms/core/src/connection_manager/mod.rs
index 2b3469a3d3..3ca92fe339 100644
--- a/comms/core/src/connection_manager/mod.rs
+++ b/comms/core/src/connection_manager/mod.rs
@@ -51,9 +51,9 @@ pub use error::{ConnectionManagerError, PeerConnectionError};
 mod peer_connection;
 pub use peer_connection::{ConnectionId, NegotiatedSubstream, PeerConnection, PeerConnectionRequest};
 
-mod liveness;
-pub(crate) use liveness::LivenessCheck;
-pub use liveness::LivenessStatus;
+mod self_liveness;
+pub(crate) use self_liveness::SelfLivenessCheck;
+pub use self_liveness::SelfLivenessStatus;
 
 mod wire_mode;
 
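As the hunks above show, the self-check is opt-in on the main config and always cloned out of the auxiliary listener's config. A sketch of both configs side by side (the 30s interval is illustrative and the import path an assumption):

```rust
use std::time::Duration;

use tari_comms::connection_manager::ConnectionManagerConfig;

fn example_configs() -> (ConnectionManagerConfig, ConnectionManagerConfig) {
    // Main listener: self-check the public address every 30 seconds
    let main_config = ConnectionManagerConfig {
        self_liveness_self_check_interval: Some(Duration::from_secs(30)),
        ..Default::default()
    };
    // Auxiliary (local wallet) listener: inherit everything, but never self-check,
    // mirroring the override performed by the connection manager above
    let aux_config = ConnectionManagerConfig {
        self_liveness_self_check_interval: None,
        ..main_config.clone()
    };
    (main_config, aux_config)
}
```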
diff --git a/comms/core/src/connection_manager/liveness.rs b/comms/core/src/connection_manager/self_liveness.rs
similarity index 82%
rename from comms/core/src/connection_manager/liveness.rs
rename to comms/core/src/connection_manager/self_liveness.rs
index f5dd83e08a..aaebdae860 100644
--- a/comms/core/src/connection_manager/liveness.rs
+++ b/comms/core/src/connection_manager/self_liveness.rs
@@ -40,14 +40,14 @@ use crate::{connection_manager::wire_mode::WireMode, transports::Transport};
 /// Max line length accepted by the liveness session.
 const MAX_LINE_LENGTH: usize = 50;
 
-const LOG_TARGET: &str = "comms::connection_manager::liveness";
+const LOG_TARGET: &str = "comms::connection_manager::self_liveness";
 
 /// Echo server for liveness checks
-pub struct LivenessSession<TSocket> {
+pub struct SelfLivenessSession<TSocket> {
     framed: Framed<TSocket, LinesCodec>,
 }
 
-impl<TSocket> LivenessSession<TSocket>
+impl<TSocket> SelfLivenessSession<TSocket>
 where TSocket: AsyncRead + AsyncWrite + Unpin
 {
     pub fn new(socket: TSocket) -> Self {
@@ -63,22 +63,22 @@ where TSocket: AsyncRead + AsyncWrite + Unpin
     }
 }
 
 #[derive(Debug, Clone, Copy)]
-pub enum LivenessStatus {
+pub enum SelfLivenessStatus {
     Disabled,
     Checking,
     Unreachable,
     Live(Duration),
 }
 
-pub struct LivenessCheck<TTransport> {
+pub struct SelfLivenessCheck<TTransport> {
     transport: TTransport,
     addresses: Vec<Multiaddr>,
     interval: Duration,
-    tx_watch: watch::Sender<LivenessStatus>,
+    tx_watch: watch::Sender<SelfLivenessStatus>,
     shutdown_signal: ShutdownSignal,
 }
 
-impl<TTransport> LivenessCheck<TTransport>
+impl<TTransport> SelfLivenessCheck<TTransport>
 where
     TTransport: Transport + Send + Sync + 'static,
     TTransport::Output: AsyncRead + AsyncWrite + Unpin + Send,
@@ -88,8 +88,8 @@ where
         addresses: Vec<Multiaddr>,
         interval: Duration,
         shutdown_signal: ShutdownSignal,
-    ) -> watch::Receiver<LivenessStatus> {
-        let (tx_watch, rx_watch) = watch::channel(LivenessStatus::Checking);
+    ) -> watch::Receiver<SelfLivenessStatus> {
+        let (tx_watch, rx_watch) = watch::channel(SelfLivenessStatus::Checking);
         let check = Self {
             transport,
             addresses,
@@ -120,36 +120,36 @@ where
         let mut current_address_idx = 0;
         loop {
             let timer = Instant::now();
-            let _ = self.tx_watch.send(LivenessStatus::Checking);
+            let _ = self.tx_watch.send(SelfLivenessStatus::Checking);
             let address = self.addresses[current_address_idx].clone();
             match self.transport.dial(&address).await {
                 Ok(mut socket) => {
                     debug!(
                         target: LOG_TARGET,
-                        "🔌 liveness dial ({}) took {:.2?}",
+                        "🔌 self liveness dial ({}) took {:.2?}",
                         address,
                         timer.elapsed()
                     );
                     if let Err(err) = socket.write(&[WireMode::Liveness.as_byte()]).await {
-                        warn!(target: LOG_TARGET, "🔌️ liveness failed to write byte: {}", err);
-                        self.tx_watch.send_replace(LivenessStatus::Unreachable);
+                        warn!(target: LOG_TARGET, "🔌️ self liveness failed to write byte: {}", err);
+                        self.tx_watch.send_replace(SelfLivenessStatus::Unreachable);
                         continue;
                     }
                     let mut framed = Framed::new(socket, LinesCodec::new_with_max_length(MAX_LINE_LENGTH));
                     loop {
                         match self.ping_pong(&mut framed).await {
                             Ok(Some(latency)) => {
-                                debug!(target: LOG_TARGET, "⚡️️ liveness check latency {:.2?}", latency);
-                                self.tx_watch.send_replace(LivenessStatus::Live(latency));
+                                debug!(target: LOG_TARGET, "⚡️️ self liveness check latency {:.2?}", latency);
+                                self.tx_watch.send_replace(SelfLivenessStatus::Live(latency));
                             },
                             Ok(None) => {
-                                info!(target: LOG_TARGET, "🔌️ liveness connection closed");
-                                self.tx_watch.send_replace(LivenessStatus::Unreachable);
+                                info!(target: LOG_TARGET, "🔌️ self liveness connection closed");
+                                self.tx_watch.send_replace(SelfLivenessStatus::Unreachable);
                                 break;
                             },
                             Err(err) => {
-                                warn!(target: LOG_TARGET, "🔌️ ping pong failed: {}", err);
-                                self.tx_watch.send_replace(LivenessStatus::Unreachable);
+                                warn!(target: LOG_TARGET, "🔌️ self liveness ping pong failed: {}", err);
+                                self.tx_watch.send_replace(SelfLivenessStatus::Unreachable);
                                 // let _ = framed.close().await;
                                 break;
                             },
@@ -160,10 +160,10 @@ where
                 },
                 Err(err) => {
                     current_address_idx = (current_address_idx + 1) % self.addresses.len();
-                    self.tx_watch.send_replace(LivenessStatus::Unreachable);
+                    self.tx_watch.send_replace(SelfLivenessStatus::Unreachable);
                     warn!(
                         target: LOG_TARGET,
-                        "🔌️ Failed to dial public address {} for self check: {}", address, err
+                        "🔌️ Failed to dial own public address {} for self check: {}", address, err
                     );
                 },
             }
@@ -200,7 +200,7 @@ mod test {
     #[tokio::test]
     async fn echos() {
         let (inbound, outbound) = MemorySocket::new_pair();
-        let liveness = LivenessSession::new(inbound);
+        let liveness = SelfLivenessSession::new(inbound);
         let join_handle = tokio::spawn(liveness.run());
         let mut outbound = Framed::new(outbound, LinesCodec::new());
         for _ in 0..10usize {
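`SelfLivenessCheck::spawn` hands back a `watch::Receiver<SelfLivenessStatus>`, so callers can await status transitions instead of polling. A sketch of a consumer loop (the wiring that obtains `rx` is assumed):

```rust
use tokio::sync::watch;

// `rx` is the receiver returned by SelfLivenessCheck::spawn(...)
async fn log_status_changes(mut rx: watch::Receiver<SelfLivenessStatus>) {
    // changed() resolves on every send/send_replace from the check task;
    // borrow() always yields the latest status
    while rx.changed().await.is_ok() {
        match *rx.borrow() {
            SelfLivenessStatus::Live(rtt) => println!("live, round trip {:?}", rtt),
            SelfLivenessStatus::Unreachable => println!("unreachable"),
            SelfLivenessStatus::Checking => println!("checking..."),
            SelfLivenessStatus::Disabled => println!("disabled"),
        }
    }
    // The loop ends once the check task drops its sender, e.g. on shutdown
}
```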
diff --git a/comms/dht/src/config.rs b/comms/dht/src/config.rs
index dc7cefc5bd..4937eaacb4 100644
--- a/comms/dht/src/config.rs
+++ b/comms/dht/src/config.rs
@@ -137,6 +137,7 @@ impl DhtConfig {
             network_discovery: NetworkDiscoveryConfig {
                 // If a test requires the peer probe they should explicitly enable it
                 enabled: false,
+                initial_peer_sync_delay: None,
                 ..Default::default()
             },
             peer_validator_config: PeerValidatorConfig {
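With the delay now an `Option`, tests default it off and production callers opt in. A sketch of a wallet-style DHT config that gives the preferred base-node connection a five-second head start (the value is illustrative and the import paths are assumptions):

```rust
use std::time::Duration;

use tari_comms_dht::{network_discovery::NetworkDiscoveryConfig, DhtConfig};

fn wallet_dht_config() -> DhtConfig {
    DhtConfig {
        network_discovery: NetworkDiscoveryConfig {
            // Let the configured base node connect before general peer sync dials out
            initial_peer_sync_delay: Some(Duration::from_secs(5)),
            ..Default::default()
        },
        ..Default::default()
    }
}
```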
(size={})", @@ -339,27 +341,6 @@ impl DhtConnectivity { Ok(()) } - async fn request_many_dials_with_initial_delay>( - &mut self, - peers: I, - ) -> Result<(), DhtConnectivityError> { - self.handle_initial_delay().await; - self.connectivity.request_many_dials(peers).await?; - Ok(()) - } - - async fn handle_initial_delay(&mut self) { - if !self.first_round_completed { - self.first_round_completed = true; - tokio::time::sleep(self.config.network_discovery.initial_peer_sync_dalay).await; - debug!( - target: LOG_TARGET, - "Delayed {:.0?} before refreshing the neighbour pool for the first time", - self.config.network_discovery.initial_peer_sync_dalay - ); - } - } - async fn refresh_neighbour_pool_if_required(&mut self) -> Result<(), DhtConnectivityError> { if self.num_connected_neighbours() < self.config.num_neighbouring_nodes { self.refresh_neighbour_pool().await?; @@ -424,7 +405,7 @@ impl DhtConnectivity { }); if !new_neighbours.is_empty() { - self.request_many_dials_with_initial_delay(new_neighbours).await?; + self.connectivity.request_many_dials(new_neighbours).await?; } self.redial_neighbours_as_required().await?; @@ -451,7 +432,7 @@ impl DhtConnectivity { "Redialling {} disconnected peer(s)", to_redial.len() ); - self.request_many_dials_with_initial_delay(to_redial).await?; + self.connectivity.request_many_dials(to_redial).await?; } Ok(()) @@ -506,7 +487,7 @@ impl DhtConnectivity { difference.iter().for_each(|peer| { self.remove_connection_handle(peer); }); - self.request_many_dials_with_initial_delay(random_peers).await?; + self.connectivity.request_many_dials(random_peers).await?; self.random_pool_last_refresh = Some(Instant::now()); Ok(()) @@ -613,10 +594,10 @@ impl DhtConnectivity { debug!(target: LOG_TARGET, "Pool peer {} disconnected. Redialling...", node_id); // Attempt to reestablish the lost connection to the pool peer. If reconnection fails, // it is replaced with another peer (replace_pool_peer via PeerConnectFailed) - self.request_many_dials_with_initial_delay([node_id]).await?; + self.connectivity.request_many_dials([node_id]).await?; }, ConnectivityStateOnline(n) => { - self.refresh_peer_pools_with_initial_delay().await?; + self.refresh_peer_pools().await?; if self.config.auto_join && self.should_send_join() { debug!( target: LOG_TARGET, @@ -632,7 +613,7 @@ impl DhtConnectivity { }, ConnectivityStateOffline => { debug!(target: LOG_TARGET, "Node is OFFLINE"); - self.refresh_peer_pools_with_initial_delay().await?; + self.refresh_peer_pools().await?; }, _ => {}, } @@ -661,7 +642,7 @@ impl DhtConnectivity { self.random_pool.swap_remove(pos); } self.random_pool.push(new_peer.clone()); - self.request_many_dials_with_initial_delay([new_peer]).await?; + self.connectivity.request_many_dials([new_peer]).await?; }, None => { debug!( @@ -695,7 +676,7 @@ impl DhtConnectivity { self.neighbours.remove(pos); } self.insert_neighbour(node_id.clone()); - self.request_many_dials_with_initial_delay([node_id]).await?; + self.connectivity.request_many_dials([node_id]).await?; }, None => { info!( diff --git a/comms/dht/src/network_discovery/config.rs b/comms/dht/src/network_discovery/config.rs index dcebcba40e..bb2c00dc31 100644 --- a/comms/dht/src/network_discovery/config.rs +++ b/comms/dht/src/network_discovery/config.rs @@ -53,11 +53,10 @@ pub struct NetworkDiscoveryConfig { /// The maximum number of peers we allow per round of sync. 
diff --git a/comms/dht/src/network_discovery/config.rs b/comms/dht/src/network_discovery/config.rs
index dcebcba40e..bb2c00dc31 100644
--- a/comms/dht/src/network_discovery/config.rs
+++ b/comms/dht/src/network_discovery/config.rs
@@ -53,11 +53,10 @@ pub struct NetworkDiscoveryConfig {
     /// The maximum number of peers we allow per round of sync.
     /// Default: 500
     pub max_peers_to_sync_per_round: u32,
-    /// Initial refresh sync peers delay period if more than one peer needs to be synced, handy when a specific
-    /// connection needs preference.
-    /// Default: 15
-    #[serde(with = "serializers::seconds")]
-    pub initial_peer_sync_dalay: Duration,
+    /// Initial refresh sync peers delay period, when a configured connection needs preference.
+    /// Default: None
+    #[serde(with = "serializers::optional_seconds")]
+    pub initial_peer_sync_delay: Option<Duration>,
 }
 
 impl Default for NetworkDiscoveryConfig {
@@ -70,7 +69,7 @@ impl Default for NetworkDiscoveryConfig {
             on_failure_idle_period: Duration::from_secs(5),
             max_sync_peers: 5,
             max_peers_to_sync_per_round: 500,
-            initial_peer_sync_dalay: Duration::from_secs(15),
+            initial_peer_sync_delay: None,
         }
     }
 }
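Because the field became `Option<Duration>`, its serde attribute switches from `serializers::seconds` to `serializers::optional_seconds`. For reference, a minimal stand-in for what such a helper module does (illustrative only; the crate's actual serializer may differ, e.g. in human-readable handling):

```rust
use std::time::Duration;

use serde::{Deserialize, Deserializer, Serialize, Serializer};

// Write Option<Duration> as an optional whole number of seconds
pub fn serialize<S: Serializer>(value: &Option<Duration>, serializer: S) -> Result<S::Ok, S::Error> {
    value.map(|d| d.as_secs()).serialize(serializer)
}

// Read an optional second count back into Option<Duration>
pub fn deserialize<'de, D: Deserializer<'de>>(deserializer: D) -> Result<Option<Duration>, D::Error> {
    Ok(Option::<u64>::deserialize(deserializer)?.map(Duration::from_secs))
}
```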
Starting network discovery"); StateEvent::Initialized } diff --git a/comms/dht/src/network_discovery/state_machine.rs b/comms/dht/src/network_discovery/state_machine.rs index 78281d56b9..e41d1c2018 100644 --- a/comms/dht/src/network_discovery/state_machine.rs +++ b/comms/dht/src/network_discovery/state_machine.rs @@ -196,7 +196,12 @@ impl DhtNetworkDiscovery { async fn get_next_event(&mut self, state: &mut State) -> StateEvent { use State::{Discovering, Initializing, OnConnect, Ready, Waiting}; match state { - Initializing => self::Initializing::new(&mut self.context).next_event().await, + Initializing => { + let initial_peer_sync_delay = self.config().network_discovery.initial_peer_sync_delay; + self::Initializing::new(&mut self.context, initial_peer_sync_delay) + .next_event() + .await + }, Ready(ready) => ready.next_event().await, Discovering(discovering) => discovering.next_event().await, OnConnect(on_connect) => on_connect.next_event().await, diff --git a/comms/dht/src/network_discovery/test.rs b/comms/dht/src/network_discovery/test.rs index 3d8c7e067d..c4e21c6fff 100644 --- a/comms/dht/src/network_discovery/test.rs +++ b/comms/dht/src/network_discovery/test.rs @@ -107,6 +107,7 @@ mod state_machine { num_neighbouring_nodes: 4, network_discovery: NetworkDiscoveryConfig { min_desired_peers: NUM_PEERS, + initial_peer_sync_delay: None, ..Default::default() }, ..DhtConfig::default_local_test() @@ -230,6 +231,7 @@ mod discovery_ready { let config = NetworkDiscoveryConfig { min_desired_peers: 0, idle_after_num_rounds: 0, + initial_peer_sync_delay: None, ..Default::default() }; let (_, _, _, mut ready, context) = setup(config); @@ -250,6 +252,7 @@ mod discovery_ready { let config = NetworkDiscoveryConfig { min_desired_peers: 0, idle_after_num_rounds: 0, + initial_peer_sync_delay: None, ..Default::default() }; let (_, _, _, mut ready, context) = setup(config); From 2de8c9f0c4d82057d445dbfc27f0b78d734f78a5 Mon Sep 17 00:00:00 2001 From: Hansie Odendaal Date: Fri, 19 Apr 2024 15:23:10 +0200 Subject: [PATCH 3/5] review comments --- .../src/net_address/multiaddr_with_stats.rs | 77 ++++++++++++++++--- .../net_address/mutliaddresses_with_stats.rs | 5 -- .../dht/src/network_discovery/initializing.rs | 10 +-- .../src/network_discovery/state_machine.rs | 7 +- comms/dht/src/peer_validator.rs | 11 ++- comms/dht/src/rpc/service.rs | 22 +----- 6 files changed, 79 insertions(+), 53 deletions(-) diff --git a/comms/core/src/net_address/multiaddr_with_stats.rs b/comms/core/src/net_address/multiaddr_with_stats.rs index b71e39ecfe..29e1fcfb96 100644 --- a/comms/core/src/net_address/multiaddr_with_stats.rs +++ b/comms/core/src/net_address/multiaddr_with_stats.rs @@ -148,6 +148,11 @@ impl MultiaddrWithStats { self.update_quality_score(); } + #[cfg(test)] + fn get_averag_latency(&self) -> Option { + self.avg_latency + } + pub fn update_initial_dial_time(&mut self, initial_dial_time: Duration) { self.last_seen = Some(Utc::now().naive_utc()); @@ -206,22 +211,40 @@ impl MultiaddrWithStats { self.clone().address } + // The quality score is a measure of the reliability of the net address. It is calculated based on the following: + // - The maximum score is 'Some(1000)' points (seen within the last 1s and latency < 100ms). + // - The minimum score without any connection errors is 'Some(100)' points (seen >= 800s ago and latency >= 10s). + // - For any sort of connection error the score is 'Some(0)' points. + // - A score of `None` means it has not been tried. 
     fn calculate_quality_score(&self) -> Option<i32> {
-        // If we have never seen or attempted the peer, we start with a high score to ensure that
         if self.last_seen.is_none() && self.last_attempted.is_none() {
             return None;
         }
 
-        let mut score_self = 0;
+        // The starting score
+        let mut score_self = 800;
 
+        // Latency score:
+        // - If there is no average yet, add '100' points
+        // - If the average latency is:
+        //   - less than 100ms, add '100' points
+        //   - 100ms to 10,000ms, add '99' to '1' points on a sliding scale
+        //   - 10s or more, add '0' points
         if let Some(val) = self.avg_latency {
-            // explicitly truncate the latency to avoid casting problems
+            // Explicitly truncate the latency to avoid casting problems
             let avg_latency_millis = i32::try_from(val.as_millis()).unwrap_or(i32::MAX);
             score_self += cmp::max(0, 100i32.saturating_sub(avg_latency_millis / 100));
         } else {
             score_self += 100;
         }
 
+        // Last seen score:
+        // - If the last seen time is:
+        //   - 800s or more, subtract '700' points
+        //   - 799s to 101s, subtract '699' to '1' points on a sliding scale
+        //   - 100s, add or subtract nothing
+        //   - 99s to 1s, add '1' to '99' points on a sliding scale
+        //   - less than 1s, add '100' points
         let last_seen_seconds: i32 = self
             .last_seen
             .map(|x| Utc::now().naive_utc() - x)
             .map(|x| x.num_seconds())
             .unwrap_or(i64::MAX / 2)
             .try_into()
             .unwrap_or(i32::MAX);
-        score_self += cmp::max(0, 100i32.saturating_sub(last_seen_seconds));
+        score_self += cmp::max(-700, 100i32.saturating_sub(last_seen_seconds));
 
+        // Any failure to connect results in a score of '0' points
         if self.last_failed_reason.is_some() {
-            score_self -= 100;
+            score_self = 0;
         }
 
         Some(score_self)
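Worked through numerically: an address seen just now with a 2.5s average latency scores 800 + 75 + 100 = 975, which is what the test below expects (minus a small timing margin). A standalone sketch of the same arithmetic; the real method reads these values from the struct's own fields:

```rust
// Mirrors the scoring rules above with plain arguments instead of struct fields
fn quality_score(avg_latency_ms: Option<i32>, last_seen_secs: i32, last_failed: bool) -> i32 {
    if last_failed {
        return 0; // any connection error forces the score to zero
    }
    let mut score = 800; // starting score
    // Latency: +100 when unknown or under 100ms, sliding down to +0 at 10s or more
    score += avg_latency_ms.map_or(100, |ms| i32::max(0, 100 - ms / 100));
    // Recency: +100 when just seen, sliding down to -700 at 800s or more
    score += i32::max(-700, 100 - last_seen_secs);
    score
}

fn main() {
    assert_eq!(quality_score(None, 0, false), 1000); // ceiling: fresh, no latency sample yet
    assert_eq!(quality_score(Some(2_500), 0, false), 975); // 2.5s average latency
    assert_eq!(quality_score(Some(20_000), 900, false), 100); // floor without any errors
    assert_eq!(quality_score(Some(50), 0, true), 0); // failed connection attempt
}
```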
@@ -445,13 +469,48 @@ mod test {
     #[test]
     fn test_calculate_quality_score() {
-        let address = "/ip4/123.0.0.123/tcp/8000".parse().unwrap();
-        let mut address = MultiaddrWithStats::new(address, PeerAddressSource::Config);
+        let address_raw: Multiaddr = "/ip4/123.0.0.123/tcp/8000".parse().unwrap();
+        let mut address = MultiaddrWithStats::new(address_raw.clone(), PeerAddressSource::Config);
         assert_eq!(address.quality_score, None);
+
         address.mark_last_seen_now();
-        assert!(address.quality_score.unwrap() > 100);
+        assert!(address.quality_score.unwrap() >= 990); // 1000 with a margin of 10s (10) delayed last seen
+
+        let mut address = MultiaddrWithStats::new(address_raw.clone(), PeerAddressSource::Config);
+        address.update_latency(Duration::from_millis(1000));
+        assert_eq!(address.get_average_latency().unwrap(), Duration::from_millis(1000));
+        assert!(address.quality_score.unwrap() >= 980); // 990 with a margin of 10s (10) delayed last seen
+
+        let mut address = MultiaddrWithStats::new(address_raw.clone(), PeerAddressSource::Config);
+        address.update_latency(Duration::from_millis(1500));
+        address.update_latency(Duration::from_millis(2500));
+        address.update_latency(Duration::from_millis(3500));
+        assert_eq!(address.get_average_latency().unwrap(), Duration::from_millis(2500));
+        assert!(address.quality_score.unwrap() >= 965); // 975 with a margin of 10s (10) delayed last seen
+
+        let mut address = MultiaddrWithStats::new(address_raw.clone(), PeerAddressSource::Config);
+        address.update_latency(Duration::from_millis(3500));
+        address.update_latency(Duration::from_millis(4500));
+        address.update_latency(Duration::from_millis(5500));
+        assert_eq!(address.get_average_latency().unwrap(), Duration::from_millis(4500));
+        assert!(address.quality_score.unwrap() >= 945); // 955 with a margin of 10s (10) delayed last seen
+
+        let mut address = MultiaddrWithStats::new(address_raw.clone(), PeerAddressSource::Config);
+        address.update_latency(Duration::from_millis(5500));
+        address.update_latency(Duration::from_millis(6500));
+        address.update_latency(Duration::from_millis(7500));
+        assert_eq!(address.get_average_latency().unwrap(), Duration::from_millis(6500));
+        assert!(address.quality_score.unwrap() >= 925); // 935 with a margin of 10s (10) delayed last seen
+
+        let mut address = MultiaddrWithStats::new(address_raw.clone(), PeerAddressSource::Config);
+        address.update_latency(Duration::from_millis(9000));
+        address.update_latency(Duration::from_millis(10000));
+        address.update_latency(Duration::from_millis(11000));
+        assert_eq!(address.get_average_latency().unwrap(), Duration::from_millis(10000));
+        assert!(address.quality_score.unwrap() >= 890); // 900 with a margin of 10s (10) delayed last seen
+
         address.mark_failed_connection_attempt("Testing".to_string());
-        assert!(address.quality_score.unwrap() <= 100);
+        assert_eq!(address.quality_score.unwrap(), 0);
+
         let another_addr = "/ip4/1.0.0.1/tcp/8000".parse().unwrap();
         let another_addr = MultiaddrWithStats::new(another_addr, PeerAddressSource::Config);
diff --git a/comms/core/src/net_address/mutliaddresses_with_stats.rs b/comms/core/src/net_address/mutliaddresses_with_stats.rs
index 5a2b01da82..12bfdeb8d4 100644
--- a/comms/core/src/net_address/mutliaddresses_with_stats.rs
+++ b/comms/core/src/net_address/mutliaddresses_with_stats.rs
@@ -378,11 +378,6 @@ mod test {
             &PeerAddressSource::Config,
         );
         net_addresses.merge(&address_12);
-        println!("net_address12: {:?}", address_12.addresses[0]);
-        println!(
-            "net_address1: {:?}",
-            net_addresses.find_address_mut(&net_address1).unwrap()
-        );
         assert!(net_addresses.contains(&net_address1));
         assert!(!net_addresses.contains(&net_address12));
     }
diff --git a/comms/dht/src/network_discovery/initializing.rs b/comms/dht/src/network_discovery/initializing.rs
index 056edd9e2a..12726ff8f8 100644
--- a/comms/dht/src/network_discovery/initializing.rs
+++ b/comms/dht/src/network_discovery/initializing.rs
@@ -32,15 +32,11 @@ const LOG_TARGET: &str = "comms::dht::network_discovery";
 #[derive(Debug)]
 pub(super) struct Initializing<'a> {
     context: &'a mut NetworkDiscoveryContext,
-    initial_peer_sync_delay: Option<Duration>,
 }
 
 impl<'a> Initializing<'a> {
-    pub fn new(context: &'a mut NetworkDiscoveryContext, initial_peer_sync_delay: Option<Duration>) -> Self {
-        Self {
-            context,
-            initial_peer_sync_delay,
-        }
+    pub fn new(context: &'a mut NetworkDiscoveryContext) -> Self {
+        Self { context }
     }
 
     pub async fn next_event(&mut self) -> StateEvent {
@@ -59,7 +55,7 @@ impl<'a> Initializing<'a> {
         // Initial discovery and refresh sync peers delay period, when a configured connection needs preference,
         // usually needed for the wallet to connect to its own base node first.
-        if let Some(delay) = self.initial_peer_sync_delay {
+        if let Some(delay) = self.context.config.network_discovery.initial_peer_sync_delay {
             tokio::time::sleep(delay).await;
             debug!(target: LOG_TARGET, "Discovery starting after a delay of {:.0?}", delay);
         }
diff --git a/comms/dht/src/network_discovery/state_machine.rs b/comms/dht/src/network_discovery/state_machine.rs
index e41d1c2018..78281d56b9 100644
--- a/comms/dht/src/network_discovery/state_machine.rs
+++ b/comms/dht/src/network_discovery/state_machine.rs
@@ -196,12 +196,7 @@ impl DhtNetworkDiscovery {
     async fn get_next_event(&mut self, state: &mut State) -> StateEvent {
         use State::{Discovering, Initializing, OnConnect, Ready, Waiting};
         match state {
-            Initializing => {
-                let initial_peer_sync_delay = self.config().network_discovery.initial_peer_sync_delay;
-                self::Initializing::new(&mut self.context, initial_peer_sync_delay)
-                    .next_event()
-                    .await
-            },
+            Initializing => self::Initializing::new(&mut self.context).next_event().await,
             Ready(ready) => ready.next_event().await,
             Discovering(discovering) => discovering.next_event().await,
             OnConnect(on_connect) => on_connect.next_event().await,
diff --git a/comms/dht/src/peer_validator.rs b/comms/dht/src/peer_validator.rs
index 770915a578..375a5cbe8d 100644
--- a/comms/dht/src/peer_validator.rs
+++ b/comms/dht/src/peer_validator.rs
@@ -79,11 +79,11 @@ impl<'a> PeerValidator<'a> {
         if let Some(existing) = &existing_peer {
             if existing.public_key != new_peer.public_key {
                 return Err(DhtPeerValidatorError::NewAndExistingMismatch {
-                    existing: format!("'{}' / '{}'", existing.node_id.to_hex(), existing.public_key.to_hex()),
+                    existing: format!("BUG: '{}' / '{}'", existing.node_id, existing.public_key),
                     new: format!(
-                        "'{}' / '{}'",
+                        "BUG: '{}' / '{}'",
                         NodeId::from_public_key(&new_peer.public_key),
-                        new_peer.public_key.to_hex()
+                        new_peer.public_key
                     ),
                 });
             }
@@ -92,12 +92,11 @@ impl<'a> PeerValidator<'a> {
 
         let most_recent_claim = find_most_recent_claim(&new_peer.claims).expect("new_peer.claims is not empty");
         let node_id = NodeId::from_public_key(&new_peer.public_key);
-        let node_id_hex = node_id.to_hex();
 
         let mut peer = existing_peer.unwrap_or_else(|| {
             Peer::new(
                 new_peer.public_key.clone(),
-                node_id,
+                node_id.clone(),
                 MultiaddressesWithStats::default(),
                 PeerFlags::default(),
                 most_recent_claim.features,
@@ -118,7 +117,7 @@ impl<'a> PeerValidator<'a> {
             trace!(
                 target: LOG_TARGET,
                 "Peer '{}' / '{}' added with address(es) from claim: {:?}",
-                node_id_hex,
+                node_id,
                 new_peer.public_key.to_hex(),
                 claim.addresses
             );
diff --git a/comms/dht/src/rpc/service.rs b/comms/dht/src/rpc/service.rs
index 380d15e1da..e0ac04f21d 100644
--- a/comms/dht/src/rpc/service.rs
+++ b/comms/dht/src/rpc/service.rs
@@ -29,7 +29,7 @@ use tari_comms::{
     utils,
     PeerManager,
 };
-use tari_utilities::{hex::Hex, ByteArray};
+use tari_utilities::ByteArray;
 use tokio::{sync::mpsc, task};
 
 use crate::{
@@ -68,27 +68,9 @@ impl DhtRpcServiceImpl {
         let iter = peers
             .into_iter()
             .filter_map(|peer| {
-                let mut peer_info =
+                let peer_info =
                     UnvalidatedPeerInfo::from_peer_limited_claims(peer, max_claims, max_addresses_per_claim);
 
-                // Filter out all identity claims with invalid signatures
-                let count = peer_info.claims.len();
-                let peer_public_key = peer_info.public_key.clone();
-                peer_info.claims.retain(|claim| {
-                    claim
-                        .signature
-                        .is_valid(&peer_public_key, claim.features, claim.addresses.as_slice())
-                });
-                if count != peer_info.claims.len() {
-                    warn!(
-                        target: LOG_TARGET,
-                        "Peer `{}` provided {} claims but only {} were valid",
valid", - peer_info.public_key.to_hex(), - count, - peer_info.claims.len() - ); - } - if peer_info.claims.is_empty() { None } else { From 088fb49cc7d96d634ae46525a96c57e076abc29b Mon Sep 17 00:00:00 2001 From: Hansie Odendaal Date: Fri, 19 Apr 2024 15:51:53 +0200 Subject: [PATCH 4/5] fix unit test --- comms/core/src/net_address/mutliaddresses_with_stats.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/comms/core/src/net_address/mutliaddresses_with_stats.rs b/comms/core/src/net_address/mutliaddresses_with_stats.rs index 12bfdeb8d4..8b9627e00c 100644 --- a/comms/core/src/net_address/mutliaddresses_with_stats.rs +++ b/comms/core/src/net_address/mutliaddresses_with_stats.rs @@ -371,7 +371,7 @@ mod test { .mark_last_attempted_now(); assert_eq!( net_addresses.find_address_mut(&net_address1).unwrap().quality_score(), - Some(200) + Some(1000) ); let address_12: MultiaddressesWithStats = MultiaddressesWithStats::from_addresses_with_source( vec![net_address12.clone()], From 1ffbf97865c73b21182f2d6c9addd5d1985188e8 Mon Sep 17 00:00:00 2001 From: Hansie Odendaal Date: Fri, 19 Apr 2024 16:19:47 +0200 Subject: [PATCH 5/5] fix unit test --- base_layer/p2p/tests/services/liveness.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/base_layer/p2p/tests/services/liveness.rs b/base_layer/p2p/tests/services/liveness.rs index 7a88eca350..432b3ae68b 100644 --- a/base_layer/p2p/tests/services/liveness.rs +++ b/base_layer/p2p/tests/services/liveness.rs @@ -54,6 +54,7 @@ pub async fn setup_liveness_service( let handles = StackBuilder::new(comms.shutdown_signal()) .add_initializer(RegisterHandle::new(dht.clone())) .add_initializer(RegisterHandle::new(comms.connectivity())) + .add_initializer(RegisterHandle::new(comms.peer_manager())) .add_initializer(LivenessInitializer::new( Default::default(), Arc::clone(&subscription_factory),