Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
sdbondi committed Nov 1, 2024
1 parent e36b472 commit 91582eb
Show file tree
Hide file tree
Showing 7 changed files with 102 additions and 67 deletions.
2 changes: 1 addition & 1 deletion applications/minotari_console_wallet/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use tari_common::configuration::{ConfigOverrideProvider, Network};
use tari_common_types::tari_address::TariAddress;
use tari_core::transactions::{tari_amount, tari_amount::MicroMinotari};
use tari_key_manager::SeedWords;
use tari_network::multiaddr::Multiaddr;
use tari_network::{multiaddr::Multiaddr, ReachabilityMode};
use tari_utilities::{
hex::{Hex, HexError},
SafePassword,
Expand Down
14 changes: 12 additions & 2 deletions applications/minotari_node/src/commands/command/add_peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,12 @@ use anyhow::Error;
use async_trait::async_trait;
use clap::Parser;
use minotari_app_utilities::utilities::UniPublicKey;
use tari_network::{multiaddr::Multiaddr, swarm::dial_opts::DialOpts, NetworkingService, ToPeerId};
use tari_network::{
multiaddr::Multiaddr,
swarm::dial_opts::{DialOpts, PeerCondition},
NetworkingService,
ToPeerId,
};

use super::{CommandContext, HandleCommand};

Expand All @@ -50,7 +55,12 @@ impl HandleCommand<ArgsAddPeer> for CommandContext {
let timer = Instant::now();
let dial = self
.network
.dial_peer(DialOpts::peer_id(peer_id).addresses(vec![args.address]).build())
.dial_peer(
DialOpts::peer_id(peer_id)
.condition(PeerCondition::Always)
.addresses(vec![args.address])
.build(),
)
.await?;
println!("Peer with node id '{}' was added to the base node. Dialing...", peer_id);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

use std::{
fmt::Display,
ops::Deref,
sync::{atomic, atomic::AtomicUsize, Arc, Mutex},
time::{Duration, Instant},
};

Expand All @@ -33,10 +35,10 @@ use crate::connectivity_service::WalletConnectivityError;
#[derive(Clone)]
pub struct BaseNodePeerManager {
// The current base node that the wallet is connected to
current_peer_index: usize,
current_peer_index: Arc<AtomicUsize>,
// The other base nodes that the wallet can connect to if the selected peer is not available
peer_list: Vec<Peer>,
last_connection_attempt: Option<LastConnectionAttempt>,
peer_list: Arc<Vec<Peer>>,
last_connection_attempt: Arc<Mutex<Option<LastConnectionAttempt>>>,
}

#[derive(Clone, Debug)]
Expand All @@ -60,9 +62,9 @@ impl BaseNodePeerManager {
)));
}
Ok(Self {
current_peer_index: preferred_peer_index,
peer_list,
last_connection_attempt: None,
current_peer_index: Arc::new(AtomicUsize::new(preferred_peer_index)),
peer_list: Arc::new(peer_list),
last_connection_attempt: Arc::new(Mutex::new(None)),
})
}

Expand All @@ -71,36 +73,47 @@ impl BaseNodePeerManager {
self.get_current_peer().peer_id()
}

/// Get the current peer
/// Get the current peer.
pub fn get_current_peer(&self) -> &Peer {
self.peer_list
.get(self.current_peer_index)
.get(self.current_peer_index())
// Panic: cannot panic because this instance cannot be constructed with an empty peer_list
.unwrap_or(&self.peer_list[0])
}

/// Changes to the next peer in the list, returning that peer
pub fn select_next_peer(&mut self) -> &Peer {
self.current_peer_index = (self.current_peer_index + 1) % self.peer_list.len();
&self.peer_list[self.current_peer_index]
self.set_current_peer_index((self.current_peer_index() + 1) % self.peer_list.len());
&self.peer_list[self.current_peer_index()]
}

pub fn peer_list(&self) -> &[Peer] {
&self.peer_list
}

/// Get the base node peer manager state
pub fn get_state(&self) -> (usize, &[Peer]) {
(self.current_peer_index, &self.peer_list)
(self.current_peer_index(), &self.peer_list)
}

