Skip to content

Commit

Permalink
fix unable to connect after remote disconnect
Browse files Browse the repository at this point in the history
  • Loading branch information
sdbondi committed Nov 6, 2024
1 parent b81761a commit c510744
Show file tree
Hide file tree
Showing 14 changed files with 149 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ async fn wait_for_comms(network: &NetworkHandle) -> Result<(), CommandError> {
loop {
tokio::select! {
// Wait for the first base node to identify
Ok(NetworkEvent::IdentifiedPeer { agent_version, .. }) = events.recv() => {
Ok(NetworkEvent::PeerIdentified { agent_version, .. }) = events.recv() => {
if agent_version.contains("basenode") {
println!("✅");
return Ok(());
Expand Down
18 changes: 9 additions & 9 deletions applications/minotari_console_wallet/src/ui/state/app_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ impl AppState {
pub async fn upsert_contact(&mut self, alias: String, tari_emoji: String) -> Result<(), UiError> {
let mut inner = self.inner.write().await;

let address = TariAddress::from_str(&tari_emoji).map_err(|_| UiError::PublicKeyParseError)?;
let address = TariAddress::from_str(&tari_emoji).map_err(|_| UiError::TariAddressParseError)?;

let contact = Contact::new(alias, address, None, None, false);
inner.wallet.contacts_service.upsert_contact(contact).await?;
Expand Down Expand Up @@ -262,7 +262,7 @@ impl AppState {

pub async fn delete_contact(&mut self, tari_emoji: String) -> Result<(), UiError> {
let mut inner = self.inner.write().await;
let address = TariAddress::from_str(&tari_emoji).map_err(|_| UiError::PublicKeyParseError)?;
let address = TariAddress::from_str(&tari_emoji).map_err(|_| UiError::TariAddressParseError)?;

inner.wallet.contacts_service.remove_contact(address).await?;

Expand Down Expand Up @@ -298,7 +298,7 @@ impl AppState {
result_tx: watch::Sender<UiTransactionSendStatus>,
) -> Result<(), UiError> {
let inner = self.inner.write().await;
let address = TariAddress::from_str(&address).map_err(|_| UiError::PublicKeyParseError)?;
let address = TariAddress::from_str(&address).map_err(|_| UiError::TariAddressParseError)?;

let output_features = OutputFeatures { ..Default::default() };

Expand Down Expand Up @@ -329,7 +329,7 @@ impl AppState {
result_tx: watch::Sender<UiTransactionSendStatus>,
) -> Result<(), UiError> {
let inner = self.inner.write().await;
let address = TariAddress::from_str(&address).map_err(|_| UiError::PublicKeyParseError)?;
let address = TariAddress::from_str(&address).map_err(|_| UiError::TariAddressParseError)?;
let payment_id = if payment_id_str.is_empty() {
PaymentId::Empty
} else {
Expand Down Expand Up @@ -384,10 +384,10 @@ impl AppState {
let fee_per_gram = fee_per_gram * uT;
let tx_service_handle = inner.wallet.transaction_service.clone();
let claim_public_key = match claim_public_key {
None => return Err(UiError::PublicKeyParseError),
None => return Err(UiError::TariAddressParseError),
Some(claim_public_key) => match PublicKey::from_hex(claim_public_key.as_str()) {
Ok(claim_public_key) => Some(claim_public_key),
Err(_) => return Err(UiError::PublicKeyParseError),
Err(_) => return Err(UiError::TariAddressParseError),
},
};

Expand Down Expand Up @@ -1015,7 +1015,7 @@ impl AppStateInner {
.public_key()
.clone()
.try_into_sr25519()
.map_err(|_| UiError::PublicKeyParseError)?
.map_err(|_| UiError::TariAddressParseError)?
.inner_key()
.clone();
info!(
Expand Down Expand Up @@ -1047,7 +1047,7 @@ impl AppStateInner {
.public_key()
.clone()
.try_into_sr25519()
.map_err(|_| UiError::PublicKeyParseError)?
.map_err(|_| UiError::TariAddressParseError)?
.inner_key()
.clone();
self.wallet
Expand Down Expand Up @@ -1097,7 +1097,7 @@ impl AppStateInner {
.public_key()
.clone()
.try_into_sr25519()
.map_err(|_| UiError::PublicKeyParseError)?
.map_err(|_| UiError::TariAddressParseError)?
.inner_key()
.clone();
self.wallet
Expand Down
4 changes: 2 additions & 2 deletions applications/minotari_console_wallet/src/ui/ui_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ pub enum UiError {
WalletError(#[from] WalletError),
#[error(transparent)]
WalletStorageError(#[from] WalletStorageError),
#[error("Could not convert string into Public Key")]
PublicKeyParseError,
#[error("The provided Tari address is invalid")]
TariAddressParseError,
#[error("Could not convert string into Net Address")]
AddressParseError,
#[error("Peer did not include an address")]
Expand Down
15 changes: 9 additions & 6 deletions applications/minotari_node/src/commands/command/add_peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ use async_trait::async_trait;
use clap::Parser;
use minotari_app_utilities::utilities::UniPublicKey;
use tari_network::{
identity,
multiaddr::Multiaddr,
swarm::dial_opts::{DialOpts, PeerCondition},
NetworkingService,
Peer,
ToPeerId,
};

Expand All @@ -53,14 +55,15 @@ impl HandleCommand<ArgsAddPeer> for CommandContext {
return Err(Error::msg("Cannot add self as peer"));
}
let timer = Instant::now();
self.network
.add_peer(Peer::new(
identity::PublicKey::from(identity::sr25519::PublicKey::from(public_key)),
vec![args.address],
))
.await?;
let dial = self
.network
.dial_peer(
DialOpts::peer_id(peer_id)
.condition(PeerCondition::Always)
.addresses(vec![args.address])
.build(),
)
.dial_peer(DialOpts::peer_id(peer_id).condition(PeerCondition::Always).build())
.await?;
println!("Peer with node id '{}' was added to the base node. Dialing...", peer_id);

Expand Down
15 changes: 11 additions & 4 deletions base_layer/core/src/mempool/sync_protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,17 @@ impl MempoolSyncProtocol {
}

async fn handle_network_event(&mut self, event: NetworkEvent) {
#[allow(clippy::single_match)]
match event {
// If this node is connecting to a peer
NetworkEvent::PeerConnected { peer_id, direction } if direction.is_outbound() => {
if !self.is_synched() && !self.has_attempted_peer(peer_id) {
NetworkEvent::PeerIdentified {
peer_id,
supported_protocols,
..
} => {
if !self.is_synched() &&
!self.has_attempted_peer(peer_id) &&
supported_protocols.iter().any(|p| *p == MEMPOOL_SYNC_PROTOCOL)
{
self.spawn_initiator_protocol(peer_id).await;
}
},
Expand Down Expand Up @@ -277,7 +284,7 @@ impl MempoolSyncProtocol {
},
}
},
Err(err) => error!(
Err(err) => warn!(
target: LOG_TARGET,
"Unable to establish mempool protocol substream to peer `{}`: {}",
peer_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,11 @@ impl BaseNodePeerManager {
self.get_current_peer().peer_id()
}

pub fn select_next_peer_if_attempted(&self) -> PeerId {
pub fn select_next_peer_if_attempted(&self) -> &Peer {
if self.time_since_last_connection_attempt().is_some() {
self.select_next_peer();
}
self.get_current_peer_id()
self.get_current_peer()
}

/// Get the current peer.
Expand Down
38 changes: 28 additions & 10 deletions base_layer/wallet/src/connectivity_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,14 @@ use std::{mem, time::Duration};
use futures::{future, future::Either};
use log::*;
use tari_core::base_node::{rpc::BaseNodeWalletRpcClient, sync::rpc::BaseNodeSyncRpcClient};
use tari_network::{identity::PeerId, DialError, NetworkHandle, NetworkingService};
use tari_network::{
identity::PeerId,
swarm::dial_opts::{DialOpts, PeerCondition},
DialError,
NetworkHandle,
NetworkingService,
Peer,
};
use tari_rpc_framework::{
pool::{RpcClientLease, RpcClientPool},
RpcClient,
Expand Down Expand Up @@ -90,7 +97,6 @@ impl WalletConnectivityService {
request_receiver,
network_handle,
base_node_watch_receiver: base_node_watch.get_receiver(),
// base_node_watch,
current_pool: None,
pending_requests: Vec::new(),
online_status_watch,
Expand Down Expand Up @@ -293,7 +299,8 @@ impl WalletConnectivityService {
}

async fn setup_base_node_connection(&mut self, mut peer_manager: BaseNodePeerManager) {
let mut peer_id = peer_manager.select_next_peer_if_attempted();
let mut peer = peer_manager.select_next_peer_if_attempted();
let peer_id = peer.peer_id();

loop {
self.set_online_status(OnlineStatus::Connecting);
Expand All @@ -302,21 +309,21 @@ impl WalletConnectivityService {
debug!(
target: LOG_TARGET,
"Attempting to connect to base node peer '{}'... (last attempt {:?})",
peer_id,
peer,
maybe_last_attempt
);

peer_manager.set_last_connection_attempt();

match self.try_setup_rpc_pool(peer_id).await {
match self.try_setup_rpc_pool(peer).await {
Ok(true) => {
if let Err(e) = self.notify_pending_requests().await {
warn!(target: LOG_TARGET, "Error notifying pending RPC requests: {}", e);
}
self.set_online_status(OnlineStatus::Online);
debug!(
target: LOG_TARGET,
"Wallet is ONLINE and connected to base node '{}'", peer_id
"Wallet is ONLINE and connected to base node '{}'", peer
);
break;
},
Expand Down Expand Up @@ -352,16 +359,16 @@ impl WalletConnectivityService {
}

// Select the next peer (if available)
let next_peer_id = peer_manager.select_next_peer().peer_id();
let next_peer = peer_manager.select_next_peer();
// If we only have one peer in the list, wait a bit before retrying
if peer_id == next_peer_id {
if peer_id == next_peer.peer_id() {
debug!(target: LOG_TARGET,
"Only single peer in base node peer list. Waiting {}s before retrying again ...",
CONNECTIVITY_WAIT.as_secs()
);
time::sleep(CONNECTIVITY_WAIT).await;
}
peer_id = next_peer_id;
peer = next_peer;
}
}

Expand All @@ -372,7 +379,18 @@ impl WalletConnectivityService {
self.online_status_watch.send(status);
}

async fn try_setup_rpc_pool(&mut self, peer_id: PeerId) -> Result<bool, WalletConnectivityError> {
async fn try_setup_rpc_pool(&mut self, peer: &Peer) -> Result<bool, WalletConnectivityError> {
let peer_id = peer.peer_id();
let dial_wait = self
.network_handle
.dial_peer(
DialOpts::peer_id(peer.peer_id())
.condition(PeerCondition::Disconnected)
.addresses(peer.addresses().to_vec())
.build(),
)
.await?;
dial_wait.await?;
let container = ClientPoolContainer {
peer_id,
base_node_sync_rpc_client: self
Expand Down
2 changes: 1 addition & 1 deletion network/core/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use tari_swarm::libp2p::{identity, PeerId, StreamProtocol};

#[derive(Debug, Clone)]
pub enum NetworkEvent {
IdentifiedPeer {
PeerIdentified {
peer_id: PeerId,
public_key: identity::PublicKey,
agent_version: String,
Expand Down
9 changes: 9 additions & 0 deletions network/core/src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,15 @@ impl Peer {
&self.public_key
}

pub fn merge_addresses(&mut self, new: Vec<Multiaddr>) -> &mut Self {
for addr in new {
if self.addresses.iter().all(|a| *a != addr) {
self.addresses.push(addr);
}
}
self
}

pub fn try_to_ristretto_public_key(&self) -> Result<RistrettoPublicKey, OtherVariantError> {
let pk = self.public_key.clone().try_into_sr25519()?;
Ok(pk.inner_key().clone())
Expand Down
30 changes: 26 additions & 4 deletions network/core/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ where
config: crate::Config,
relays: RelayState,
seed_peers: Vec<Peer>,
added_peers: HashMap<PeerId, Peer>,
autonat_status_sender: watch::Sender<AutonatStatus>,
is_initial_bootstrap_complete: bool,
shutdown_signal: ShutdownSignal,
Expand Down Expand Up @@ -140,6 +141,7 @@ where
pending_kad_queries: HashMap::new(),
relays: RelayState::new(known_relay_nodes),
seed_peers,
added_peers: HashMap::new(),
swarm,
ban_list: HashMap::new(),
allow_list: HashSet::new(),
Expand Down Expand Up @@ -242,7 +244,16 @@ where

let (tx_waiter, rx_waiter) = oneshot::channel();
let maybe_peer_id = dial_opts.get_peer_id();
info!(target: LOG_TARGET, "🤝 Dialing peer {:?}", dial_opts);
info!(target: LOG_TARGET, "☎️ Dialing peer {:?}", dial_opts);
// Kad can remove addresses if we fail to dial a peer (e.g. if they are temporarily offline)
// So we readd peers we've explicitly added
if let Some(peer_id) = maybe_peer_id {
if let Some(addresses) = self.added_peers.get(&peer_id).map(|p| p.addresses()) {
for address in addresses {
self.swarm.add_peer_address(peer_id, address.clone());
}
}
}

match self.swarm.dial(dial_opts) {
Ok(_) => {
Expand Down Expand Up @@ -394,13 +405,21 @@ where
let num_addresses = peer.addresses().len();
let peer_id = peer.peer_id();
let mut failed = 0usize;
for address in peer.addresses {
let update = self.swarm.behaviour_mut().kad.add_address(&peer_id, address);
for address in &peer.addresses {
let update = self.swarm.behaviour_mut().kad.add_address(&peer_id, address.clone());
if matches!(update, RoutingUpdate::Failed) {
failed += 1;
}
}

match self.added_peers.entry(peer.peer_id) {
Entry::Occupied(mut p_mut) => {
p_mut.get_mut().merge_addresses(peer.addresses);
},
Entry::Vacant(entry) => {
entry.insert(peer);
},
}
if failed == 0 {
let _ignore = reply.send(Ok(()));
} else {
Expand Down Expand Up @@ -1148,7 +1167,7 @@ where
self.establish_relay_circuit_on_connect(&peer_id);
}

self.publish_event(NetworkEvent::IdentifiedPeer {
self.publish_event(NetworkEvent::PeerIdentified {
peer_id,
public_key,
agent_version,
Expand Down Expand Up @@ -1255,6 +1274,9 @@ where
error,
} => {
debug!(target: LOG_TARGET, "Inbound substream failed from peer {peer_id} with stream id {stream_id}: {error}");
if let Some(waiting_reply) = self.pending_substream_requests.remove(&stream_id) {
let _ignore = waiting_reply.send(Err(NetworkError::FailedToOpenSubstream(error)));
}
},
OutboundFailure {
error,
Expand Down
Loading

0 comments on commit c510744

Please sign in to comment.