From 39604cc5b4fd2a0aa1e5a789c3375e7e5f287dde Mon Sep 17 00:00:00 2001 From: Hansie Odendaal Date: Mon, 22 Apr 2024 12:54:02 +0200 Subject: [PATCH] Limit wallet peer connections Added functionality to limit the number of base node peer connections that a wallet can have, based on a config setting. The furthest nodes will be disconnected, but nodes on the allow list (e.g. connected base node) will be ignored. --- .../src/grpc/wallet_grpc_server.rs | 2 +- .../src/ui/state/wallet_event_monitor.rs | 2 +- .../src/commands/command/dial_peer.rs | 2 +- base_layer/chat_ffi/src/byte_vector.rs | 2 +- .../contacts/src/contacts_service/service.rs | 2 +- .../base_node/sync/block_sync/synchronizer.rs | 2 +- .../sync/header_sync/synchronizer.rs | 2 +- .../sync/horizon_state_sync/synchronizer.rs | 2 +- .../src/chain_storage/blockchain_database.rs | 2 +- .../core/src/chain_storage/lmdb_db/lmdb_db.rs | 4 +- .../priority/prioritized_transaction.rs | 2 +- .../core/src/transactions/crypto_factories.rs | 2 +- .../transaction_components/encrypted_data.rs | 2 +- .../transaction_protocol/sender.rs | 2 +- base_layer/core/tests/tests/node_service.rs | 2 +- base_layer/mmr/src/backend.rs | 2 +- .../mmr/src/sparse_merkle_tree/proofs.rs | 2 +- base_layer/p2p/src/initialization.rs | 5 + .../p2p/src/services/liveness/service.rs | 3 +- base_layer/p2p/src/transport.rs | 2 +- .../src/connectivity_service/service.rs | 5 +- .../wallet/src/connectivity_service/test.rs | 3 +- .../src/output_manager_service/service.rs | 4 +- .../utxo_scanner_service/utxo_scanner_task.rs | 5 +- .../transaction_service_tests/service.rs | 8 +- base_layer/wallet_ffi/src/lib.rs | 37 ++- common/config/presets/c_base_node_c.toml | 6 +- common/config/presets/d_console_wallet.toml | 16 +- comms/core/examples/stress/service.rs | 2 +- comms/core/src/builder/mod.rs | 14 + comms/core/src/builder/tests.rs | 4 +- comms/core/src/connection_manager/dialer.rs | 31 +- comms/core/src/connection_manager/error.rs | 4 +- 
comms/core/src/connection_manager/listener.rs | 1 + comms/core/src/connection_manager/manager.rs | 18 +- .../src/connection_manager/peer_connection.rs | 39 ++- .../core/src/connection_manager/requester.rs | 16 +- .../tests/listener_dialer.rs | 7 +- .../src/connection_manager/tests/manager.rs | 18 +- comms/core/src/connectivity/config.rs | 4 + .../core/src/connectivity/connection_pool.rs | 19 +- comms/core/src/connectivity/manager.rs | 117 ++++++-- comms/core/src/connectivity/requester.rs | 21 +- comms/core/src/connectivity/test.rs | 21 +- comms/core/src/lib.rs | 7 + .../src/net_address/multiaddr_with_stats.rs | 1 + .../net_address/mutliaddresses_with_stats.rs | 6 + comms/core/src/peer_manager/peer.rs | 5 + comms/core/src/peer_manager/peer_query.rs | 20 +- comms/core/src/protocol/messaging/outbound.rs | 9 +- comms/core/src/protocol/messaging/protocol.rs | 3 + comms/core/src/protocol/rpc/client/tests.rs | 7 +- comms/core/src/protocol/rpc/context.rs | 13 +- comms/core/src/protocol/rpc/server/mock.rs | 2 +- .../protocol/rpc/test/comms_integration.rs | 2 +- .../test_utils/mocks/connection_manager.rs | 4 +- .../test_utils/mocks/connectivity_manager.rs | 2 +- .../src/test_utils/mocks/peer_connection.rs | 5 +- comms/core/tests/tests/rpc.rs | 2 +- comms/core/tests/tests/rpc_stress.rs | 2 +- comms/core/tests/tests/substream_stress.rs | 2 +- comms/dht/examples/memory_net/utilities.rs | 9 +- comms/dht/src/actor.rs | 4 +- comms/dht/src/config.rs | 4 + comms/dht/src/connectivity/mod.rs | 284 +++++++++++++----- comms/dht/src/connectivity/test.rs | 10 +- .../dht/src/network_discovery/discovering.rs | 8 +- comms/dht/src/network_discovery/ready.rs | 34 ++- .../src/network_discovery/state_machine.rs | 13 +- comms/dht/src/store_forward/service.rs | 2 +- comms/dht/tests/dht.rs | 16 +- 71 files changed, 694 insertions(+), 248 deletions(-) diff --git a/applications/minotari_console_wallet/src/grpc/wallet_grpc_server.rs b/applications/minotari_console_wallet/src/grpc/wallet_grpc_server.rs 
index f74f7936642..563bb004e78 100644 --- a/applications/minotari_console_wallet/src/grpc/wallet_grpc_server.rs +++ b/applications/minotari_console_wallet/src/grpc/wallet_grpc_server.rs @@ -160,7 +160,7 @@ impl WalletGrpcServer { fn get_consensus_constants(&self) -> Result<&ConsensusConstants, WalletStorageError> { // If we don't have the chain metadata, we hope that VNReg consensus constants did not change - worst case, we - // spend more than we need to or the the transaction is rejected. + // spend more than we need to or the transaction is rejected. let height = self .wallet .db diff --git a/applications/minotari_console_wallet/src/ui/state/wallet_event_monitor.rs b/applications/minotari_console_wallet/src/ui/state/wallet_event_monitor.rs index 768d9c3b17a..91cdca75a82 100644 --- a/applications/minotari_console_wallet/src/ui/state/wallet_event_monitor.rs +++ b/applications/minotari_console_wallet/src/ui/state/wallet_event_monitor.rs @@ -200,7 +200,7 @@ impl WalletEventMonitor { ); match msg { ConnectivityEvent::PeerConnected(_) | - ConnectivityEvent::PeerDisconnected(_) => { + ConnectivityEvent::PeerDisconnected(..) 
=> { self.trigger_peer_state_refresh().await; }, // Only the above variants trigger state refresh diff --git a/applications/minotari_node/src/commands/command/dial_peer.rs b/applications/minotari_node/src/commands/command/dial_peer.rs index a37feb0bf8b..699cd3f53f9 100644 --- a/applications/minotari_node/src/commands/command/dial_peer.rs +++ b/applications/minotari_node/src/commands/command/dial_peer.rs @@ -53,7 +53,7 @@ impl CommandContext { let start = Instant::now(); println!("☎️ Dialing peer..."); - match connectivity.dial_peer(dest_node_id).await { + match connectivity.dial_peer(dest_node_id, false).await { Ok(connection) => { println!("⚡️ Peer connected in {}ms!", start.elapsed().as_millis()); println!("Connection: {}", connection); diff --git a/base_layer/chat_ffi/src/byte_vector.rs b/base_layer/chat_ffi/src/byte_vector.rs index 233840c66d2..cc666adbb58 100644 --- a/base_layer/chat_ffi/src/byte_vector.rs +++ b/base_layer/chat_ffi/src/byte_vector.rs @@ -100,7 +100,7 @@ pub unsafe extern "C" fn chat_byte_vector_destroy(bytes: *mut ChatByteVector) { /// /// # Safety /// None -// converting between here is fine as its used to clamp the the array to length +// converting between here is fine as its used to clamp the array to length #[allow(clippy::cast_possible_wrap)] #[no_mangle] pub unsafe extern "C" fn chat_byte_vector_get_at( diff --git a/base_layer/contacts/src/contacts_service/service.rs b/base_layer/contacts/src/contacts_service/service.rs index c09e13fa322..47acbbf4ea2 100644 --- a/base_layer/contacts/src/contacts_service/service.rs +++ b/base_layer/contacts/src/contacts_service/service.rs @@ -564,7 +564,7 @@ where T: ContactsBackend + 'static fn handle_connectivity_event(&mut self, event: ConnectivityEvent) { use ConnectivityEvent::{PeerBanned, PeerDisconnected}; match event { - PeerDisconnected(node_id) | PeerBanned(node_id) => { + PeerDisconnected(node_id, _) | PeerBanned(node_id) => { if let Some(pos) = self.liveness_data.iter().position(|p| 
*p.node_id() == node_id) { debug!( target: LOG_TARGET, diff --git a/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs b/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs index 796337cffa5..98941b0b316 100644 --- a/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs +++ b/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs @@ -217,7 +217,7 @@ impl<'a, B: BlockchainBackend + 'static> BlockSynchronizer<'a, B> { } async fn connect_to_sync_peer(&self, peer: NodeId) -> Result { - let connection = self.connectivity.dial_peer(peer).await?; + let connection = self.connectivity.dial_peer(peer, false).await?; Ok(connection) } diff --git a/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs b/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs index 12514a63bfa..826b4799470 100644 --- a/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs +++ b/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs @@ -230,7 +230,7 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> { async fn dial_sync_peer(&self, node_id: &NodeId) -> Result { let timer = Instant::now(); debug!(target: LOG_TARGET, "Dialing {} sync peer", node_id); - let conn = self.connectivity.dial_peer(node_id.clone()).await?; + let conn = self.connectivity.dial_peer(node_id.clone(), false).await?; info!( target: LOG_TARGET, "Successfully dialed sync peer {} in {:.2?}", diff --git a/base_layer/core/src/base_node/sync/horizon_state_sync/synchronizer.rs b/base_layer/core/src/base_node/sync/horizon_state_sync/synchronizer.rs index 6e126426394..a780364f15a 100644 --- a/base_layer/core/src/base_node/sync/horizon_state_sync/synchronizer.rs +++ b/base_layer/core/src/base_node/sync/horizon_state_sync/synchronizer.rs @@ -276,7 +276,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { async fn dial_sync_peer(&self, node_id: &NodeId) -> Result { let timer = Instant::now(); debug!(target: LOG_TARGET, 
"Dialing {} sync peer", node_id); - let conn = self.connectivity.dial_peer(node_id.clone()).await?; + let conn = self.connectivity.dial_peer(node_id.clone(), false).await?; info!( target: LOG_TARGET, "Successfully dialed sync peer {} in {:.2?}", diff --git a/base_layer/core/src/chain_storage/blockchain_database.rs b/base_layer/core/src/chain_storage/blockchain_database.rs index 3439e072d1b..f869ce5a771 100644 --- a/base_layer/core/src/chain_storage/blockchain_database.rs +++ b/base_layer/core/src/chain_storage/blockchain_database.rs @@ -2407,7 +2407,7 @@ fn get_previous_timestamps( Ok(timestamps) } -/// Gets all blocks ordered from the the block that connects (via prev_hash) to the main chain, to the orphan tip. +/// Gets all blocks ordered from the block that connects (via prev_hash) to the main chain, to the orphan tip. #[allow(clippy::ptr_arg)] fn get_orphan_link_main_chain( db: &T, diff --git a/base_layer/core/src/chain_storage/lmdb_db/lmdb_db.rs b/base_layer/core/src/chain_storage/lmdb_db/lmdb_db.rs index 6f8e4bd1596..0db74da2fed 100644 --- a/base_layer/core/src/chain_storage/lmdb_db/lmdb_db.rs +++ b/base_layer/core/src/chain_storage/lmdb_db/lmdb_db.rs @@ -2530,9 +2530,9 @@ impl BlockchainBackend for LMDBDatabase { } trace!( target: LOG_TARGET, - "Finished calculating new smt (size: {}), took: #{}s", + "Finished calculating new smt (size: {}), took: {:.2?}", smt.size(), - start.elapsed().as_millis() + start.elapsed() ); Ok(smt) } diff --git a/base_layer/core/src/mempool/priority/prioritized_transaction.rs b/base_layer/core/src/mempool/priority/prioritized_transaction.rs index 0c78db88b73..e3fb1664eb5 100644 --- a/base_layer/core/src/mempool/priority/prioritized_transaction.rs +++ b/base_layer/core/src/mempool/priority/prioritized_transaction.rs @@ -35,7 +35,7 @@ use crate::transactions::{ }; /// Create a unique unspent transaction priority based on the transaction fee, maturity of the oldest input UTXO and the -/// excess_sig. 
The excess_sig is included to ensure the the priority key unique so it can be used with a BTreeMap. +/// excess_sig. The excess_sig is included to ensure the priority key unique so it can be used with a BTreeMap. /// Normally, duplicate keys will be overwritten in a BTreeMap. #[derive(PartialEq, Eq, PartialOrd, Ord, Debug, Clone)] pub struct FeePriority(Vec); diff --git a/base_layer/core/src/transactions/crypto_factories.rs b/base_layer/core/src/transactions/crypto_factories.rs index c2cc0f98411..7b1e560f508 100644 --- a/base_layer/core/src/transactions/crypto_factories.rs +++ b/base_layer/core/src/transactions/crypto_factories.rs @@ -31,7 +31,7 @@ impl CryptoFactories { /// /// ## Parameters /// - /// * `max_proof_range`: Sets the the maximum value in range proofs, where `max = 2^max_proof_range` + /// * `max_proof_range`: Sets the maximum value in range proofs, where `max = 2^max_proof_range` pub fn new(max_proof_range: usize) -> Self { Self { commitment: Arc::new(CommitmentFactory::default()), diff --git a/base_layer/core/src/transactions/transaction_components/encrypted_data.rs b/base_layer/core/src/transactions/transaction_components/encrypted_data.rs index 273fd14740e..f3df4a9ef29 100644 --- a/base_layer/core/src/transactions/transaction_components/encrypted_data.rs +++ b/base_layer/core/src/transactions/transaction_components/encrypted_data.rs @@ -23,7 +23,7 @@ // Portions of this file were originally copyrighted (c) 2018 The Grin Developers, issued under the Apache License, // Version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0. -//! Encrypted data using the the extended-nonce variant XChaCha20-Poly1305 encryption with secure random nonce. +//! Encrypted data using the extended-nonce variant XChaCha20-Poly1305 encryption with secure random nonce. 
use std::mem::size_of; diff --git a/base_layer/core/src/transactions/transaction_protocol/sender.rs b/base_layer/core/src/transactions/transaction_protocol/sender.rs index 2e6ae882949..01a15de0435 100644 --- a/base_layer/core/src/transactions/transaction_protocol/sender.rs +++ b/base_layer/core/src/transactions/transaction_protocol/sender.rs @@ -472,7 +472,7 @@ impl SenderTransactionProtocol { Ok((public_nonce, public_excess)) } - /// Add partial signatures, add the the recipient info to sender state and move to the Finalizing state + /// Add partial signatures, add the recipient info to sender state and move to the Finalizing state pub async fn add_single_recipient_info( &mut self, mut rec: RecipientSignedMessage, diff --git a/base_layer/core/tests/tests/node_service.rs b/base_layer/core/tests/tests/node_service.rs index cc36517d531..73a9557f373 100644 --- a/base_layer/core/tests/tests/node_service.rs +++ b/base_layer/core/tests/tests/node_service.rs @@ -420,7 +420,7 @@ async fn propagate_and_forward_invalid_block() { alice_node .comms .connectivity() - .dial_peer(bob_node.node_identity.node_id().clone()) + .dial_peer(bob_node.node_identity.node_id().clone(), false) .await .unwrap(); wait_until_online(&[&alice_node, &bob_node, &carol_node, &dan_node]).await; diff --git a/base_layer/mmr/src/backend.rs b/base_layer/mmr/src/backend.rs index 69235daf012..fe5943420b6 100644 --- a/base_layer/mmr/src/backend.rs +++ b/base_layer/mmr/src/backend.rs @@ -41,7 +41,7 @@ pub trait ArrayLike { /// Return the item at the given index fn get(&self, index: usize) -> Result, Self::Error>; - /// Remove all stored items from the the backend. + /// Remove all stored items from the backend. fn clear(&mut self) -> Result<(), Self::Error>; /// Finds the index of the specified stored item, it will return None if the object could not be found. 
diff --git a/base_layer/mmr/src/sparse_merkle_tree/proofs.rs b/base_layer/mmr/src/sparse_merkle_tree/proofs.rs index cf7b3405fe5..0cd10a9c04b 100644 --- a/base_layer/mmr/src/sparse_merkle_tree/proofs.rs +++ b/base_layer/mmr/src/sparse_merkle_tree/proofs.rs @@ -98,7 +98,7 @@ pub struct InclusionProof { /// ``` pub struct ExclusionProof { siblings: Vec, - // The terminal node of the tree proof, or `None` if the the node is `Empty`. + // The terminal node of the tree proof, or `None` if the node is `Empty`. leaf: Option>, phantom: std::marker::PhantomData, } diff --git a/base_layer/p2p/src/initialization.rs b/base_layer/p2p/src/initialization.rs index 5a67de59ae8..75465d69d68 100644 --- a/base_layer/p2p/src/initialization.rs +++ b/base_layer/p2p/src/initialization.rs @@ -562,6 +562,11 @@ impl ServiceInitializer for P2pInitializer { network_byte: self.network.as_byte(), user_agent: self.user_agent.clone(), }) + .with_minimize_connections(if self.config.dht.minimize_connections { + Some(self.config.dht.num_neighbouring_nodes + self.config.dht.num_random_nodes) + } else { + None + }) .set_self_liveness_check(config.listener_self_liveness_check_interval); if config.allow_test_addresses || config.dht.peer_validator_config.allow_test_addresses { diff --git a/base_layer/p2p/src/services/liveness/service.rs b/base_layer/p2p/src/services/liveness/service.rs index dc4797b60ae..033b0d0bb09 100644 --- a/base_layer/p2p/src/services/liveness/service.rs +++ b/base_layer/p2p/src/services/liveness/service.rs @@ -28,6 +28,7 @@ use tari_comms::{ connectivity::{ConnectivityRequester, ConnectivitySelection}, peer_manager::NodeId, types::CommsPublicKey, + Minimized, PeerManager, }; use tari_comms_dht::{ @@ -360,7 +361,7 @@ where target: LOG_TARGET, "Disconnecting peer {} that failed {} rounds of pings", node_id, max_allowed_ping_failures ); - conn.disconnect().await?; + conn.disconnect(Minimized::No).await?; } } self.state.clear_failed_pings(); diff --git a/base_layer/p2p/src/transport.rs 
b/base_layer/p2p/src/transport.rs index a220fa9d0eb..939a96329e9 100644 --- a/base_layer/p2p/src/transport.rs +++ b/base_layer/p2p/src/transport.rs @@ -147,7 +147,7 @@ pub struct TorTransportConfig { /// When set to true, outbound TCP connections bypass the tor proxy. Defaults to false for better privacy, setting /// to true may improve network performance for TCP nodes. pub proxy_bypass_for_outbound_tcp: bool, - /// If set, instructs tor to forward traffic the the provided address. Otherwise, an OS-assigned port on 127.0.0.1 + /// If set, instructs tor to forward traffic the provided address. Otherwise, an OS-assigned port on 127.0.0.1 /// is used. pub forward_address: Option, /// If set, the listener will bind to this address instead of the forward_address. diff --git a/base_layer/wallet/src/connectivity_service/service.rs b/base_layer/wallet/src/connectivity_service/service.rs index bb845a53f95..1b740194b83 100644 --- a/base_layer/wallet/src/connectivity_service/service.rs +++ b/base_layer/wallet/src/connectivity_service/service.rs @@ -27,6 +27,7 @@ use tari_comms::{ connectivity::{ConnectivityError, ConnectivityRequester}, peer_manager::{NodeId, Peer}, protocol::rpc::{RpcClientLease, RpcClientPool}, + Minimized, PeerConnection, }; use tari_core::base_node::{rpc::BaseNodeWalletRpcClient, sync::rpc::BaseNodeSyncRpcClient}; @@ -225,7 +226,7 @@ impl WalletConnectivityService { async fn disconnect_base_node(&mut self, node_id: NodeId) { if let Ok(Some(mut connection)) = self.connectivity.get_connection(node_id.clone()).await { - match connection.disconnect().await { + match connection.disconnect(Minimized::No).await { Ok(_) => debug!(target: LOG_TARGET, "Disconnected base node peer {}", node_id), Err(e) => error!(target: LOG_TARGET, "Failed to disconnect base node: {}", e), } @@ -321,7 +322,7 @@ impl WalletConnectivityService { _ = self.base_node_watch.changed() => { Ok(None) } - result = self.connectivity.dial_peer(peer) => { + result = 
self.connectivity.dial_peer(peer, false) => { Ok(Some(result?)) } } diff --git a/base_layer/wallet/src/connectivity_service/test.rs b/base_layer/wallet/src/connectivity_service/test.rs index ed4c0218354..a0da32e40dd 100644 --- a/base_layer/wallet/src/connectivity_service/test.rs +++ b/base_layer/wallet/src/connectivity_service/test.rs @@ -34,6 +34,7 @@ use tari_comms::{ mocks::{create_connectivity_mock, ConnectivityManagerMockState}, node_identity::build_node_identity, }, + Minimized, }; use tari_shutdown::Shutdown; use tari_test_utils::runtime::spawn_until_shutdown; @@ -177,7 +178,7 @@ async fn it_gracefully_handles_connect_fail_reconnect() { mock_state.add_active_connection(conn.clone()).await; // Empty out all the calls let _result = mock_state.take_calls().await; - conn.disconnect().await.unwrap(); + conn.disconnect(Minimized::No).await.unwrap(); let barrier = Arc::new(Barrier::new(2)); let pending_request = task::spawn({ diff --git a/base_layer/wallet/src/output_manager_service/service.rs b/base_layer/wallet/src/output_manager_service/service.rs index 98ec0f2e206..a24deb7df62 100644 --- a/base_layer/wallet/src/output_manager_service/service.rs +++ b/base_layer/wallet/src/output_manager_service/service.rs @@ -1293,7 +1293,7 @@ where let uo_len = uo.len(); trace!( target: LOG_TARGET, - "select_utxos profile - fetch_unspent_outputs_for_spending: {} outputs, {} ms (at {})", + "select_utxos profile - fetch_unspent_outputs_for_spending: {} outputs, {} ms (at {} ms)", uo_len, start_new.elapsed().as_millis(), start.elapsed().as_millis(), @@ -1362,7 +1362,7 @@ where let enough_spendable = utxos_total_value > amount + fee_with_change; trace!( target: LOG_TARGET, - "select_utxos profile - final_selection: {} outputs from {}, {} ms (at {})", + "select_utxos profile - final_selection: {} outputs from {}, {} ms (at {} ms)", utxos.len(), uo_len, start_new.elapsed().as_millis(), diff --git a/base_layer/wallet/src/utxo_scanner_service/utxo_scanner_task.rs 
b/base_layer/wallet/src/utxo_scanner_service/utxo_scanner_task.rs index ebd3e6f5442..69c829d913b 100644 --- a/base_layer/wallet/src/utxo_scanner_service/utxo_scanner_task.rs +++ b/base_layer/wallet/src/utxo_scanner_service/utxo_scanner_task.rs @@ -38,6 +38,7 @@ use tari_comms::{ protocol::rpc::RpcClientLease, traits::OrOptional, types::CommsPublicKey, + Minimized, PeerConnection, }; use tari_core::{ @@ -182,7 +183,7 @@ where target: LOG_TARGET, "Attempting UTXO sync with seed peer {} ({})", self.peer_index, peer, ); - match self.resources.comms_connectivity.dial_peer(peer.clone()).await { + match self.resources.comms_connectivity.dial_peer(peer.clone(), true).await { Ok(conn) => Ok(conn), Err(e) => { self.publish_event(UtxoScannerEvent::ConnectionFailedToBaseNode { @@ -193,7 +194,7 @@ where }); if let Ok(Some(connection)) = self.resources.comms_connectivity.get_connection(peer.clone()).await { - if connection.clone().disconnect().await.is_ok() { + if connection.clone().disconnect(Minimized::No).await.is_ok() { debug!(target: LOG_TARGET, "Disconnected base node peer {}", peer); } } diff --git a/base_layer/wallet/tests/transaction_service_tests/service.rs b/base_layer/wallet/tests/transaction_service_tests/service.rs index 5a258728946..1a8d9765e84 100644 --- a/base_layer/wallet/tests/transaction_service_tests/service.rs +++ b/base_layer/wallet/tests/transaction_service_tests/service.rs @@ -578,7 +578,7 @@ async fn manage_single_transaction() { let _peer_connection = bob_comms .connectivity() - .dial_peer(alice_node_identity.node_id().clone()) + .dial_peer(alice_node_identity.node_id().clone(), false) .await .unwrap(); @@ -742,7 +742,7 @@ async fn large_interactive_transaction() { // Verify that Alice and Bob are connected let _peer_connection = bob_comms .connectivity() - .dial_peer(alice_node_identity.node_id().clone()) + .dial_peer(alice_node_identity.node_id().clone(), false) .await .unwrap(); @@ -2092,7 +2092,7 @@ async fn manage_multiple_transactions() { let 
_peer_connection = bob_comms .connectivity() - .dial_peer(alice_node_identity.node_id().clone()) + .dial_peer(alice_node_identity.node_id().clone(), false) .await .unwrap(); sleep(Duration::from_secs(3)).await; @@ -2100,7 +2100,7 @@ async fn manage_multiple_transactions() { // Connect alice to carol let _peer_connection = alice_comms .connectivity() - .dial_peer(carol_node_identity.node_id().clone()) + .dial_peer(carol_node_identity.node_id().clone(), false) .await .unwrap(); diff --git a/base_layer/wallet_ffi/src/lib.rs b/base_layer/wallet_ffi/src/lib.rs index c8f79f34520..70a1d924d30 100644 --- a/base_layer/wallet_ffi/src/lib.rs +++ b/base_layer/wallet_ffi/src/lib.rs @@ -27,7 +27,7 @@ //! becoming a `CompletedTransaction` with the `Completed` status. This means that the transaction has been //! negotiated between the parties and is now ready to be broadcast to the Base Layer. The funds are still encumbered //! as pending because the transaction has not been mined yet. -//! 3. The finalized `CompletedTransaction` will be sent back to the the receiver so that they have a copy. +//! 3. The finalized `CompletedTransaction` will be sent back to the receiver so that they have a copy. //! 4. The wallet will broadcast the `CompletedTransaction` to a Base Node to be added to the mempool. Its status will //! move from `Completed` to `Broadcast`. //! 5. Wait until the transaction is mined. 
The `CompleteTransaction` status will then move from `Broadcast` to `Mined` @@ -131,7 +131,13 @@ use tari_comms::{ transports::MemoryTransport, types::CommsPublicKey, }; -use tari_comms_dht::{store_forward::SafConfig, DbConnectionUrl, DhtConfig, NetworkDiscoveryConfig}; +use tari_comms_dht::{ + store_forward::SafConfig, + DbConnectionUrl, + DhtConfig, + DhtConnectivityConfig, + NetworkDiscoveryConfig, +}; use tari_contacts::contacts_service::{handle::ContactsServiceHandle, types::Contact}; use tari_core::{ borsh::FromBytes, @@ -818,7 +824,7 @@ pub unsafe extern "C" fn byte_vector_destroy(bytes: *mut ByteVector) { /// /// # Safety /// None -// converting between here is fine as its used to clamp the the array to length +// converting between here is fine as its used to clamp the array to length #[allow(clippy::cast_possible_wrap)] #[no_mangle] pub unsafe extern "C" fn byte_vector_get_at(ptr: *mut ByteVector, position: c_uint, error_out: *mut c_int) -> c_uchar { @@ -1778,7 +1784,7 @@ pub unsafe extern "C" fn unblinded_outputs_get_length( /// /// # Safety /// The ```contact_destroy``` method must be called when finished with a TariContact to prevent a memory leak -// converting between here is fine as its used to clamp the the array to length +// converting between here is fine as its used to clamp the array to length #[allow(clippy::cast_possible_wrap)] #[no_mangle] pub unsafe extern "C" fn unblinded_outputs_get_at( @@ -2884,7 +2890,7 @@ pub unsafe extern "C" fn contacts_get_length(contacts: *mut TariContacts, error_ /// /// # Safety /// The ```contact_destroy``` method must be called when finished with a TariContact to prevent a memory leak -// converting between here is fine as its used to clamp the the array to length +// converting between here is fine as its used to clamp the array to length #[allow(clippy::cast_possible_wrap)] #[no_mangle] pub unsafe extern "C" fn contacts_get_at( @@ -3185,7 +3191,7 @@ pub unsafe extern "C" fn completed_transactions_get_length( 
/// # Safety /// The ```completed_transaction_destroy``` method must be called when finished with a TariCompletedTransaction to /// prevent a memory leak -// converting between here is fine as its used to clamp the the array to length +// converting between here is fine as its used to clamp the array to length #[allow(clippy::cast_possible_wrap)] #[no_mangle] pub unsafe extern "C" fn completed_transactions_get_at( @@ -3278,7 +3284,7 @@ pub unsafe extern "C" fn pending_outbound_transactions_get_length( /// # Safety /// The ```pending_outbound_transaction_destroy``` method must be called when finished with a /// TariPendingOutboundTransaction to prevent a memory leak -// converting between here is fine as its used to clamp the the array to length +// converting between here is fine as its used to clamp the array to length #[allow(clippy::cast_possible_wrap)] #[no_mangle] pub unsafe extern "C" fn pending_outbound_transactions_get_at( @@ -3370,7 +3376,7 @@ pub unsafe extern "C" fn pending_inbound_transactions_get_length( /// # Safety /// The ```pending_inbound_transaction_destroy``` method must be called when finished with a /// TariPendingOutboundTransaction to prevent a memory leak -// converting between here is fine as its used to clamp the the array to length +// converting between here is fine as its used to clamp the array to length #[allow(clippy::cast_possible_wrap)] #[no_mangle] pub unsafe extern "C" fn pending_inbound_transactions_get_at( @@ -4851,6 +4857,9 @@ pub unsafe extern "C" fn comms_config_create( max_concurrent_inbound_tasks: 25, max_concurrent_outbound_tasks: 50, dht: DhtConfig { + num_neighbouring_nodes: 6, + num_random_nodes: 2, + minimize_connections: true, discovery_request_timeout: Duration::from_secs(discovery_timeout_in_secs), database_url: DbConnectionUrl::File(dht_database_path), auto_join: true, @@ -4861,9 +4870,15 @@ pub unsafe extern "C" fn comms_config_create( ..Default::default() }, network_discovery: NetworkDiscoveryConfig { + 
min_desired_peers: 16, initial_peer_sync_delay: Some(Duration::from_secs(25)), ..Default::default() }, + connectivity: DhtConnectivityConfig { + update_interval: Duration::from_secs(5 * 60), + minimum_desired_tcpv4_node_ratio: 0.0, + ..Default::default() + }, ..Default::default() }, allow_test_addresses: true, @@ -11541,7 +11556,7 @@ mod test { .block_on( alice_wallet_comms .connectivity() - .dial_peer(bob_node_identity.node_id().clone()), + .dial_peer(bob_node_identity.node_id().clone(), false), ) .is_ok(); } @@ -11550,7 +11565,7 @@ mod test { .block_on( bob_wallet_comms .connectivity() - .dial_peer(alice_node_identity.node_id().clone()), + .dial_peer(alice_node_identity.node_id().clone(), false), ) .is_ok(); } @@ -11619,7 +11634,7 @@ mod test { let bob_comms_dial_peer = bob_wallet_runtime.block_on( bob_wallet_comms .connectivity() - .dial_peer(alice_node_identity.node_id().clone()), + .dial_peer(alice_node_identity.node_id().clone(), false), ); if let Ok(mut connection_to_alice) = bob_comms_dial_peer { if bob_wallet_runtime diff --git a/common/config/presets/c_base_node_c.toml b/common/config/presets/c_base_node_c.toml index c086b27e990..c125d4e4cd5 100644 --- a/common/config/presets/c_base_node_c.toml +++ b/common/config/presets/c_base_node_c.toml @@ -189,7 +189,7 @@ listener_self_liveness_check_interval = 15 # When using the tor transport and set to true, outbound TCP connections bypass the tor proxy. Defaults to false for # better privacy #tor.proxy_bypass_for_outbound_tcp = false -# If set, instructs tor to forward traffic the the provided address. (e.g. "/dns4/my-base-node/tcp/32123") (default = OS-assigned port) +# If set, instructs tor to forward traffic the provided address. (e.g. "/dns4/my-base-node/tcp/32123") (default = OS-assigned port) #tor.forward_address = # If set, the listener will bind to this address instead of the forward_address. You need to make sure that this listener is connectable from the forward_address. 
#tor.listener_address_override = @@ -213,7 +213,9 @@ database_url = "data/base_node/dht.db" # The maximum number of peer nodes that a message has to be closer to, to be considered a neighbour. Default: 8 #num_neighbouring_nodes = 8 # Number of random peers to include. Default: 4 -#num_random_nodes= 4 +#num_random_nodes = 4 +# Connections above the configured number of neighbouring and random nodes will be removed (default: false) +#minimize_connections = false # Send to this many peers when using the broadcast strategy. Default: 8 #broadcast_factor = 8 # Send to this many peers when using the propagate strategy. Default: 4 diff --git a/common/config/presets/d_console_wallet.toml b/common/config/presets/d_console_wallet.toml index 7d076434bfc..6f2ae82e148 100644 --- a/common/config/presets/d_console_wallet.toml +++ b/common/config/presets/d_console_wallet.toml @@ -242,7 +242,7 @@ event_channel_size = 3500 # When using the tor transport and set to true, outbound TCP connections bypass the tor proxy. Defaults to false for # better privacy #tor.proxy_bypass_for_outbound_tcp = false -# If set, instructs tor to forward traffic the the provided address. (e.g. "/ip4/127.0.0.1/tcp/0") (default = ) +# If set, instructs tor to forward traffic to the provided address. (e.g. "/ip4/127.0.0.1/tcp/0") (default = ) #tor.forward_address = # Use a SOCKS5 proxy transport. This transport recognises any addresses supported by the proxy. @@ -262,9 +262,11 @@ database_url = "data/wallet/dht.db" # The size of the buffer (channel) which holds pending outbound message requests. Default: 20 #outbound_buffer_size = 20 # The maximum number of peer nodes that a message has to be closer to, to be considered a neighbour. Default: 8 -#num_neighbouring_nodes = 8 +num_neighbouring_nodes = 6 # Number of random peers to include.
Default: 4 -#num_random_nodes= 4 +num_random_nodes = 2 +# Connections above the configured number of neighbouring and random nodes will be removed (default: false) +minimize_connections = true # Send to this many peers when using the broadcast strategy. Default: 8 #broadcast_factor = 8 # Send to this many peers when using the propagate strategy. Default: 4 @@ -311,7 +313,7 @@ database_url = "data/wallet/dht.db" #join_cooldown_interval = 120 # 10 * 60 # The interval to update the neighbouring and random pools, if necessary. Default: 2 minutes -#connectivity.update_interval = 120 # 2 * 60 +connectivity.update_interval = 300 # 5 * 60 # The interval to change the random pool peers. Default = 2 hours #connectivity.random_pool_refresh_interval = 7_200 # 2 * 60 * 60 # Length of cooldown when high connection failure rates are encountered. Default: 45s @@ -319,13 +321,13 @@ database_url = "data/wallet/dht.db" # The minimum desired ratio of TCPv4 to Tor connections. TCPv4 addresses have some significant cost to create, # making sybil attacks costly. This setting does not guarantee this ratio is maintained. # Currently, it only emits a warning if the ratio is below this setting. Default: 0.1 (10%) -#connectivity.minimum_desired_tcpv4_node_ratio = 0.1 +connectivity.minimum_desired_tcpv4_node_ratio = 0.0 # True to enable network discovery, false to disable it. Default: true #network_discovery.enabled = true # A threshold for the minimum number of peers this node should ideally be aware of. If below this threshold a # more "aggressive" strategy is employed. Default: 50 -network_discovery.min_desired_peers = 8 +network_discovery.min_desired_peers = 16 # The period to wait once the number of rounds given by `idle_after_num_rounds` has completed. Default: 30 mins #network_discovery.idle_period = 1_800 # 30 * 60 # The minimum number of network discovery rounds to perform before idling (going to sleep).
If there are less @@ -339,7 +341,7 @@ network_discovery.min_desired_peers = 8 # The maximum number of peers we allow per round of sync. (Default: 500) #network_discovery.max_peers_to_sync_per_round = 500 # Initial refresh sync peers delay period, when a configured connection needs preference. (Default: Disabled) -network_discovery.initial_peer_sync_delay = 15 +network_discovery.initial_peer_sync_delay = 25 # Length of time to ban a peer if the peer misbehaves at the DHT-level. Default: 6 hrs #ban_duration = 21_600 # 6 * 60 * 60 diff --git a/comms/core/examples/stress/service.rs b/comms/core/examples/stress/service.rs index 2199638f4b0..0f6f84c536c 100644 --- a/comms/core/examples/stress/service.rs +++ b/comms/core/examples/stress/service.rs @@ -227,7 +227,7 @@ impl StressTestService { self.comms_node.peer_manager().add_peer(peer).await?; println!("Dialing peer `{}`...", node_id.short_str()); let start = Instant::now(); - let conn = self.comms_node.connectivity().dial_peer(node_id).await?; + let conn = self.comms_node.connectivity().dial_peer(node_id, false).await?; println!("Dial completed successfully in {:.2?}", start.elapsed()); let outbound_tx = self.outbound_tx.clone(); let inbound_rx = self.inbound_rx.clone(); diff --git a/comms/core/src/builder/mod.rs b/comms/core/src/builder/mod.rs index 26f4cc503af..43b78874b02 100644 --- a/comms/core/src/builder/mod.rs +++ b/comms/core/src/builder/mod.rs @@ -70,6 +70,7 @@ use crate::{ /// # #[tokio::main] /// # async fn main() { /// use std::env::temp_dir; +/// use tari_comms::connectivity::ConnectivityConfig; /// /// use tari_storage::{ /// lmdb_store::{LMDBBuilder, LMDBConfig}, @@ -126,6 +127,7 @@ pub struct CommsBuilder { connection_manager_config: ConnectionManagerConfig, connectivity_config: ConnectivityConfig, shutdown_signal: Option, + maintain_n_closest_connections_only: Option, } impl Default for CommsBuilder { @@ -139,6 +141,7 @@ impl Default for CommsBuilder { connection_manager_config: 
ConnectionManagerConfig::default(), connectivity_config: ConnectivityConfig::default(), shutdown_signal: None, + maintain_n_closest_connections_only: None, } } } @@ -292,6 +295,17 @@ impl CommsBuilder { self } + /// The closest number of peer connections to maintain; connections above the threshold will be removed + pub fn with_minimize_connections(mut self, connections: Option) -> Self { + self.maintain_n_closest_connections_only = connections; + self.connectivity_config.maintain_n_closest_connections_only = connections; + if let Some(val) = connections { + self.connectivity_config.reaper_min_connection_threshold = val; + } + self.connectivity_config.connection_pool_refresh_interval = Duration::from_secs(180); + self + } + fn make_peer_manager(&mut self) -> Result, CommsBuilderError> { let file_lock = self.peer_storage_file_lock.take(); diff --git a/comms/core/src/builder/tests.rs b/comms/core/src/builder/tests.rs index 02626c75e74..de4f49a3d8d 100644 --- a/comms/core/src/builder/tests.rs +++ b/comms/core/src/builder/tests.rs @@ -166,7 +166,7 @@ async fn peer_to_peer_custom_protocols() { let mut conn_man_events2 = comms_node2.subscribe_connection_manager_events(); let mut conn1 = conn_man_requester1 - .dial_peer(node_identity2.node_id().clone()) + .dial_peer(node_identity2.node_id().clone(), false) .await .unwrap(); @@ -347,7 +347,7 @@ async fn peer_to_peer_messaging_simultaneous() { comms_node1 .connectivity() - .dial_peer(comms_node2.node_identity().node_id().clone()) + .dial_peer(comms_node2.node_identity().node_id().clone(), false) .await .unwrap(); // Simultaneously send messages between the two nodes diff --git a/comms/core/src/connection_manager/dialer.rs b/comms/core/src/connection_manager/dialer.rs index 8226eab9d5e..1f70e51b128 100644 --- a/comms/core/src/connection_manager/dialer.rs +++ b/comms/core/src/connection_manager/dialer.rs @@ -81,6 +81,7 @@ pub(crate) enum DialerRequest { Dial( Box, Option>>, + bool, // ignore_saf ), CancelPendingDial(NodeId), 
NotifyNewInboundConnection(Box), @@ -175,8 +176,8 @@ where use DialerRequest::{CancelPendingDial, Dial, NotifyNewInboundConnection}; debug!(target: LOG_TARGET, "Connection dialer got request: {:?}", request); match request { - Dial(peer, reply_tx) => { - self.handle_dial_peer_request(pending_dials, peer, reply_tx); + Dial(peer, reply_tx, ignore_saf) => { + self.handle_dial_peer_request(pending_dials, peer, reply_tx, ignore_saf); }, CancelPendingDial(peer_id) => { self.cancel_dial(&peer_id); @@ -317,6 +318,7 @@ where pending_dials: &mut DialFuturesUnordered, peer: Box, reply_tx: Option>>, + ignore_saf: bool, ) { if self.is_pending_dial(&peer.node_id) { debug!( @@ -377,6 +379,7 @@ where supported_protocols, &config, cancel_signal, + ignore_saf, ) .await; @@ -427,6 +430,7 @@ where our_supported_protocols: Arc>, config: &ConnectionManagerConfig, cancel_signal: ShutdownSignal, + ignore_saf: bool, ) -> Result<(PeerConnection, ValidatedPeerIdentityExchange), ConnectionManagerError> { static CONNECTION_DIRECTION: ConnectionDirection = ConnectionDirection::Outbound; debug!( @@ -478,6 +482,7 @@ where conn_man_notifier, our_supported_protocols, peer_identity.metadata.supported_protocols.clone(), + ignore_saf, ); Ok((peer_connection, peer_identity)) @@ -491,7 +496,11 @@ where config: &ConnectionManagerConfig, ) -> (DialState, DialResult) { // Container for dial - let mut dial_state = Some(dial_state); + let mut dial_state = { + let mut temp_state = dial_state; + temp_state.peer_mut().addresses.reset_connection_attempts(); + Some(temp_state) + }; let mut transport = Some(transport); loop { @@ -539,7 +548,7 @@ where } } - /// Attempts to dial a peer sequentially on all addresses. + /// Attempts to dial a peer sequentially on all addresses; if connections are to be minimized only. 
/// Returns ownership of the given `DialState` and a success or failure result for the dial, /// or None if the dial was cancelled inflight async fn dial_peer( @@ -552,6 +561,18 @@ where Result<(NoiseSocket, Multiaddr), ConnectionManagerError>, ) { let addresses = dial_state.peer().addresses.clone().into_vec(); + if addresses.is_empty() { + let node_id_hex = dial_state.peer().node_id.clone().to_hex(); + debug!( + target: LOG_TARGET, + "No more contactable addresses for peer '{}'", + node_id_hex + ); + return ( + dial_state, + Err(ConnectionManagerError::NoContactableAddressesForPeer(node_id_hex)), + ); + } let cancel_signal = dial_state.get_cancel_signal(); for address in addresses { debug!( @@ -598,7 +619,7 @@ where let noise_upgrade_time = timer.elapsed(); debug!( - "Dial - upgraded noise: {} on address: {} on tcp after: {}", + "Dial - upgraded noise: {} on address: {} on tcp after: {} ms", node_id.short_str(), moved_address, timer.elapsed().as_millis() diff --git a/comms/core/src/connection_manager/error.rs b/comms/core/src/connection_manager/error.rs index 80cddff8275..7d2bb4af766 100644 --- a/comms/core/src/connection_manager/error.rs +++ b/comms/core/src/connection_manager/error.rs @@ -74,7 +74,7 @@ pub enum ConnectionManagerError { IdentityProtocolError(#[from] IdentityProtocolError), #[error("The dial was cancelled")] DialCancelled, - #[error("Invalid multiaddr: {0}")] + #[error("Invalid multiaddr")] InvalidMultiaddr(String), #[error("Failed to send wire format byte")] WireFormatSendFailed, @@ -82,6 +82,8 @@ pub enum ConnectionManagerError { ListenerOneshotCancelled, #[error("Peer validation error: {0}")] PeerValidationError(#[from] PeerValidatorError), + #[error("No contactable addresses for peer {0} left")] + NoContactableAddressesForPeer(String), } impl From for ConnectionManagerError { diff --git a/comms/core/src/connection_manager/listener.rs b/comms/core/src/connection_manager/listener.rs index 937b1d9f8df..6310111bb0c 100644 --- 
a/comms/core/src/connection_manager/listener.rs +++ b/comms/core/src/connection_manager/listener.rs @@ -415,6 +415,7 @@ where conn_man_notifier, our_supported_protocols, valid_peer_identity.metadata.supported_protocols, + false, ); peer_manager.add_peer(peer).await?; diff --git a/comms/core/src/connection_manager/manager.rs b/comms/core/src/connection_manager/manager.rs index 82b174b87e6..5937f312bcd 100644 --- a/comms/core/src/connection_manager/manager.rs +++ b/comms/core/src/connection_manager/manager.rs @@ -54,6 +54,7 @@ use crate::{ peer_validator::PeerValidatorConfig, protocol::{NodeNetworkInfo, ProtocolEvent, ProtocolId, Protocols}, transports::{TcpTransport, Transport}, + Minimized, PeerManager, }; @@ -67,7 +68,7 @@ const DIALER_REQUEST_CHANNEL_SIZE: usize = 32; pub enum ConnectionManagerEvent { // Peer connection PeerConnected(Box), - PeerDisconnected(ConnectionId, NodeId), + PeerDisconnected(ConnectionId, NodeId, Minimized), PeerConnectFailed(NodeId, ConnectionManagerError), PeerInboundConnectFailed(ConnectionManagerError), @@ -84,7 +85,9 @@ impl fmt::Display for ConnectionManagerEvent { use ConnectionManagerEvent::*; match self { PeerConnected(conn) => write!(f, "PeerConnected({})", conn), - PeerDisconnected(id, node_id) => write!(f, "PeerDisconnected({}, {})", id, node_id.short_str()), + PeerDisconnected(id, node_id, minimized) => { + write!(f, "PeerDisconnected({}, {}, {:?})", id, node_id.short_str(), minimized) + }, PeerConnectFailed(node_id, err) => write!(f, "PeerConnectFailed({}, {:?})", node_id.short_str(), err), PeerInboundConnectFailed(err) => write!(f, "PeerInboundConnectFailed({:?})", err), NewInboundSubstream(node_id, protocol, _) => write!( @@ -378,11 +381,15 @@ where use ConnectionManagerRequest::{CancelDial, DialPeer, NotifyListening}; trace!(target: LOG_TARGET, "Connection manager got request: {:?}", request); match request { - DialPeer { node_id, reply_tx } => { + DialPeer { + node_id, + reply_tx, + ignore_saf, + } => { let tracing_id = 
tracing::Span::current().id(); let span = span!(Level::TRACE, "connection_manager::handle_request"); span.follows_from(tracing_id); - self.dial_peer(node_id, reply_tx).instrument(span).await + self.dial_peer(node_id, reply_tx, ignore_saf).instrument(span).await }, CancelDial(node_id) => { if let Err(err) = self.dialer_tx.send(DialerRequest::CancelPendingDial(node_id)).await { @@ -493,10 +500,11 @@ where &mut self, node_id: NodeId, reply: Option>>, + ignore_saf: bool, ) { match self.peer_manager.find_by_node_id(&node_id).await { Ok(Some(peer)) => { - self.send_dialer_request(DialerRequest::Dial(Box::new(peer), reply)) + self.send_dialer_request(DialerRequest::Dial(Box::new(peer), reply, ignore_saf)) .await; }, Ok(None) => { diff --git a/comms/core/src/connection_manager/peer_connection.rs b/comms/core/src/connection_manager/peer_connection.rs index c8e86f2394d..915bdb25f58 100644 --- a/comms/core/src/connection_manager/peer_connection.rs +++ b/comms/core/src/connection_manager/peer_connection.rs @@ -58,6 +58,7 @@ use crate::{ peer_manager::{NodeId, PeerFeatures}, protocol::{ProtocolId, ProtocolNegotiation}, utils::atomic_ref_counter::AtomicRefCounter, + Minimized, }; const LOG_TARGET: &str = "comms::connection_manager::peer_connection"; @@ -75,6 +76,7 @@ pub fn create( event_notifier: mpsc::Sender, our_supported_protocols: Arc>, their_supported_protocols: Vec, + ignore_saf: bool, ) -> PeerConnection { trace!( target: LOG_TARGET, @@ -93,6 +95,7 @@ pub fn create( peer_addr, direction, substream_counter, + ignore_saf, ); let peer_actor = PeerConnectionActor::new( id, @@ -118,7 +121,7 @@ pub enum PeerConnectionRequest { reply_tx: oneshot::Sender, PeerConnectionError>>, }, /// Disconnect all substreams and close the transport connection - Disconnect(bool, oneshot::Sender>), + Disconnect(bool, oneshot::Sender>, Minimized), } /// ID type for peer connections @@ -136,6 +139,8 @@ pub struct PeerConnection { started_at: Instant, substream_counter: AtomicRefCounter, 
handle_counter: Arc<()>, + ignore_saf: bool, + on_allow_list: bool, } impl PeerConnection { @@ -147,6 +152,7 @@ impl PeerConnection { address: Multiaddr, direction: ConnectionDirection, substream_counter: AtomicRefCounter, + ignore_saf: bool, ) -> Self { Self { id, @@ -158,6 +164,8 @@ impl PeerConnection { started_at: Instant::now(), substream_counter, handle_counter: Arc::new(()), + ignore_saf, + on_allow_list: false, } } @@ -169,6 +177,14 @@ impl PeerConnection { self.peer_features } + pub fn set_peer_on_allow_list(&mut self, allowed: bool) { + self.on_allow_list = allowed + } + + pub fn get_peer_on_allow_list(&self) -> bool { + self.on_allow_list + } + pub fn direction(&self) -> ConnectionDirection { self.direction } @@ -185,6 +201,10 @@ impl PeerConnection { &self.address } + pub fn ignore_saf(&self) -> bool { + self.ignore_saf + } + pub fn id(&self) -> ConnectionId { self.id } @@ -276,20 +296,20 @@ impl PeerConnection { /// Immediately disconnects the peer connection. This can only fail if the peer connection worker /// is shut down (and the peer is already disconnected) - pub async fn disconnect(&mut self) -> Result<(), PeerConnectionError> { + pub async fn disconnect(&mut self, minimized: Minimized) -> Result<(), PeerConnectionError> { let (reply_tx, reply_rx) = oneshot::channel(); self.request_tx - .send(PeerConnectionRequest::Disconnect(false, reply_tx)) + .send(PeerConnectionRequest::Disconnect(false, reply_tx, minimized)) .await?; reply_rx .await .map_err(|_| PeerConnectionError::InternalReplyCancelled)? 
} - pub(crate) async fn disconnect_silent(&mut self) -> Result<(), PeerConnectionError> { + pub(crate) async fn disconnect_silent(&mut self, minimized: Minimized) -> Result<(), PeerConnectionError> { let (reply_tx, reply_rx) = oneshot::channel(); self.request_tx - .send(PeerConnectionRequest::Disconnect(true, reply_tx)) + .send(PeerConnectionRequest::Disconnect(true, reply_tx, minimized)) .await?; reply_rx .await @@ -388,7 +408,7 @@ impl PeerConnectionActor { } } - if let Err(err) = self.disconnect(false).await { + if let Err(err) = self.disconnect(false, Minimized::No).await { warn!( target: LOG_TARGET, "[{}] Failed to politely close connection to peer '{}' because '{}'", @@ -413,7 +433,7 @@ impl PeerConnectionActor { "Reply oneshot closed when sending reply", ); }, - Disconnect(silent, reply_tx) => { + Disconnect(silent, reply_tx, minimized) => { debug!( target: LOG_TARGET, "[{}] Disconnect{}requested for {} connection to peer '{}'", @@ -422,7 +442,7 @@ impl PeerConnectionActor { self.direction, self.peer_node_id.short_str() ); - let _result = reply_tx.send(self.disconnect(silent).await); + let _result = reply_tx.send(self.disconnect(silent, minimized).await); }, } } @@ -518,7 +538,7 @@ impl PeerConnectionActor { /// # Arguments /// /// silent - true to suppress the PeerDisconnected event, false to publish the event - async fn disconnect(&mut self, silent: bool) -> Result<(), PeerConnectionError> { + async fn disconnect(&mut self, silent: bool, minimized: Minimized) -> Result<(), PeerConnectionError> { self.request_rx.close(); match self.control.close().await { Err(yamux::ConnectionError::Closed) => { @@ -536,6 +556,7 @@ impl PeerConnectionActor { self.notify_event(ConnectionManagerEvent::PeerDisconnected( self.id, self.peer_node_id.clone(), + minimized, )) .await; } diff --git a/comms/core/src/connection_manager/requester.rs b/comms/core/src/connection_manager/requester.rs index 40a09da7f9d..1796d33812e 100644 --- a/comms/core/src/connection_manager/requester.rs 
+++ b/comms/core/src/connection_manager/requester.rs @@ -37,6 +37,7 @@ pub enum ConnectionManagerRequest { DialPeer { node_id: NodeId, reply_tx: Option>>, + ignore_saf: bool, }, /// Cancels a pending dial if one exists CancelDial(NodeId), @@ -75,9 +76,13 @@ impl ConnectionManagerRequester { } /// Attempt to connect to a remote peer - pub async fn dial_peer(&mut self, node_id: NodeId) -> Result { + pub async fn dial_peer( + &mut self, + node_id: NodeId, + bypass_saf: bool, + ) -> Result { let (reply_tx, reply_rx) = oneshot::channel(); - self.send_dial_peer(node_id, Some(reply_tx)).await?; + self.send_dial_peer(node_id, Some(reply_tx), bypass_saf).await?; reply_rx .await .map_err(|_| ConnectionManagerError::ActorRequestCanceled)? @@ -97,9 +102,14 @@ impl ConnectionManagerRequester { &mut self, node_id: NodeId, reply_tx: Option>>, + ignore_saf: bool, ) -> Result<(), ConnectionManagerError> { self.sender - .send(ConnectionManagerRequest::DialPeer { node_id, reply_tx }) + .send(ConnectionManagerRequest::DialPeer { + node_id, + reply_tx, + ignore_saf, + }) .await .map_err(|_| ConnectionManagerError::SendToActorFailed)?; Ok(()) diff --git a/comms/core/src/connection_manager/tests/listener_dialer.rs b/comms/core/src/connection_manager/tests/listener_dialer.rs index a9ff5d4b12f..1a442eff725 100644 --- a/comms/core/src/connection_manager/tests/listener_dialer.rs +++ b/comms/core/src/connection_manager/tests/listener_dialer.rs @@ -46,6 +46,7 @@ use crate::{ protocol::ProtocolId, test_utils::{build_peer_manager, node_identity::build_node_identity}, transports::MemoryTransport, + Minimized, }; #[tokio::test] @@ -129,7 +130,7 @@ async fn smoke() { let (reply_tx, reply_rx) = oneshot::channel(); request_tx - .send(DialerRequest::Dial(Box::new(peer), Some(reply_tx))) + .send(DialerRequest::Dial(Box::new(peer), Some(reply_tx), false)) .await .unwrap(); @@ -161,7 +162,7 @@ async fn smoke() { assert_eq!(buf, *b"HELLO"); } - conn1.disconnect().await.unwrap(); + 
conn1.disconnect(Minimized::No).await.unwrap(); shutdown.trigger(); @@ -237,7 +238,7 @@ async fn banned() { let (reply_tx, reply_rx) = oneshot::channel(); request_tx - .send(DialerRequest::Dial(Box::new(peer), Some(reply_tx))) + .send(DialerRequest::Dial(Box::new(peer), Some(reply_tx), false)) .await .unwrap(); diff --git a/comms/core/src/connection_manager/tests/manager.rs b/comms/core/src/connection_manager/tests/manager.rs index 1abde14e219..270b172702a 100644 --- a/comms/core/src/connection_manager/tests/manager.rs +++ b/comms/core/src/connection_manager/tests/manager.rs @@ -76,7 +76,7 @@ async fn connect_to_nonexistent_peer() { rt_handle.spawn(connection_manager.run()); - let err = requester.dial_peer(NodeId::default()).await.unwrap_err(); + let err = requester.dial_peer(NodeId::default(), false).await.unwrap_err(); unpack_enum!(ConnectionManagerError::PeerManagerError(PeerManagerError::PeerNotFoundError) = err); shutdown.trigger(); @@ -150,7 +150,10 @@ async fn dial_success() { .await .unwrap(); - let mut conn_out = conn_man1.dial_peer(node_identity2.node_id().clone()).await.unwrap(); + let mut conn_out = conn_man1 + .dial_peer(node_identity2.node_id().clone(), false) + .await + .unwrap(); assert_eq!(conn_out.peer_node_id(), node_identity2.node_id()); let peer2 = peer_manager1 .find_by_node_id(conn_out.peer_node_id()) @@ -272,7 +275,10 @@ async fn dial_success_aux_tcp_listener() { ); conn_man2.wait_until_listening().await.unwrap(); - let mut connection = conn_man2.dial_peer(node_identity1.node_id().clone()).await.unwrap(); + let mut connection = conn_man2 + .dial_peer(node_identity1.node_id().clone(), false) + .await + .unwrap(); assert_eq!(connection.peer_node_id(), node_identity1.node_id()); let mut substream_out = connection.open_substream(&TEST_PROTO).await.unwrap(); @@ -356,8 +362,8 @@ async fn simultaneous_dial_events() { // Dial at the same time let (result1, result2) = future::join( - conn_man1.dial_peer(node_identities[1].node_id().clone()), - 
conn_man2.dial_peer(node_identities[0].node_id().clone()), + conn_man1.dial_peer(node_identities[1].node_id().clone(), false), + conn_man2.dial_peer(node_identities[0].node_id().clone(), false), ) .await; @@ -420,7 +426,7 @@ async fn dial_cancelled() { let node_id = node_identity2.node_id().clone(); async move { ready_tx.send(()).unwrap(); - cm.dial_peer(node_id).await + cm.dial_peer(node_id, false).await } }); diff --git a/comms/core/src/connectivity/config.rs b/comms/core/src/connectivity/config.rs index 2ebc47fe918..02a65b3c7d2 100644 --- a/comms/core/src/connectivity/config.rs +++ b/comms/core/src/connectivity/config.rs @@ -49,6 +49,9 @@ pub struct ConnectivityConfig { /// next connection attempt. /// Default: 24 hours pub expire_peer_last_seen_duration: Duration, + /// The closest number of peer connections to maintain; connections above the threshold will be removed + /// (default: disabled) + pub maintain_n_closest_connections_only: Option, } impl Default for ConnectivityConfig { @@ -62,6 +65,7 @@ impl Default for ConnectivityConfig { max_failures_mark_offline: 1, connection_tie_break_linger: Duration::from_secs(2), expire_peer_last_seen_duration: Duration::from_secs(24 * 60 * 60), + maintain_n_closest_connections_only: None, } } } diff --git a/comms/core/src/connectivity/connection_pool.rs b/comms/core/src/connectivity/connection_pool.rs index fb8fe017c50..72633fc2e00 100644 --- a/comms/core/src/connectivity/connection_pool.rs +++ b/comms/core/src/connectivity/connection_pool.rs @@ -24,7 +24,7 @@ use std::{collections::HashMap, fmt, time::Duration}; use nom::lib::std::collections::hash_map::Entry; -use crate::{peer_manager::NodeId, PeerConnection}; +use crate::{peer_manager::NodeId, Minimized, PeerConnection}; /// Status type for connections #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -34,7 +34,7 @@ pub enum ConnectionStatus { Connected, Retrying, Failed, - Disconnected, + Disconnected(Minimized), } impl fmt::Display for ConnectionStatus { @@ -124,7 
+124,7 @@ impl ConnectionPool { entry_mut.status = if conn.is_connected() { ConnectionStatus::Connected } else { - ConnectionStatus::Disconnected + ConnectionStatus::Disconnected(Minimized::No) }; entry_mut.set_connection(conn); entry_mut.status @@ -232,12 +232,23 @@ impl ConnectionPool { self.count_filtered(|c| c.is_connected()) } + pub fn set_peer_allow_list_state(&mut self, node_id: &NodeId, allowed: bool) { + if let Some(peer_state) = self.connections.get_mut(node_id) { + if let Some(connection) = peer_state.connection_mut() { + connection.set_peer_on_allow_list(allowed); + } + } + } + pub fn count_failed(&self) -> usize { self.count_filtered(|c| c.status() == ConnectionStatus::Failed) } pub fn count_disconnected(&self) -> usize { - self.count_filtered(|c| c.status() == ConnectionStatus::Disconnected) + self.count_filtered(|c| { + c.status() == ConnectionStatus::Disconnected(Minimized::Yes) || + c.status() == ConnectionStatus::Disconnected(Minimized::No) + }) } pub fn count_entries(&self) -> usize { diff --git a/comms/core/src/connectivity/manager.rs b/comms/core/src/connectivity/manager.rs index 1e9b7d18e3a..0e1ed315900 100644 --- a/comms/core/src/connectivity/manager.rs +++ b/comms/core/src/connectivity/manager.rs @@ -55,6 +55,7 @@ use crate::{ }, peer_manager::NodeId, utils::datetime::format_duration, + Minimized, NodeIdentity, PeerConnection, PeerManager, @@ -222,11 +223,19 @@ impl ConnectivityManagerActor { GetConnectivityStatus(reply) => { let _ = reply.send(self.status); }, - DialPeer { node_id, reply_tx } => { + DialPeer { + node_id, + reply_tx, + bypass_saf, + } => { let tracing_id = tracing::Span::current().id(); let span = span!(Level::TRACE, "handle_dial_peer"); span.follows_from(tracing_id); - self.handle_dial_peer(node_id, reply_tx).instrument(span).await; + self.handle_dial_peer(node_id.clone(), reply_tx, bypass_saf) + .instrument(span) + .await; + self.pool + .set_peer_allow_list_state(&node_id, self.allow_list.contains(&node_id)); }, 
SelectConnections(selection, reply) => { let _result = reply.send(self.select_connections(selection).await); @@ -269,13 +278,15 @@ impl ConnectivityManagerActor { }, AddPeerToAllowList(node_id) => { if !self.allow_list.contains(&node_id) { - self.allow_list.push(node_id) + self.allow_list.push(node_id.clone()); } + self.pool.set_peer_allow_list_state(&node_id, true); }, RemovePeerFromAllowList(node_id) => { if let Some(index) = self.allow_list.iter().position(|x| *x == node_id) { self.allow_list.remove(index); } + self.pool.set_peer_allow_list_state(&node_id, false); }, GetActiveConnections(reply) => { let _result = reply.send( @@ -293,6 +304,7 @@ impl ConnectivityManagerActor { &mut self, node_id: NodeId, reply_tx: Option>>, + bypass_saf: bool, ) { match self.peer_manager.is_peer_banned(&node_id).await { Ok(true) => { @@ -335,7 +347,11 @@ impl ConnectivityManagerActor { }, } - if let Err(err) = self.connection_manager.send_dial_peer(node_id, reply_tx).await { + if let Err(err) = self + .connection_manager + .send_dial_peer(node_id, reply_tx, bypass_saf) + .await + { error!( target: LOG_TARGET, "Failed to send dial request to connection manager: {:?}", err @@ -352,7 +368,7 @@ impl ConnectivityManagerActor { if !conn.is_connected() { continue; } - match conn.disconnect_silent().await { + match conn.disconnect_silent(Minimized::No).await { Ok(_) => { node_ids.push(conn.peer_node_id().clone()); }, @@ -369,7 +385,7 @@ impl ConnectivityManagerActor { } for node_id in node_ids { - self.publish_event(ConnectivityEvent::PeerDisconnected(node_id)); + self.publish_event(ConnectivityEvent::PeerDisconnected(node_id, Minimized::No)); } } @@ -389,11 +405,68 @@ impl ConnectivityManagerActor { if self.config.is_connection_reaping_enabled { self.reap_inactive_connections().await; } + if let Some(threshold) = self.config.maintain_n_closest_connections_only { + self.maintain_n_closest_peer_connections_only(threshold).await; + } self.update_connectivity_status(); 
self.update_connectivity_metrics(); Ok(()) } + async fn maintain_n_closest_peer_connections_only(&mut self, threshold: usize) { + // Select all active peer connections + let mut connections = match self + .select_connections(ConnectivitySelection::closest_to( + self.node_identity.node_id().clone(), + self.pool.count_connected_nodes(), + vec![], + )) + .await + { + Ok(peers) => peers, + Err(e) => { + warn!( + target: LOG_TARGET, + "Connectivity error trying to maintain {} closest peers ({:?})", + threshold, + e + ); + return; + }, + }; + + // Remove peers that are on the allow list + let mut nodes_to_ignore = vec![]; + for conn in &connections { + if self.allow_list.contains(conn.peer_node_id()) { + nodes_to_ignore.push(conn.peer_node_id().clone()); + } + } + connections.retain(|conn| !nodes_to_ignore.contains(conn.peer_node_id())); + // Remove peers that are not communication nodes + connections.retain(|conn| conn.peer_features().is_node()); + + // Disconnect all remaining peers above the threshold + for conn in connections.iter_mut().skip(threshold) { + debug!( + target: LOG_TARGET, + "minimize_connections: Disconnecting '{}' because the node is not among the {} closest peers", + conn.peer_node_id(), + threshold + ); + if let Err(err) = conn.disconnect(Minimized::Yes).await { + // Already disconnected + debug!( + target: LOG_TARGET, + "Peer '{}' already disconnected.
Error: {:?}", + conn.peer_node_id().short_str(), + err + ); + } + self.pool.remove(conn.peer_node_id()); + } + } + async fn reap_inactive_connections(&mut self) { let excess_connections = self .pool @@ -418,7 +491,7 @@ impl ConnectivityManagerActor { conn.peer_node_id().short_str(), conn.handle_count() ); - if let Err(err) = conn.disconnect().await { + if let Err(err) = conn.disconnect(Minimized::Yes).await { // Already disconnected debug!( target: LOG_TARGET, @@ -432,7 +505,9 @@ impl ConnectivityManagerActor { fn clean_connection_pool(&mut self) { let cleared_states = self.pool.filter_drain(|state| { - state.status() == ConnectionStatus::Failed || state.status() == ConnectionStatus::Disconnected + state.status() == ConnectionStatus::Failed || + state.status() == ConnectionStatus::Disconnected(Minimized::Yes) || + state.status() == ConnectionStatus::Disconnected(Minimized::No) }); if !cleared_states.is_empty() { @@ -560,7 +635,7 @@ impl ConnectivityManagerActor { TieBreak::UseNew | TieBreak::None => {}, } }, - PeerDisconnected(id, node_id) => { + PeerDisconnected(id, node_id, _minimized) => { if let Some(conn) = self.pool.get_connection(node_id) { if conn.id() != *id { debug!( @@ -586,7 +661,7 @@ impl ConnectivityManagerActor { } let (node_id, mut new_status, connection) = match event { - PeerDisconnected(_, node_id) => (node_id, ConnectionStatus::Disconnected, None), + PeerDisconnected(_, node_id, minimized) => (node_id, ConnectionStatus::Disconnected(*minimized), None), PeerConnected(conn) => (conn.peer_node_id(), ConnectionStatus::Connected, Some(conn.clone())), PeerConnectFailed(node_id, ConnectionManagerError::DialCancelled) => { @@ -632,9 +707,10 @@ impl ConnectivityManagerActor { use ConnectionStatus::{Connected, Disconnected, Failed}; match (old_status, new_status) { - (_, Connected) => match self.pool.get_connection(&node_id).cloned() { - Some(conn) => { + (_, Connected) => match self.pool.get_connection_mut(&node_id).cloned() { + Some(mut conn) => { 
self.mark_connection_success(conn.peer_node_id().clone()); + conn.set_peer_on_allow_list(self.allow_list.contains(&node_id)); self.publish_event(ConnectivityEvent::PeerConnected(conn.into())); }, None => unreachable!( @@ -642,11 +718,14 @@ impl ConnectivityManagerActor { ConnectionPool::get_connection is Some" ), }, - (Connected, Disconnected) => { - self.publish_event(ConnectivityEvent::PeerDisconnected(node_id)); + (Connected, Disconnected(..)) => { + self.publish_event(ConnectivityEvent::PeerDisconnected(node_id, match new_status { + ConnectionStatus::Disconnected(reason) => reason, + _ => Minimized::No, + })); }, // Was not connected so don't broadcast event - (_, Disconnected) => {}, + (_, Disconnected(..)) => {}, (_, Failed) => { self.publish_event(ConnectivityEvent::PeerConnectFailed(node_id)); }, @@ -692,7 +771,7 @@ impl ConnectivityManagerActor { existing_conn.direction(), ); - let _result = existing_conn.disconnect_silent().await; + let _result = existing_conn.disconnect_silent(Minimized::Yes).await; self.pool.remove(existing_conn.peer_node_id()); TieBreak::UseNew } else { @@ -708,7 +787,7 @@ impl ConnectivityManagerActor { existing_conn.direction(), ); - let _result = new_conn.clone().disconnect_silent().await; + let _result = new_conn.clone().disconnect_silent(Minimized::Yes).await; TieBreak::KeepExisting } }, @@ -887,7 +966,7 @@ impl ConnectivityManagerActor { self.publish_event(ConnectivityEvent::PeerBanned(node_id.clone())); if let Some(conn) = self.pool.get_connection_mut(node_id) { - conn.disconnect().await?; + conn.disconnect(Minimized::Yes).await?; let status = self.pool.get_connection_status(node_id); debug!( target: LOG_TARGET, @@ -903,7 +982,7 @@ impl ConnectivityManagerActor { let status = self.pool.get_connection_status(node_id); if matches!( status, - ConnectionStatus::NotConnected | ConnectionStatus::Failed | ConnectionStatus::Disconnected + ConnectionStatus::NotConnected | ConnectionStatus::Failed | ConnectionStatus::Disconnected(_) ) { 
to_remove.push(node_id.clone()); } diff --git a/comms/core/src/connectivity/requester.rs b/comms/core/src/connectivity/requester.rs index b2eff5f35e4..f62a379a6db 100644 --- a/comms/core/src/connectivity/requester.rs +++ b/comms/core/src/connectivity/requester.rs @@ -41,6 +41,7 @@ use super::{ use crate::{ connection_manager::ConnectionManagerError, peer_manager::{NodeId, Peer}, + Minimized, PeerConnection, }; @@ -54,7 +55,7 @@ pub type ConnectivityEventTx = broadcast::Sender; /// Node connectivity events emitted by the ConnectivityManager. #[derive(Debug, Clone)] pub enum ConnectivityEvent { - PeerDisconnected(NodeId), + PeerDisconnected(NodeId, Minimized), PeerConnected(Box), PeerConnectFailed(NodeId), PeerBanned(NodeId), @@ -69,7 +70,7 @@ impl fmt::Display for ConnectivityEvent { #[allow(clippy::enum_glob_use)] use ConnectivityEvent::*; match self { - PeerDisconnected(node_id) => write!(f, "PeerDisconnected({})", node_id), + PeerDisconnected(node_id, minimized) => write!(f, "PeerDisconnected({}, {:?})", node_id, minimized), PeerConnected(node_id) => write!(f, "PeerConnected({})", node_id), PeerConnectFailed(node_id) => write!(f, "PeerConnectFailed({})", node_id), PeerBanned(node_id) => write!(f, "PeerBanned({})", node_id), @@ -88,6 +89,7 @@ pub enum ConnectivityRequest { DialPeer { node_id: NodeId, reply_tx: Option>>, + bypass_saf: bool, }, GetConnectivityStatus(oneshot::Sender), SelectConnections( @@ -127,7 +129,7 @@ impl ConnectivityRequester { } /// Dial a single peer - pub async fn dial_peer(&self, peer: NodeId) -> Result { + pub async fn dial_peer(&self, peer: NodeId, bypass_saf: bool) -> Result { let mut num_cancels = 0; loop { let (reply_tx, reply_rx) = oneshot::channel(); @@ -135,6 +137,7 @@ impl ConnectivityRequester { .send(ConnectivityRequest::DialPeer { node_id: peer.clone(), reply_tx: Some(reply_tx), + bypass_saf, }) .await .map_err(|_| ConnectivityError::ActorDisconnected)?; @@ -157,22 +160,26 @@ impl ConnectivityRequester { /// Dial many peers, 
returning a Stream that emits the dial Result as each dial completes. #[allow(clippy::let_with_type_underscore)] - pub fn dial_many_peers>( + pub fn dial_many_peers>( &self, peers: I, ) -> impl Stream> + '_ { peers .into_iter() - .map(|peer| self.dial_peer(peer)) + .map(|peer| self.dial_peer(peer.0, peer.1)) .collect::>() } /// Send a request to dial many peers without waiting for the response. - pub async fn request_many_dials>(&self, peers: I) -> Result<(), ConnectivityError> { + pub async fn request_many_dials>( + &self, + peers: I, + ) -> Result<(), ConnectivityError> { future::join_all(peers.into_iter().map(|peer| { self.sender.send(ConnectivityRequest::DialPeer { - node_id: peer, + node_id: peer.0, reply_tx: None, + bypass_saf: peer.1, }) })) .await diff --git a/comms/core/src/connectivity/test.rs b/comms/core/src/connectivity/test.rs index ef5dd65d9f7..e42be587665 100644 --- a/comms/core/src/connectivity/test.rs +++ b/comms/core/src/connectivity/test.rs @@ -43,6 +43,7 @@ use crate::{ mocks::{create_connection_manager_mock, create_peer_connection_mock_pair, ConnectionManagerMockState}, node_identity::{build_many_node_identities, build_node_identity}, }, + Minimized, NodeIdentity, PeerManager, }; @@ -171,12 +172,12 @@ async fn online_then_offline_then_online() { .collect::>(); connectivity - .dial_many_peers(peers.iter().map(|p| p.node_id.clone())) + .dial_many_peers(peers.iter().map(|p| (p.node_id.clone(), false))) .collect::>() .await; connectivity - .dial_many_peers(clients.iter().map(|p| p.node_id().clone())) + .dial_many_peers(clients.iter().map(|p| (p.node_id().clone(), false))) .collect::>() .await; @@ -203,6 +204,7 @@ async fn online_then_offline_then_online() { cm_mock_state.publish_event(ConnectionManagerEvent::PeerDisconnected( conn.id(), conn.peer_node_id().clone(), + Minimized::No, )); } @@ -224,6 +226,7 @@ async fn online_then_offline_then_online() { cm_mock_state.publish_event(ConnectionManagerEvent::PeerDisconnected( conn.id(), 
conn.peer_node_id().clone(), + Minimized::No, )); } @@ -332,7 +335,7 @@ async fn peer_selection() { .collect::>(); connectivity - .dial_many_peers(peers.iter().take(5).map(|p| p.node_id.clone())) + .dial_many_peers(peers.iter().take(5).map(|p| (p.node_id.clone(), false))) .collect::>() .await; @@ -395,7 +398,7 @@ async fn pool_management() { .collect::>(); connectivity - .dial_many_peers(peers.iter().take(5).map(|p| p.node_id.clone())) + .dial_many_peers(peers.iter().take(5).map(|p| (p.node_id.clone(), false))) .collect::>() .await; @@ -420,10 +423,11 @@ async fn pool_management() { if conn != important_connection { assert_eq!(conn.handle_count(), 2); // The peer connection mock does not "automatically" publish event to connectivity manager - conn.disconnect().await.unwrap(); + conn.disconnect(Minimized::No).await.unwrap(); cm_mock_state.publish_event(ConnectionManagerEvent::PeerDisconnected( conn.id(), conn.peer_node_id().clone(), + Minimized::No, )); } } @@ -432,7 +436,7 @@ async fn pool_management() { let events = collect_try_recv!(event_stream, take = 9, timeout = Duration::from_secs(10)); for event in events { - unpack_enum!(ConnectivityEvent::PeerDisconnected(_) = event); + unpack_enum!(ConnectivityEvent::PeerDisconnected(..) = event); } assert_eq!(important_connection.handle_count(), 2); @@ -440,15 +444,16 @@ async fn pool_management() { let conns = connectivity.get_active_connections().await.unwrap(); assert_eq!(conns.len(), 1); - important_connection.disconnect().await.unwrap(); + important_connection.disconnect(Minimized::No).await.unwrap(); cm_mock_state.publish_event(ConnectionManagerEvent::PeerDisconnected( important_connection.id(), important_connection.peer_node_id().clone(), + Minimized::No, )); drop(important_connection); let mut events = collect_try_recv!(event_stream, take = 1, timeout = Duration::from_secs(10)); - unpack_enum!(ConnectivityEvent::PeerDisconnected(_) = events.remove(0)); + unpack_enum!(ConnectivityEvent::PeerDisconnected(..) 
= events.remove(0)); let conns = connectivity.get_active_connections().await.unwrap(); assert!(conns.is_empty()); } diff --git a/comms/core/src/lib.rs b/comms/core/src/lib.rs index b9c493a09a1..face0f13c59 100644 --- a/comms/core/src/lib.rs +++ b/comms/core/src/lib.rs @@ -64,3 +64,10 @@ pub use async_trait::async_trait; pub use bytes::{Buf, BufMut, Bytes, BytesMut}; #[cfg(feature = "rpc")] pub use tower::make::MakeService; + +/// Was the connection closed due to minimize_connections +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +pub enum Minimized { + Yes, + No, +} diff --git a/comms/core/src/net_address/multiaddr_with_stats.rs b/comms/core/src/net_address/multiaddr_with_stats.rs index 29e1fcfb960..fc5d3345e59 100644 --- a/comms/core/src/net_address/multiaddr_with_stats.rs +++ b/comms/core/src/net_address/multiaddr_with_stats.rs @@ -183,6 +183,7 @@ impl MultiaddrWithStats { /// Reset the connection attempts on this net address for a later session of retries pub fn reset_connection_attempts(&mut self) { self.connection_attempts = 0; + self.last_failed_reason = None; } /// Mark that a connection could not be established with this net address diff --git a/comms/core/src/net_address/mutliaddresses_with_stats.rs b/comms/core/src/net_address/mutliaddresses_with_stats.rs index 8b9627e00cb..6aad909315a 100644 --- a/comms/core/src/net_address/mutliaddresses_with_stats.rs +++ b/comms/core/src/net_address/mutliaddresses_with_stats.rs @@ -467,9 +467,15 @@ mod test { assert_eq!(net_addresses.addresses[0].connection_attempts(), 2); assert_eq!(net_addresses.addresses[1].connection_attempts(), 1); assert_eq!(net_addresses.addresses[2].connection_attempts(), 1); + assert!(net_addresses.addresses[0].last_failed_reason().is_some()); + assert!(net_addresses.addresses[1].last_failed_reason().is_some()); + assert!(net_addresses.addresses[2].last_failed_reason().is_some()); net_addresses.reset_connection_attempts(); assert_eq!(net_addresses.addresses[0].connection_attempts(), 0); 
assert_eq!(net_addresses.addresses[1].connection_attempts(), 0); assert_eq!(net_addresses.addresses[2].connection_attempts(), 0); + assert!(net_addresses.addresses[0].last_failed_reason().is_none()); + assert!(net_addresses.addresses[1].last_failed_reason().is_none()); + assert!(net_addresses.addresses[2].last_failed_reason().is_none()); } } diff --git a/comms/core/src/peer_manager/peer.rs b/comms/core/src/peer_manager/peer.rs index f4786d57ce3..657722deb4a 100644 --- a/comms/core/src/peer_manager/peer.rs +++ b/comms/core/src/peer_manager/peer.rs @@ -208,6 +208,11 @@ impl Peer { self.addresses.last_seen() } + /// Provides info about the failure status of all addresses + pub fn all_addresses_failed(&self) -> bool { + self.addresses.iter().all(|a| a.last_failed_reason().is_some()) + } + /// Provides that length of time since the last successful interaction with the peer pub fn last_seen_since(&self) -> Option { self.last_seen() diff --git a/comms/core/src/peer_manager/peer_query.rs b/comms/core/src/peer_manager/peer_query.rs index 71f56b3fd7b..fe049e7e377 100644 --- a/comms/core/src/peer_manager/peer_query.rs +++ b/comms/core/src/peer_manager/peer_query.rs @@ -246,7 +246,7 @@ mod test { #[test] fn limit_query() { - // Create 20 peers were the 1st and last one is bad + // Create some good peers let db = HashmapDatabase::new(); let mut id_counter = 0; @@ -262,11 +262,7 @@ mod test { #[test] fn select_where_query() { - // Create peer manager with random peers - let mut sample_peers = Vec::new(); - // Create 20 peers were the 1st and last one is bad - let _rng = rand::rngs::OsRng; - sample_peers.push(create_test_peer(true)); + // Create some good and bad peers let db = HashmapDatabase::new(); let mut id_counter = 0; @@ -292,11 +288,7 @@ mod test { #[test] fn select_where_limit_query() { - // Create peer manager with random peers - let mut sample_peers = Vec::new(); - // Create 20 peers were the 1st and last one is bad - let _rng = rand::rngs::OsRng; - 
sample_peers.push(create_test_peer(true)); + // Create some good and bad peers let db = HashmapDatabase::new(); let mut id_counter = 0; @@ -333,11 +325,7 @@ mod test { #[test] fn sort_by_query() { - // Create peer manager with random peers - let mut sample_peers = Vec::new(); - // Create 20 peers were the 1st and last one is bad - let _rng = rand::rngs::OsRng; - sample_peers.push(create_test_peer(true)); + // Create some good and bad peers let db = HashmapDatabase::new(); let mut id_counter = 0; diff --git a/comms/core/src/protocol/messaging/outbound.rs b/comms/core/src/protocol/messaging/outbound.rs index b560b7867e0..ba4480ff1ca 100644 --- a/comms/core/src/protocol/messaging/outbound.rs +++ b/comms/core/src/protocol/messaging/outbound.rs @@ -53,6 +53,7 @@ pub struct OutboundMessaging { retry_queue_tx: mpsc::UnboundedSender, peer_node_id: NodeId, protocol_id: ProtocolId, + bypass_saf: bool, } impl OutboundMessaging { @@ -63,6 +64,7 @@ impl OutboundMessaging { retry_queue_tx: mpsc::UnboundedSender, peer_node_id: NodeId, protocol_id: ProtocolId, + bypass_saf: bool, ) -> Self { Self { connectivity, @@ -71,6 +73,7 @@ impl OutboundMessaging { retry_queue_tx, peer_node_id, protocol_id, + bypass_saf, } } @@ -166,7 +169,11 @@ impl OutboundMessaging { async fn try_dial_peer(&mut self) -> Result { loop { - match self.connectivity.dial_peer(self.peer_node_id.clone()).await { + match self + .connectivity + .dial_peer(self.peer_node_id.clone(), self.bypass_saf) + .await + { Ok(conn) => break Ok(conn), Err(ConnectivityError::DialCancelled) => { debug!( diff --git a/comms/core/src/protocol/messaging/protocol.rs b/comms/core/src/protocol/messaging/protocol.rs index 0fc6a7d5a9b..81c4745c3c3 100644 --- a/comms/core/src/protocol/messaging/protocol.rs +++ b/comms/core/src/protocol/messaging/protocol.rs @@ -289,6 +289,7 @@ impl MessagingProtocol { peer_node_id, self.retry_queue_tx.clone(), self.protocol_id.clone(), + false, // TODO: Hansie complete this ); break entry.insert(sender); 
}, @@ -318,6 +319,7 @@ impl MessagingProtocol { peer_node_id: NodeId, retry_queue_tx: mpsc::UnboundedSender, protocol_id: ProtocolId, + bypass_saf: bool, ) -> mpsc::UnboundedSender { let (msg_tx, msg_rx) = mpsc::unbounded_channel(); let outbound_messaging = OutboundMessaging::new( @@ -327,6 +329,7 @@ impl MessagingProtocol { retry_queue_tx, peer_node_id, protocol_id, + bypass_saf, ); tokio::spawn(outbound_messaging.run()); msg_tx diff --git a/comms/core/src/protocol/rpc/client/tests.rs b/comms/core/src/protocol/rpc/client/tests.rs index ecd484f3ddc..4f8d0a14917 100644 --- a/comms/core/src/protocol/rpc/client/tests.rs +++ b/comms/core/src/protocol/rpc/client/tests.rs @@ -76,7 +76,10 @@ async fn setup(num_concurrent_sessions: usize) -> (PeerConnection, PeerConnectio mod lazy_pool { use super::*; - use crate::protocol::rpc::client::pool::{LazyPool, RpcClientPoolError}; + use crate::{ + protocol::rpc::client::pool::{LazyPool, RpcClientPoolError}, + Minimized, + }; #[tokio::test] async fn it_connects_lazily() { @@ -168,7 +171,7 @@ mod lazy_pool { let (mut peer_conn, _, _shutdown) = setup(2).await; let mut pool = LazyPool::::new(peer_conn.clone(), 2, Default::default()); let mut _conn1 = pool.get_least_used_or_connect().await.unwrap(); - peer_conn.disconnect().await.unwrap(); + peer_conn.disconnect(Minimized::No).await.unwrap(); let err = pool.get_least_used_or_connect().await.unwrap_err(); unpack_enum!(RpcClientPoolError::PeerConnectionDropped { .. 
} = err); } diff --git a/comms/core/src/protocol/rpc/context.rs b/comms/core/src/protocol/rpc/context.rs index 47ef988fb66..7b7a8fd4017 100644 --- a/comms/core/src/protocol/rpc/context.rs +++ b/comms/core/src/protocol/rpc/context.rs @@ -36,7 +36,7 @@ use crate::{ #[async_trait] pub trait RpcCommsProvider: Send + Sync { async fn fetch_peer(&self, node_id: &NodeId) -> Result; - async fn dial_peer(&mut self, node_id: &NodeId) -> Result; + async fn dial_peer(&mut self, node_id: &NodeId, bypass_saf: bool) -> Result; async fn select_connections(&mut self, selection: ConnectivitySelection) -> Result, RpcError>; } @@ -70,8 +70,11 @@ impl RpcCommsProvider for RpcCommsBackend { .map_err(Into::into) } - async fn dial_peer(&mut self, node_id: &NodeId) -> Result { - self.connectivity.dial_peer(node_id.clone()).await.map_err(Into::into) + async fn dial_peer(&mut self, node_id: &NodeId, bypass_saf: bool) -> Result { + self.connectivity + .dial_peer(node_id.clone(), bypass_saf) + .await + .map_err(Into::into) } async fn select_connections(&mut self, selection: ConnectivitySelection) -> Result, RpcError> { @@ -112,8 +115,8 @@ impl RequestContext { } #[allow(dead_code)] - async fn dial_peer(&mut self, node_id: &NodeId) -> Result { - self.backend.dial_peer(node_id).await + async fn dial_peer(&mut self, node_id: &NodeId, bypass_saf: bool) -> Result { + self.backend.dial_peer(node_id, bypass_saf).await } #[allow(dead_code)] diff --git a/comms/core/src/protocol/rpc/server/mock.rs b/comms/core/src/protocol/rpc/server/mock.rs index 94862b7081f..9c4f0d1aebf 100644 --- a/comms/core/src/protocol/rpc/server/mock.rs +++ b/comms/core/src/protocol/rpc/server/mock.rs @@ -192,7 +192,7 @@ impl RpcCommsProvider for MockCommsProvider { unimplemented!() } - async fn dial_peer(&mut self, _: &NodeId) -> Result { + async fn dial_peer(&mut self, _: &NodeId, _bypass_saf: bool) -> Result { unimplemented!() } diff --git a/comms/core/src/protocol/rpc/test/comms_integration.rs 
b/comms/core/src/protocol/rpc/test/comms_integration.rs index f2d6f6dae55..93c68ba4f39 100644 --- a/comms/core/src/protocol/rpc/test/comms_integration.rs +++ b/comms/core/src/protocol/rpc/test/comms_integration.rs @@ -74,7 +74,7 @@ async fn run_service() { let mut conn = comms2 .connectivity() - .dial_peer(comms1.node_identity().node_id().clone()) + .dial_peer(comms1.node_identity().node_id().clone(), false) .await .unwrap(); diff --git a/comms/core/src/test_utils/mocks/connection_manager.rs b/comms/core/src/test_utils/mocks/connection_manager.rs index a84a2a65f60..6a3c441632b 100644 --- a/comms/core/src/test_utils/mocks/connection_manager.rs +++ b/comms/core/src/test_utils/mocks/connection_manager.rs @@ -131,7 +131,9 @@ impl ConnectionManagerMock { self.state.inc_call_count(); self.state.add_call(format!("{:?}", req)).await; match req { - DialPeer { node_id, mut reply_tx } => { + DialPeer { + node_id, mut reply_tx, .. + } => { // Send Ok(&mut conn) if we have an active connection, otherwise Err(DialConnectFailedAllAddresses) let result = self .state diff --git a/comms/core/src/test_utils/mocks/connectivity_manager.rs b/comms/core/src/test_utils/mocks/connectivity_manager.rs index ae81bc05db1..2aa67123273 100644 --- a/comms/core/src/test_utils/mocks/connectivity_manager.rs +++ b/comms/core/src/test_utils/mocks/connectivity_manager.rs @@ -231,7 +231,7 @@ impl ConnectivityManagerMock { use ConnectivityRequest::*; self.state.add_call(format!("{:?}", req)).await; match req { - DialPeer { node_id, reply_tx } => { + DialPeer { node_id, reply_tx, .. 
} => { self.state.add_dialed_peer(node_id.clone()).await; // No reply, no reason to do anything in the mock if reply_tx.is_none() { diff --git a/comms/core/src/test_utils/mocks/peer_connection.rs b/comms/core/src/test_utils/mocks/peer_connection.rs index dbd67f50458..9a37136d227 100644 --- a/comms/core/src/test_utils/mocks/peer_connection.rs +++ b/comms/core/src/test_utils/mocks/peer_connection.rs @@ -65,6 +65,7 @@ pub fn create_dummy_peer_connection(node_id: NodeId) -> (PeerConnection, mpsc::R addr, ConnectionDirection::Inbound, AtomicRefCounter::new(), + false, ), rx, ) @@ -102,6 +103,7 @@ pub async fn create_peer_connection_mock_pair( listen_addr.clone(), ConnectionDirection::Inbound, mock_state_in.substream_counter(), + false, ), mock_state_in, PeerConnection::new( @@ -112,6 +114,7 @@ pub async fn create_peer_connection_mock_pair( listen_addr, ConnectionDirection::Outbound, mock_state_out.substream_counter(), + false, ), mock_state_out, ) @@ -221,7 +224,7 @@ impl PeerConnectionMock { reply_tx.send(Err(err)).unwrap(); }, }, - Disconnect(_, reply_tx) => { + Disconnect(_, reply_tx, _minimized) => { self.receiver.close(); reply_tx.send(self.state.disconnect().await).unwrap(); }, diff --git a/comms/core/tests/tests/rpc.rs b/comms/core/tests/tests/rpc.rs index d4845d226fc..ea3bf8d440a 100644 --- a/comms/core/tests/tests/rpc.rs +++ b/comms/core/tests/tests/rpc.rs @@ -76,7 +76,7 @@ async fn client_prematurely_ends_session() { let mut conn1_2 = node1 .connectivity() - .dial_peer(node2.node_identity().node_id().clone()) + .dial_peer(node2.node_identity().node_id().clone(), false) .await .unwrap(); diff --git a/comms/core/tests/tests/rpc_stress.rs b/comms/core/tests/tests/rpc_stress.rs index 9a445e8f140..239f74d840f 100644 --- a/comms/core/tests/tests/rpc_stress.rs +++ b/comms/core/tests/tests/rpc_stress.rs @@ -100,7 +100,7 @@ async fn run_stress_test(test_params: Params) { let conn1_2 = node1 .connectivity() - .dial_peer(node2.node_identity().node_id().clone()) + 
.dial_peer(node2.node_identity().node_id().clone(), false) .await .unwrap(); diff --git a/comms/core/tests/tests/substream_stress.rs b/comms/core/tests/tests/substream_stress.rs index 488ec9064cf..d64a5cb5bad 100644 --- a/comms/core/tests/tests/substream_stress.rs +++ b/comms/core/tests/tests/substream_stress.rs @@ -72,7 +72,7 @@ async fn run_stress_test(num_substreams: usize, num_iterations: usize, payload_s let mut conn = node1 .connectivity() - .dial_peer(node2.node_identity().node_id().clone()) + .dial_peer(node2.node_identity().node_id().clone(), false) .await .unwrap(); diff --git a/comms/dht/examples/memory_net/utilities.rs b/comms/dht/examples/memory_net/utilities.rs index 3d07a3edd53..7812a53e8a1 100644 --- a/comms/dht/examples/memory_net/utilities.rs +++ b/comms/dht/examples/memory_net/utilities.rs @@ -631,8 +631,13 @@ fn connection_manager_logger( println!("'{}' connected to '{}'", node_name, get_name(conn.peer_node_id()),); }, }, - PeerDisconnected(_, node_id) => { - println!("'{}' disconnected from '{}'", get_name(node_id), node_name); + PeerDisconnected(_, node_id, minimized) => { + println!( + "'{}' disconnected from '{}', {:?}", + get_name(node_id), + node_name, + minimized + ); }, PeerConnectFailed(node_id, err) => { println!( diff --git a/comms/dht/src/actor.rs b/comms/dht/src/actor.rs index 819cf7075b4..2ae9ad59d28 100644 --- a/comms/dht/src/actor.rs +++ b/comms/dht/src/actor.rs @@ -833,7 +833,7 @@ impl DiscoveryDialTask { pub async fn run(&mut self, public_key: CommsPublicKey) -> Result { if self.peer_manager.exists(&public_key).await { let node_id = NodeId::from_public_key(&public_key); - match self.connectivity.dial_peer(node_id).await { + match self.connectivity.dial_peer(node_id, false).await { Ok(conn) => Ok(conn), Err(ConnectivityError::ConnectionFailed(err)) => match err { ConnectionManagerError::ConnectFailedMaximumAttemptsReached | @@ -870,7 +870,7 @@ impl DiscoveryDialTask { node_id, timer.elapsed() ); - let conn = 
self.connectivity.dial_peer(node_id).await?; + let conn = self.connectivity.dial_peer(node_id, false).await?; Ok(conn) } } diff --git a/comms/dht/src/config.rs b/comms/dht/src/config.rs index 4937eaacb44..ec502499bc7 100644 --- a/comms/dht/src/config.rs +++ b/comms/dht/src/config.rs @@ -50,6 +50,9 @@ pub struct DhtConfig { /// Number of random peers to include /// Default: 4 pub num_random_nodes: usize, + /// Connections above the configured number of neighbouring and random nodes will be removed + /// (default: false) + pub minimize_connections: bool, /// Send to this many peers when using the broadcast strategy /// Default: 8 pub broadcast_factor: usize, @@ -169,6 +172,7 @@ impl Default for DhtConfig { protocol_version: DhtProtocolVersion::latest(), num_neighbouring_nodes: 8, num_random_nodes: 4, + minimize_connections: false, propagation_factor: 4, broadcast_factor: 8, outbound_buffer_size: 20, diff --git a/comms/dht/src/connectivity/mod.rs b/comms/dht/src/connectivity/mod.rs index 90943963925..f9de2f4f303 100644 --- a/comms/dht/src/connectivity/mod.rs +++ b/comms/dht/src/connectivity/mod.rs @@ -47,7 +47,8 @@ use tari_comms::{ ConnectivitySelection, }, multiaddr, - peer_manager::{NodeDistance, NodeId, PeerManagerError, PeerQuery, PeerQuerySortBy}, + peer_manager::{NodeDistance, NodeId, Peer, PeerManagerError, PeerQuery, PeerQuerySortBy}, + Minimized, NodeIdentity, PeerConnection, PeerManager, @@ -84,6 +85,8 @@ pub(crate) struct DhtConnectivity { neighbours: Vec, /// A randomly-selected set of peers, excluding neighbouring peers. random_pool: Vec, + /// The random pool history. 
+ previous_random: Vec, /// Used to track when the random peer pool was last refreshed random_pool_last_refresh: Option, /// Holds references to peer connections that should be kept alive @@ -121,6 +124,7 @@ impl DhtConnectivity { dht_events, cooldown_in_effect: None, shutdown_signal, + previous_random: vec![], } } @@ -361,6 +365,7 @@ impl DhtConnectivity { } async fn refresh_neighbour_pool(&mut self) -> Result<(), DhtConnectivityError> { + self.remove_allow_list_peers_from_pools(); let mut new_neighbours = self .fetch_neighbouring_peers(self.config.num_neighbouring_nodes, &[]) .await?; @@ -385,14 +390,9 @@ impl DhtConnectivity { debug!( target: LOG_TARGET, - "Adding {} neighbouring peer(s), removing {} peers", - new_neighbours.len(), - difference.len() - ); - debug!( - target: LOG_TARGET, - "Adding {} peer(s) to DHT connectivity manager: {}", + "Adding {} neighbouring peer(s), removing {} peers: {}", new_neighbours.len(), + difference.len(), new_neighbours .iter() .map(ToString::to_string) @@ -401,14 +401,26 @@ impl DhtConnectivity { ); new_neighbours.iter().cloned().for_each(|peer| { - self.insert_neighbour(peer); + self.insert_neighbour_ordered_by_distance(peer); }); + self.redial_neighbours_as_required().await?; - if !new_neighbours.is_empty() { - self.connectivity.request_many_dials(new_neighbours).await?; - } + Ok(()) + } - self.redial_neighbours_as_required().await?; + async fn dial_multiple_peers(&self, peers_to_dial: &[NodeId]) -> Result<(), DhtConnectivityError> { + let mut peers_to_dial = peers_to_dial.iter().map(|v| (v.clone(), false)).collect::>(); + + if self.config.minimize_connections { + let connected_nodes = self.nodes_with_active_connections().await?; + let threshold = self.config.num_neighbouring_nodes; + if connected_nodes.len() >= threshold { + peers_to_dial = peers_to_dial.into_iter().map(|v| (v.0, true)).collect::>(); + } + } + if !peers_to_dial.is_empty() { + self.connectivity.request_many_dials(peers_to_dial).await?; + } Ok(()) } @@ -432,7 
+444,7 @@ impl DhtConnectivity { "Redialling {} disconnected peer(s)", to_redial.len() ); - self.connectivity.request_many_dials(to_redial).await?; + self.dial_multiple_peers(&to_redial).await?; } Ok(()) @@ -451,9 +463,12 @@ impl DhtConnectivity { } async fn refresh_random_pool(&mut self) -> Result<(), DhtConnectivityError> { - let mut random_peers = self - .fetch_random_peers(self.config.num_random_nodes, &self.neighbours) - .await?; + self.remove_allow_list_peers_from_pools(); + let mut exclude = self.neighbours.clone(); + if self.config.minimize_connections { + exclude.extend(self.previous_random.iter().cloned()); + } + let mut random_peers = self.fetch_random_peers(self.config.num_random_nodes, &exclude).await?; if random_peers.is_empty() { info!( target: LOG_TARGET, @@ -482,12 +497,14 @@ impl DhtConnectivity { random_peers, difference ); - self.random_pool.extend(random_peers.clone()); + for peer in &random_peers { + self.insert_random_peer_ordered_by_distance(peer.clone()); + } // Drop any connection handles that removed from the random pool difference.iter().for_each(|peer| { self.remove_connection_handle(peer); }); - self.connectivity.request_many_dials(random_peers).await?; + self.dial_multiple_peers(&random_peers).await?; self.random_pool_last_refresh = Some(Instant::now()); Ok(()) @@ -506,7 +523,7 @@ impl DhtConnectivity { if self.is_pool_peer(conn.peer_node_id()) { debug!( target: LOG_TARGET, - "Added peer {} to connection handles", + "Added pool peer {} to connection handles", conn.peer_node_id() ); self.insert_connection_handle(conn); @@ -523,21 +540,59 @@ impl DhtConnectivity { ); let peer_to_insert = conn.peer_node_id().clone(); - self.insert_connection_handle(conn); - if let Some(node_id) = self.insert_neighbour(peer_to_insert.clone()) { - // If we kicked a neighbour out of our neighbour pool but the random pool is not full. 
- // Add the neighbour to the random pool, otherwise remove the handle from the connection pool - if self.random_pool.len() < self.config.num_random_nodes { - debug!( - target: LOG_TARGET, - "Moving peer '{}' from neighbouring pool to random pool", peer_to_insert - ); - self.random_pool.push(node_id); - } else { - self.remove_connection_handle(&node_id) - } + if let Some(node_id) = self.insert_neighbour_ordered_by_distance(peer_to_insert.clone()) { + // If we kicked a neighbour out of our neighbour pool, add it to the random pool if + // it is not full or if it is closer than the furthest random peer. + debug!( + target: LOG_TARGET, + "Moving peer '{}' from neighbouring pool to random pool if not full or closer", peer_to_insert + ); + self.insert_random_peer_ordered_by_distance(node_id) } } + self.insert_connection_handle(conn); + + Ok(()) + } + + async fn nodes_with_active_connections(&self) -> Result, DhtConnectivityError> { + let query = PeerQuery::new() + .select_where(|peer| { + self.connection_handles + .iter() + .any(|conn| conn.peer_node_id() == &peer.node_id && peer.features.is_node()) + }) + .sort_by(PeerQuerySortBy::DistanceFrom(self.node_identity.node_id())); + let mut peers_by_distance = self.peer_manager.perform_query(query).await?; + let peer_allow_list = self.peer_allow_list(); + peers_by_distance.retain(|p| !peer_allow_list.contains(&p.node_id)); + debug!( + target: LOG_TARGET, + "minimize_connections: Filtered peers: {}, Handles: {}", + peers_by_distance.len(), + self.connection_handles.len(), + ); + Ok(peers_by_distance) + } + + async fn minimize_connections(&mut self) -> Result<(), DhtConnectivityError> { + // Retrieve all communication node peers with an active connection status + let mut peers_by_distance = self.nodes_with_active_connections().await?; + + // Remove all above threshold connections + let threshold = self.config.num_neighbouring_nodes + self.config.num_random_nodes; + for peer in peers_by_distance.iter_mut().skip(threshold) { + 
debug!( + target: LOG_TARGET, + "minimize_connections: Disconnecting '{}' because the node is not among the {} closest peers", + peer.node_id, + threshold + ); + // Remove from managed pool if applicable + self.replace_pool_peer(&peer.node_id).await?; + // In case the connection was not managed, remove the connection handle + self.remove_connection_handle(&peer.node_id); + } Ok(()) } @@ -562,7 +617,26 @@ impl DhtConnectivity { debug!(target: LOG_TARGET, "Connectivity event: {}", event); match event { PeerConnected(conn) => { - self.handle_new_peer_connected(*conn).await?; + self.handle_new_peer_connected(*conn.clone()).await?; + debug!( + target: LOG_TARGET, + "Peer: node_id '{}', allow_list '{}', ignore_saf '{}', connected '{}'", + conn.peer_node_id(), + conn.get_peer_on_allow_list(), + conn.ignore_saf(), + conn.is_connected(), + ); + if conn.get_peer_on_allow_list() { + debug!( + target: LOG_TARGET, + "Allow list: ({})", + self.peer_allow_list().iter().map(ToString::to_string).collect::>().join(", ") + ); + } + + if self.config.minimize_connections { + self.minimize_connections().await?; + } }, PeerConnectFailed(node_id) => { self.connection_handles.retain(|c| *c.peer_node_id() != node_id); @@ -579,7 +653,17 @@ impl DhtConnectivity { self.replace_pool_peer(&node_id).await?; self.log_status(); }, - PeerDisconnected(node_id) => { + PeerDisconnected(node_id, minimized) => { + if let Some(conn) = self.connection_handles.iter().find(|c| c.peer_node_id() == &node_id) { + debug!( + target: LOG_TARGET, + "Peer: node_id '{}', allow_list '{}', ignore_saf '{}', connected '{}'", + conn.peer_node_id(), + conn.get_peer_on_allow_list(), + conn.ignore_saf(), + conn.is_connected(), + ); + } self.connection_handles.retain(|c| *c.peer_node_id() != node_id); if self.metrics_collector.clear_metrics(node_id.clone()).await.is_err() { debug!( target: LOG_TARGET, "{} is not managed by the DHT. 
Ignoring", node_id); return Ok(()); } + if minimized == Minimized::Yes || self.config.minimize_connections { + debug!( + target: LOG_TARGET, + "Peer '{}' was disconnected because it was minimized, will not reconnect.", + node_id + ); + // Remove from managed pool if applicable + self.replace_pool_peer(&node_id).await?; + // In case the connections was not managed, remove the connection handle + self.remove_connection_handle(&node_id); + return Ok(()); + } debug!(target: LOG_TARGET, "Pool peer {} disconnected. Redialling...", node_id); // Attempt to reestablish the lost connection to the pool peer. If reconnection fails, // it is replaced with another peer (replace_pool_peer via PeerConnectFailed) - self.connectivity.request_many_dials([node_id]).await?; + self.dial_multiple_peers(&[node_id]).await?; }, ConnectivityStateOnline(n) => { self.refresh_peer_pools().await?; @@ -621,15 +717,24 @@ impl DhtConnectivity { Ok(()) } + fn peer_allow_list(&self) -> Vec { + self.connection_handles + .iter() + .filter(|c| c.get_peer_on_allow_list()) + .map(|p| p.peer_node_id().clone()) + .collect::>() + } + async fn replace_pool_peer(&mut self, current_peer: &NodeId) -> Result<(), DhtConnectivityError> { if self.random_pool.contains(current_peer) { - let exclude = self.get_pool_peers(); - let pos = self - .random_pool - .iter() - .position(|n| n == current_peer) - .expect("unreachable panic"); - self.random_pool.swap_remove(pos); + let mut exclude = self.get_pool_peers(); + if self.config.minimize_connections { + exclude.extend(self.previous_random.iter().cloned()); + self.previous_random.push(current_peer.clone()); + } + + self.random_pool.retain(|n| n != current_peer); + self.remove_connection_handle(current_peer); debug!( target: LOG_TARGET, @@ -637,12 +742,8 @@ impl DhtConnectivity { ); match self.fetch_random_peers(1, &exclude).await?.pop() { Some(new_peer) => { - self.remove_connection_handle(current_peer); - if let Some(pos) = self.random_pool.iter().position(|n| n == 
current_peer) { - self.random_pool.swap_remove(pos); - } - self.random_pool.push(new_peer.clone()); - self.connectivity.request_many_dials([new_peer]).await?; + self.insert_random_peer_ordered_by_distance(new_peer.clone()); + self.dial_multiple_peers(&[new_peer]).await?; }, None => { debug!( @@ -658,25 +759,18 @@ impl DhtConnectivity { if self.neighbours.contains(current_peer) { let exclude = self.get_pool_peers(); - let pos = self - .neighbours - .iter() - .position(|n| n == current_peer) - .expect("unreachable panic"); - self.neighbours.remove(pos); + + self.neighbours.retain(|n| n != current_peer); + self.remove_connection_handle(current_peer); debug!( target: LOG_TARGET, "Peer '{}' in neighbour pool is offline. Adding a new peer if possible", current_peer ); match self.fetch_neighbouring_peers(1, &exclude).await?.pop() { - Some(node_id) => { - self.remove_connection_handle(current_peer); - if let Some(pos) = self.neighbours.iter().position(|n| n == current_peer) { - self.neighbours.remove(pos); - } - self.insert_neighbour(node_id.clone()); - self.connectivity.request_many_dials([node_id]).await?; + Some(new_peer) => { + self.insert_neighbour_ordered_by_distance(new_peer.clone()); + self.dial_multiple_peers(&[new_peer]).await?; }, None => { info!( @@ -690,39 +784,72 @@ impl DhtConnectivity { } } + self.log_status(); + Ok(()) } - fn insert_neighbour(&mut self, node_id: NodeId) -> Option { + fn insert_neighbour_ordered_by_distance(&mut self, node_id: NodeId) -> Option { let dist = node_id.distance(self.node_identity.node_id()); let pos = self .neighbours .iter() .position(|node_id| node_id.distance(self.node_identity.node_id()) > dist); - let removed_peer = if self.neighbours.len() + 1 > self.config.num_neighbouring_nodes { + match pos { + Some(idx) => { + self.neighbours.insert(idx, node_id); + }, + None => { + self.neighbours.push(node_id); + }, + } + + if self.neighbours.len() > self.config.num_neighbouring_nodes { self.neighbours.pop() } else { None - }; + } + 
} + + fn insert_random_peer_ordered_by_distance(&mut self, node_id: NodeId) { + let dist = node_id.distance(self.node_identity.node_id()); + let pos = self + .random_pool + .iter() + .position(|node_id| node_id.distance(self.node_identity.node_id()) > dist); match pos { Some(idx) => { - self.neighbours.insert(idx, node_id); + self.random_pool.insert(idx, node_id); }, None => { - self.neighbours.push(node_id); + self.random_pool.push(node_id); }, } - removed_peer + if self.random_pool.len() > self.config.num_random_nodes { + if let Some(removed_peer) = self.random_pool.pop() { + if self.config.minimize_connections { + self.previous_random.push(removed_peer.clone()); + } + } + } + } + + fn remove_allow_list_peers_from_pools(&mut self) { + let allow_list = self.peer_allow_list(); + self.neighbours.retain(|n| !allow_list.contains(n)); + self.random_pool.retain(|n| !allow_list.contains(n)); } - fn is_pool_peer(&self, node_id: &NodeId) -> bool { + fn is_pool_peer(&mut self, node_id: &NodeId) -> bool { + self.remove_allow_list_peers_from_pools(); self.neighbours.contains(node_id) || self.random_pool.contains(node_id) } - fn get_pool_peers(&self) -> Vec { + fn get_pool_peers(&mut self) -> Vec { + self.remove_allow_list_peers_from_pools(); self.neighbours.iter().chain(self.random_pool.iter()).cloned().collect() } @@ -750,6 +877,8 @@ impl DhtConnectivity { let peer_manager = &self.peer_manager; let node_id = self.node_identity.node_id(); let connected = self.connected_peers_iter().collect::>(); + let mut excluded = excluded.to_vec(); + excluded.extend(self.peer_allow_list()); // Fetch to all n nearest neighbour Communication Nodes // which are eligible for connection. 
@@ -781,7 +910,7 @@ impl DhtConnectivity { return false; } // we have tried to connect to this peer, and we have never made a successful attempt at connection - if peer.last_connect_attempt().is_some() && peer.last_seen().is_none() { + if peer.all_addresses_failed() { return false; } @@ -790,6 +919,15 @@ impl DhtConnectivity { return false; } + if self.config.minimize_connections { + // If the peer is not closer, return false + let dist = self.node_identity.node_id().distance(&peer.node_id); + let neighbour_distance = self.get_neighbour_max_distance(); + if dist >= neighbour_distance { + return false; + } + } + true }) .sort_by(PeerQuerySortBy::DistanceFrom(node_id)) @@ -801,7 +939,9 @@ impl DhtConnectivity { } async fn fetch_random_peers(&self, n: usize, excluded: &[NodeId]) -> Result, DhtConnectivityError> { - let peers = self.peer_manager.random_peers(n, excluded).await?; + let mut excluded = excluded.to_vec(); + excluded.extend(self.peer_allow_list()); + let peers = self.peer_manager.random_peers(n, &excluded).await?; Ok(peers.into_iter().map(|p| p.node_id).collect()) } diff --git a/comms/dht/src/connectivity/test.rs b/comms/dht/src/connectivity/test.rs index 3120aa075b4..376f05929f5 100644 --- a/comms/dht/src/connectivity/test.rs +++ b/comms/dht/src/connectivity/test.rs @@ -31,6 +31,7 @@ use tari_comms::{ mocks::{create_connectivity_mock, create_dummy_peer_connection, ConnectivityManagerMockState}, node_identity::ordered_node_identities_by_distance, }, + Minimized, NodeIdentity, PeerManager, }; @@ -192,6 +193,7 @@ async fn replace_peer_when_peer_goes_offline() { connectivity.publish_event(ConnectivityEvent::PeerDisconnected( node_identities[4].node_id().clone(), + Minimized::No, )); async_assert!( @@ -242,12 +244,16 @@ async fn insert_neighbour() { // First 8 inserts should not remove a peer (because num_neighbouring_nodes == 8) for ni in shuffled.iter().take(8) { - assert!(dht_connectivity.insert_neighbour(ni.node_id().clone()).is_none()); + 
assert!(dht_connectivity + .insert_neighbour_ordered_by_distance(ni.node_id().clone()) + .is_none()); } // Next 2 inserts will always remove a node id for ni in shuffled.iter().skip(8) { - assert!(dht_connectivity.insert_neighbour(ni.node_id().clone()).is_some()) + assert!(dht_connectivity + .insert_neighbour_ordered_by_distance(ni.node_id().clone()) + .is_some()) } // Check the first 7 node ids match our neighbours, the last element depends on distance and ordering of inserts diff --git a/comms/dht/src/network_discovery/discovering.rs b/comms/dht/src/network_discovery/discovering.rs index 44006396272..86b8a374621 100644 --- a/comms/dht/src/network_discovery/discovering.rs +++ b/comms/dht/src/network_discovery/discovering.rs @@ -291,6 +291,7 @@ impl Discovering { } fn dial_all_candidates(&self) -> impl Stream> + 'static { + let ignore_saf_if_further = self.params.ignore_saf_if_further.clone(); let pending_dials = self .params .peers @@ -298,7 +299,12 @@ impl Discovering { .map(|peer| { let connectivity = self.context.connectivity.clone(); let peer = peer.clone(); - async move { connectivity.dial_peer(peer).await } + let bypass_saf = if let Some(distance) = &ignore_saf_if_further { + &self.context.node_identity.node_id().distance(&peer) >= distance + } else { + false + }; + async move { connectivity.dial_peer(peer, bypass_saf).await } }) .collect::>(); diff --git a/comms/dht/src/network_discovery/ready.rs b/comms/dht/src/network_discovery/ready.rs index 318e377df5f..81ba0b9e5f5 100644 --- a/comms/dht/src/network_discovery/ready.rs +++ b/comms/dht/src/network_discovery/ready.rs @@ -21,7 +21,7 @@ // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
use log::*; -use tari_comms::peer_manager::PeerFeatures; +use tari_comms::peer_manager::{NodeDistance, PeerFeatures}; use super::{ state_machine::{DiscoveryParams, NetworkDiscoveryContext, StateEvent}, @@ -59,6 +59,36 @@ impl DiscoveryReady { let num_peers = self.context.peer_manager.count().await; debug!(target: LOG_TARGET, "Peer list currently contains {} entries", num_peers); + let ignore_saf_if_further = if self.config().minimize_connections { + let active_connections = self.context.connectivity.get_active_connections().await?; + let mut active_connections_with_distance = active_connections + .into_iter() + .map(|conn| { + let distance = self + .context + .node_identity + .node_id() + .distance(&conn.peer_node_id().clone()); + (conn, distance) + }) + .collect::>(); + active_connections_with_distance.sort_by(|a, b| a.1.cmp(&b.1)); + let saf_ignore_distance = active_connections_with_distance + .get(self.context.config.num_neighbouring_nodes - 1) + .map(|(_, distance)| distance) + .cloned() + .unwrap_or_else(|| { + active_connections_with_distance + .last() + .map(|(_, distance)| distance) + .cloned() + .unwrap_or(NodeDistance::max_distance()) + }); + Some(saf_ignore_distance) + } else { + None + }; + // We don't have many peers - let's aggressively probe for them if num_peers < self.context.config.network_discovery.min_desired_peers { if self.context.num_rounds() >= self.config().network_discovery.idle_after_num_rounds { @@ -99,6 +129,7 @@ impl DiscoveryReady { return Ok(StateEvent::BeginDiscovery(DiscoveryParams { num_peers_to_request: self.config().network_discovery.max_peers_to_sync_per_round, peers, + ignore_saf_if_further, })); } @@ -189,6 +220,7 @@ impl DiscoveryReady { Ok(StateEvent::BeginDiscovery(DiscoveryParams { num_peers_to_request: self.config().network_discovery.max_peers_to_sync_per_round, peers, + ignore_saf_if_further, })) } diff --git a/comms/dht/src/network_discovery/state_machine.rs b/comms/dht/src/network_discovery/state_machine.rs index 
78281d56b96..953b720fc36 100644 --- a/comms/dht/src/network_discovery/state_machine.rs +++ b/comms/dht/src/network_discovery/state_machine.rs @@ -32,7 +32,12 @@ use std::{ use futures::{future, future::Either}; use log::*; -use tari_comms::{connectivity::ConnectivityRequester, peer_manager::NodeId, NodeIdentity, PeerManager}; +use tari_comms::{ + connectivity::ConnectivityRequester, + peer_manager::{NodeDistance, NodeId}, + NodeIdentity, + PeerManager, +}; use tari_shutdown::ShutdownSignal; use tokio::{ sync::{broadcast, RwLock}, @@ -295,6 +300,7 @@ where Fut: Future + Unpin { #[derive(Debug, Clone)] pub struct DiscoveryParams { pub peers: Vec, + pub ignore_saf_if_further: Option, pub num_peers_to_request: u32, } @@ -302,13 +308,14 @@ impl Display for DiscoveryParams { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!( f, - "DiscoveryParams({} peer(s) ({}), num_peers_to_request = {})", + "DiscoveryParams({} peer(s) ({}), num_peers_to_request = {}, ignore_saf_if_further ({}))", self.peers.len(), self.peers.iter().fold(String::new(), |mut peers, p| { let _ = write!(peers, "{p}, "); peers }), - self.num_peers_to_request + self.num_peers_to_request, + self.ignore_saf_if_further.is_some(), ) } } diff --git a/comms/dht/src/store_forward/service.rs b/comms/dht/src/store_forward/service.rs index f3b09b36438..05bbe82df6f 100644 --- a/comms/dht/src/store_forward/service.rs +++ b/comms/dht/src/store_forward/service.rs @@ -372,7 +372,7 @@ impl StoreAndForwardService { // Whenever we connect to a peer, request SAF messages let features = self.peer_manager.get_peer_features(conn.peer_node_id()).await?; - if features.contains(PeerFeatures::DHT_STORE_FORWARD) { + if features.contains(PeerFeatures::DHT_STORE_FORWARD) && !conn.ignore_saf() { info!( target: LOG_TARGET, "Connected peer '{}' is a SAF node. 
Requesting stored messages.", diff --git a/comms/dht/tests/dht.rs b/comms/dht/tests/dht.rs index 09ed64aa60e..ecb2e5dbbdc 100644 --- a/comms/dht/tests/dht.rs +++ b/comms/dht/tests/dht.rs @@ -141,7 +141,7 @@ async fn test_dht_discover_propagation() { node_D .comms .connectivity() - .dial_peer(node_C.comms.node_identity().node_id().clone()) + .dial_peer(node_C.comms.node_identity().node_id().clone(), false) .await .unwrap(); @@ -328,7 +328,7 @@ async fn test_dht_propagate_dedup() { node1 .comms .connectivity() - .dial_peer(node2.node_identity().node_id().clone()) + .dial_peer(node2.node_identity().node_id().clone(), false) .await .unwrap(); } @@ -462,21 +462,21 @@ async fn test_dht_do_not_store_invalid_message_in_dedup() { node_A .comms .connectivity() - .dial_peer(node_B.node_identity().node_id().clone()) + .dial_peer(node_B.node_identity().node_id().clone(), false) .await .unwrap(); node_A .comms .connectivity() - .dial_peer(node_C.node_identity().node_id().clone()) + .dial_peer(node_C.node_identity().node_id().clone(), false) .await .unwrap(); node_B .comms .connectivity() - .dial_peer(node_C.node_identity().node_id().clone()) + .dial_peer(node_C.node_identity().node_id().clone(), false) .await .unwrap(); @@ -626,7 +626,7 @@ async fn test_dht_repropagate() { node1 .comms .connectivity() - .dial_peer(node2.node_identity().node_id().clone()) + .dial_peer(node2.node_identity().node_id().clone(), false) .await .unwrap(); } @@ -729,7 +729,7 @@ async fn test_dht_propagate_message_contents_not_malleable_ban() { node_A .comms .connectivity() - .dial_peer(node_B.node_identity().node_id().clone()) + .dial_peer(node_B.node_identity().node_id().clone(), false) .await .unwrap(); @@ -834,7 +834,7 @@ async fn test_dht_header_not_malleable() { node_A .comms .connectivity() - .dial_peer(node_B.node_identity().node_id().clone()) + .dial_peer(node_B.node_identity().node_id().clone(), false) .await .unwrap();