/// Set the last connection attempt stats
pub fn set_last_connection_attempt(&mut self) {
self.last_connection_attempt = Some(LastConnectionAttempt {
peer_index: self.current_peer_index,
let mut lock = self
.last_connection_attempt
.lock()
// In the currently impossible case that a panic occurs while this mutex is unlocked, we'll simply recover to use the previous value before the panic.
.unwrap_or_else(|e| e.into_inner());
*lock = Some(LastConnectionAttempt {
peer_index: self.current_peer_index(),
attempt_time: Instant::now(),
})
});
}

/// Get the last connection attempt stats
pub fn time_since_last_connection_attempt(&self) -> Option<Duration> {
if let Some(stats) = self.last_connection_attempt.clone() {
if stats.peer_index == self.current_peer_index {
let lock = self.last_connection_attempt.lock().unwrap_or_else(|e| e.into_inner());
if let Some(stats) = lock.deref() {
if stats.peer_index == self.current_peer_index() {
Some(stats.attempt_time.elapsed())
} else {
None
Expand All @@ -109,6 +122,14 @@ impl BaseNodePeerManager {
None
}
}

fn set_current_peer_index(&self, index: usize) {
self.current_peer_index.store(index, atomic::Ordering::SeqCst);
}

fn current_peer_index(&self) -> usize {
self.current_peer_index.load(atomic::Ordering::SeqCst)
}
}

impl Display for BaseNodePeerManager {
Expand All @@ -120,7 +141,7 @@ impl Display for BaseNodePeerManager {
write!(
f,
"BaseNodePeerManager {{ current index: {}, last attempt (s): {}, peer list: {} entries }}",
self.current_peer_index,
self.current_peer_index(),
last_connection_attempt,
self.peer_list.len()
)
Expand Down
5 changes: 0 additions & 5 deletions base_layer/wallet/src/connectivity_service/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,6 @@ impl WalletConnectivityHandle {
#[async_trait::async_trait]
impl WalletConnectivityInterface for WalletConnectivityHandle {
fn set_base_node(&mut self, base_node_peer_manager: BaseNodePeerManager) {
if let Some(selected_peer) = self.base_node_watch.borrow().as_ref() {
if selected_peer.get_current_peer_id() == base_node_peer_manager.get_current_peer_id() {
return;
}
}
self.base_node_watch.send(Some(base_node_peer_manager));
}

Expand Down
85 changes: 46 additions & 39 deletions base_layer/wallet/src/connectivity_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,11 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{
collections::{HashMap, HashSet},
mem,
time::Duration,
};
use std::{collections::HashSet, mem, time::Duration};

use log::*;
use tari_core::base_node::{rpc::BaseNodeWalletRpcClient, sync::rpc::BaseNodeSyncRpcClient};
use tari_network::{identity::PeerId, DialError, NetworkHandle, NetworkingService, ToPeerId};
use tari_network::{identity::PeerId, DialError, NetworkHandle, NetworkingService};
use tari_rpc_framework::{
pool::{RpcClientLease, RpcClientPool},
RpcClient,
Expand All @@ -47,7 +43,7 @@ use crate::{
};

const LOG_TARGET: &str = "wallet::connectivity";
pub(crate) const CONNECTIVITY_WAIT: u64 = 5;
pub(crate) const CONNECTIVITY_WAIT: Duration = Duration::from_secs(5);

/// Connection status of the Base Node
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
Expand All @@ -63,12 +59,13 @@ pub struct WalletConnectivityService {
network_handle: NetworkHandle,
base_node_watch_receiver: watch::Receiver<Option<BaseNodePeerManager>>,
base_node_watch: Watch<Option<BaseNodePeerManager>>,
pools: HashMap<PeerId, ClientPoolContainer>,
current_pool: Option<ClientPoolContainer>,
online_status_watch: Watch<OnlineStatus>,
pending_requests: Vec<ReplyOneshot>,
}

struct ClientPoolContainer {
pub peer_id: PeerId,
pub base_node_wallet_rpc_client: RpcClientPool<NetworkHandle, BaseNodeWalletRpcClient>,
pub base_node_sync_rpc_client: RpcClientPool<NetworkHandle, BaseNodeSyncRpcClient>,
}
Expand All @@ -94,7 +91,7 @@ impl WalletConnectivityService {
network_handle,
base_node_watch_receiver: base_node_watch.get_receiver(),
base_node_watch,
pools: HashMap::new(),
current_pool: None,
pending_requests: Vec::new(),
online_status_watch,
}
Expand All @@ -116,7 +113,7 @@ impl WalletConnectivityService {
if self.base_node_watch_receiver.borrow().is_some() {
// This will block the rest until the connection is established. This is what we want.
trace!(target: LOG_TARGET, "start: base_node_watch_receiver.changed");
self.check_connection().await;
self.check_connection_and_connect_if_required().await;
}
},

Expand All @@ -126,20 +123,24 @@ impl WalletConnectivityService {

_ = check_connection.tick() => {
trace!(target: LOG_TARGET, "start: check_connection.tick");
self.check_connection().await;
self.check_connection_and_connect_if_required().await;
}
}
}
}

async fn check_connection(&mut self) {
async fn check_connection_and_connect_if_required(&mut self) {
if let Some(peer_manager) = self.get_base_node_peer_manager() {
let current_base_node = peer_manager.get_current_peer_id();
trace!(target: LOG_TARGET, "check_connection: has current_base_node");
if let Ok(Some(_)) = self.network_handle.get_connection(current_base_node).await {
trace!(target: LOG_TARGET, "check_connection: has connection");
if let Ok(Some(conn)) = self.network_handle.get_connection(current_base_node).await {
trace!(target: LOG_TARGET, "check_connection: has connection with ID {}", conn.connection_id);
trace!(target: LOG_TARGET, "check_connection: is connected");
if self.pools.contains_key(&current_base_node) {
if self
.current_pool
.as_ref()
.map_or(false, |p| p.peer_id == current_base_node)
{
trace!(target: LOG_TARGET, "check_connection: has rpc pool");
trace!(target: LOG_TARGET, "check_connection: rpc pool is already connected");
self.set_online_status(OnlineStatus::Online);
Expand All @@ -153,8 +154,7 @@ impl WalletConnectivityService {
"check_connection: current base node has no connection, setup connection to: '{}'",
peer_manager
);
self.set_online_status(OnlineStatus::Connecting);
self.setup_base_node_connection().await;
self.setup_base_node_connection(peer_manager).await;
} else {
self.set_online_status(OnlineStatus::Offline);
debug!(target: LOG_TARGET, "Base node peer manager has not been set, cannot connect");
Expand Down Expand Up @@ -200,8 +200,8 @@ impl WalletConnectivityService {
return;
};

match self.pools.get(&node_id) {
Some(pools) => match pools.base_node_wallet_rpc_client.get().await {
match self.current_pool {
Some(ref pools) => match pools.base_node_wallet_rpc_client.get().await {
Ok(client) => {
debug!(target: LOG_TARGET, "Obtained pool RPC 'wallet' connection to base node '{}'", node_id);
let _result = reply.send(client);
Expand Down Expand Up @@ -241,8 +241,8 @@ impl WalletConnectivityService {
return;
};

match self.pools.get(&node_id) {
Some(pools) => match pools.base_node_sync_rpc_client.get().await {
match self.current_pool {
Some(ref pools) => match pools.base_node_sync_rpc_client.get().await {
Ok(client) => {
debug!(target: LOG_TARGET, "Obtained pool RPC 'sync' connection to base node '{}'", node_id);
let _result = reply.send(client);
Expand Down Expand Up @@ -278,49 +278,50 @@ impl WalletConnectivityService {
}

fn get_base_node_peer_manager(&self) -> Option<BaseNodePeerManager> {
self.base_node_watch_receiver.borrow().as_ref().map(|p| p.clone())
self.base_node_watch_receiver.borrow().as_ref().cloned()
}

async fn disconnect_base_node(&mut self, peer_id: PeerId) {
if let Some(pool) = self.pools.remove(&peer_id) {
if let Some(pool) = self.current_pool.take() {
pool.close().await;
}
if let Err(e) = self.network_handle.disconnect_peer(peer_id).await {
error!(target: LOG_TARGET, "Failed to disconnect base node: {}", e);
}
}

async fn setup_base_node_connection(&mut self) {
let Some(mut peer_manager) = self.get_base_node_peer_manager() else {
debug!(target: LOG_TARGET, "No base node peer manager set");
return;
};
async fn setup_base_node_connection(&mut self, mut peer_manager: BaseNodePeerManager) {
loop {
let peer_id = if let Some(_time) = peer_manager.time_since_last_connection_attempt() {
let next_peer_id = peer_manager.select_next_peer().peer_id();
if peer_manager.get_current_peer().peer_id() == next_peer_id {
self.set_online_status(OnlineStatus::Connecting);
let maybe_last_attempt = peer_manager.time_since_last_connection_attempt();
let peer_id = if maybe_last_attempt.is_some() {
if peer_manager.peer_list().len() == 1 {
// If we only have one peer in the list, wait a bit before retrying
debug!(target: LOG_TARGET,
"Retrying after {}s ...",
Duration::from_secs(CONNECTIVITY_WAIT).as_secs()
CONNECTIVITY_WAIT.as_secs()
);
time::sleep(Duration::from_secs(CONNECTIVITY_WAIT)).await;
time::sleep(CONNECTIVITY_WAIT).await;
}
next_peer_id

// Switch to next peer (if available)
peer_manager.select_next_peer().peer_id()
} else {
peer_manager.get_current_peer_id().to_peer_id()
peer_manager.get_current_peer_id()
};
peer_manager.set_last_connection_attempt();

debug!(
target: LOG_TARGET,
"Attempting to connect to base node peer '{}'... (last attempt {:?})",
peer_id,
peer_manager.time_since_last_connection_attempt()
maybe_last_attempt
);
self.disconnect_base_node(peer_id).await;

peer_manager.set_last_connection_attempt();

match self.try_setup_rpc_pool(peer_id).await {
Ok(true) => {
// NOTE: the "base node peer manager" is not shared so we need to update it
self.base_node_watch.send(Some(peer_manager.clone()));
if let Err(e) = self.notify_pending_requests().await {
warn!(target: LOG_TARGET, "Error notifying pending RPC requests: {}", e);
Expand All @@ -338,14 +339,17 @@ impl WalletConnectivityService {
"The peer has changed while connecting. Attempting to connect to new base node."
);
self.disconnect_base_node(peer_id).await;
self.set_online_status(OnlineStatus::Offline);
},
Err(WalletConnectivityError::DialError(DialError::Aborted)) => {
debug!(target: LOG_TARGET, "Dial was cancelled.");
self.disconnect_base_node(peer_id).await;
self.set_online_status(OnlineStatus::Offline);
},
Err(e) => {
warn!(target: LOG_TARGET, "{}", e);
self.disconnect_base_node(peer_id).await;
self.set_online_status(OnlineStatus::Offline);
},
}
if self.peer_list_change_detected(&peer_manager) {
Expand Down Expand Up @@ -386,6 +390,7 @@ impl WalletConnectivityService {

async fn try_setup_rpc_pool(&mut self, peer_id: PeerId) -> Result<bool, WalletConnectivityError> {
let container = ClientPoolContainer {
peer_id,
base_node_sync_rpc_client: self
.network_handle
.create_rpc_client_pool(1, RpcClient::builder(peer_id)),
Expand All @@ -405,7 +410,9 @@ impl WalletConnectivityService {
"Established peer connection to base node '{}'",
peer_id
);
self.pools.insert(peer_id, container);
if let Some(container) = self.current_pool.replace(container) {
container.close().await;
}

trace!(target: LOG_TARGET, "Created RPC pools for '{}'", peer_id);
Ok(true)
Expand Down
Loading

0 comments on commit 91582eb

Please sign in to comment.