connection reliability
sdbondi committed Nov 6, 2024
1 parent c510744 commit c7b5e86
Showing 12 changed files with 126 additions and 110 deletions.
8 changes: 4 additions & 4 deletions applications/minotari_console_wallet/log4rs_sample.yml
@@ -100,7 +100,7 @@ appenders:
# An appender named "libp2p" that writes to a file with a custom pattern encoder
libp2p:
kind: rolling_file
path: "{{log_dir}}/log/libp2p.log"
path: "{{log_dir}}/log/wallet/libp2p.log"
policy:
kind: compound
trigger:
@@ -110,9 +110,9 @@
kind: fixed_window
base: 1
count: 5
pattern: "{{log_dir}}/log/libp2p.{}.log"
pattern: "{{log_dir}}/log/wallet/libp2p.{}.log"
encoder:
pattern: "{d(%Y-%m-%d %H:%M:%S.%f)} [{X(grpc)}] {f}.{L} {i} [{t}] {l:5} {m}{n}"
pattern: "{d(%Y-%m-%d %H:%M:%S.%f)} [{t}] [Thread:{I}] {l:5} {m}{n}"

# An appender named "fail2ban" that writes to a file with a custom pattern encoder
fail2ban:
@@ -158,7 +158,7 @@ loggers:
- other
additive: false

# contacts
# contacts
contacts:
level: info
appenders:
6 changes: 3 additions & 3 deletions applications/minotari_node/log4rs_sample.yml
@@ -108,7 +108,7 @@ appenders:
# An appender named "libp2p" that writes to a file with a custom pattern encoder
libp2p:
kind: rolling_file
path: "{{log_dir}}/log/libp2p.log"
path: "{{log_dir}}/log/base_node/libp2p.log"
policy:
kind: compound
trigger:
@@ -118,9 +118,9 @@
kind: fixed_window
base: 1
count: 5
pattern: "{{log_dir}}/log/libp2p.{}.log"
pattern: "{{log_dir}}/log/base_node/libp2p.{}.log"
encoder:
pattern: "{d(%Y-%m-%d %H:%M:%S.%f)} [{X(grpc)}] {f}.{L} {i} [{t}] {l:5} {m}{n}"
pattern: "{d(%Y-%m-%d %H:%M:%S.%f)} [{t}] [Thread:{I}] {l:5} {m}{n} // {f}:{L} "

# An appender named "fail2ban" that writes to a file with a custom pattern encoder
fail2ban:
@@ -22,8 +22,7 @@

use std::{
fmt::Display,
ops::Deref,
sync::{atomic, atomic::AtomicUsize, Arc, Mutex},
sync::{atomic, atomic::AtomicUsize, Arc},
time::{Duration, Instant},
};

@@ -38,13 +37,7 @@ pub struct BaseNodePeerManager {
current_peer_index: Arc<AtomicUsize>,
// The other base nodes that the wallet can connect to if the selected peer is not available
peer_list: Arc<Vec<Peer>>,
last_connection_attempt: Arc<Mutex<Option<LastConnectionAttempt>>>,
}

#[derive(Clone, Debug)]
pub struct LastConnectionAttempt {
pub peer_index: usize,
pub attempt_time: Instant,
local_last_connection_attempt: Option<Instant>,
}

impl BaseNodePeerManager {
@@ -64,7 +57,7 @@ impl BaseNodePeerManager {
Ok(Self {
current_peer_index: Arc::new(AtomicUsize::new(preferred_peer_index)),
peer_list: Arc::new(peer_list),
last_connection_attempt: Arc::new(Mutex::new(None)),
local_last_connection_attempt: None,
})
}

@@ -73,7 +66,7 @@ impl BaseNodePeerManager {
self.get_current_peer().peer_id()
}

pub fn select_next_peer_if_attempted(&self) -> &Peer {
pub fn select_next_peer_if_attempted(&mut self) -> &Peer {
if self.time_since_last_connection_attempt().is_some() {
self.select_next_peer();
}
@@ -89,8 +82,12 @@
}

/// Changes to the next peer in the list, returning that peer
pub fn select_next_peer(&self) -> &Peer {
pub fn select_next_peer(&mut self) -> &Peer {
self.set_current_peer_index((self.current_peer_index() + 1) % self.peer_list.len());
if self.peer_list.len() > 1 {
// Reset the last attempt since we've moved onto another peer
self.local_last_connection_attempt = None;
}
&self.peer_list[self.current_peer_index()]
}

@@ -104,31 +101,13 @@
}

/// Set the last connection attempt stats
pub fn set_last_connection_attempt(&self) {
let mut lock = self
.last_connection_attempt
.lock()
// In the currently impossible case that a panic occurs while this mutex is unlocked, we'll
// simply recover to use the previous value before the panic.
.unwrap_or_else(|e| e.into_inner());
*lock = Some(LastConnectionAttempt {
peer_index: self.current_peer_index(),
attempt_time: Instant::now(),
});
pub fn set_last_connection_attempt(&mut self) {
self.local_last_connection_attempt = Some(Instant::now());
}

/// Get the last connection attempt stats
/// Get the last connection attempt for the current peer
pub fn time_since_last_connection_attempt(&self) -> Option<Duration> {
let lock = self.last_connection_attempt.lock().unwrap_or_else(|e| e.into_inner());
if let Some(stats) = lock.deref() {
if stats.peer_index == self.current_peer_index() {
Some(stats.attempt_time.elapsed())
} else {
None
}
} else {
None
}
self.local_last_connection_attempt.as_ref().map(|t| t.elapsed())
}

fn set_current_peer_index(&self, index: usize) {
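The peer-manager change above replaces the shared Arc<Mutex<Option<LastConnectionAttempt>>> with a plain Option<Instant> field that is cleared whenever rotation moves on to a different peer, so the timestamp always describes an attempt against the current peer. A minimal, self-contained sketch of that behaviour, using a stand-in PeerRotation type with String peers rather than the wallet's real types:

use std::time::{Duration, Instant};

struct PeerRotation {
    peers: Vec<String>, // stand-in for the real Peer type
    current: usize,
    last_attempt: Option<Instant>,
}

impl PeerRotation {
    /// Rotate to the next peer; the timestamp is reset because it only
    /// describes an attempt against the current peer.
    fn select_next_peer(&mut self) -> &str {
        self.current = (self.current + 1) % self.peers.len();
        if self.peers.len() > 1 {
            self.last_attempt = None;
        }
        &self.peers[self.current]
    }

    fn set_last_connection_attempt(&mut self) {
        self.last_attempt = Some(Instant::now());
    }

    fn time_since_last_connection_attempt(&self) -> Option<Duration> {
        self.last_attempt.map(|t| t.elapsed())
    }
}

fn main() {
    let mut rotation = PeerRotation {
        peers: vec!["peer-a".into(), "peer-b".into()],
        current: 0,
        last_attempt: None,
    };
    rotation.set_last_connection_attempt();
    assert!(rotation.time_since_last_connection_attempt().is_some());
    rotation.select_next_peer();
    // Switching peers clears the timestamp, so the caller retries immediately.
    assert!(rotation.time_since_last_connection_attempt().is_none());
}

Because the real manager is cloned into the connection task and mutated through &mut self, the timestamp is local to that clone rather than shared behind a lock, which is presumably why the field is named local_last_connection_attempt.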
46 changes: 25 additions & 21 deletions base_layer/wallet/src/connectivity_service/service.rs
@@ -20,7 +20,7 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{mem, time::Duration};
use std::{mem, pin::pin, time::Duration};

use futures::{future, future::Either};
use log::*;
@@ -39,7 +39,7 @@ use tari_rpc_framework::{
RpcConnector,
};
use tokio::{
sync::{mpsc, oneshot, watch},
sync::{mpsc, oneshot},
time,
time::MissedTickBehavior,
};
@@ -65,7 +65,7 @@ pub struct WalletConnectivityService {
config: BaseNodeServiceConfig,
request_receiver: mpsc::Receiver<WalletConnectivityRequest>,
network_handle: NetworkHandle,
base_node_watch_receiver: watch::Receiver<Option<BaseNodePeerManager>>,
base_node_watch: Watch<Option<BaseNodePeerManager>>,
current_pool: Option<ClientPoolContainer>,
online_status_watch: Watch<OnlineStatus>,
pending_requests: Vec<ReplyOneshot>,
Expand Down Expand Up @@ -96,7 +96,7 @@ impl WalletConnectivityService {
config,
request_receiver,
network_handle,
base_node_watch_receiver: base_node_watch.get_receiver(),
base_node_watch,
current_pool: None,
pending_requests: Vec::new(),
online_status_watch,
@@ -115,8 +115,8 @@ impl WalletConnectivityService {
// BIASED: select branches are in order of priority
biased;

Ok(_) = self.base_node_watch_receiver.changed() => {
if self.base_node_watch_receiver.borrow().is_some() {
_ = self.base_node_watch.changed() => {
if self.base_node_watch.borrow().is_some() {
// This will block the rest until the connection is established. This is what we want.
trace!(target: LOG_TARGET, "start: base_node_watch_receiver.changed");
self.check_connection_and_connect_if_required().await;
@@ -204,7 +204,7 @@ impl WalletConnectivityService {
val
} else {
self.pending_requests.push(reply.into());
warn!(target: LOG_TARGET, "{} wallet requests waiting for connection", self.pending_requests.len());
debug!(target: LOG_TARGET, "{} wallet requests waiting for connection", self.pending_requests.len());
return;
};

@@ -279,14 +279,11 @@ impl WalletConnectivityService {
}

fn current_base_node(&self) -> Option<PeerId> {
self.base_node_watch_receiver
.borrow()
.as_ref()
.map(|p| p.get_current_peer_id())
self.base_node_watch.borrow().as_ref().map(|p| p.get_current_peer_id())
}

fn get_base_node_peer_manager(&self) -> Option<BaseNodePeerManager> {
self.base_node_watch_receiver.borrow().as_ref().cloned()
self.base_node_watch.borrow().as_ref().cloned()
}

async fn disconnect_base_node(&mut self, peer_id: PeerId) {
@@ -299,7 +296,7 @@
}

async fn setup_base_node_connection(&mut self, mut peer_manager: BaseNodePeerManager) {
let mut peer = peer_manager.select_next_peer_if_attempted();
let mut peer = peer_manager.select_next_peer_if_attempted().clone();
let peer_id = peer.peer_id();

loop {
@@ -315,7 +312,7 @@

peer_manager.set_last_connection_attempt();

match self.try_setup_rpc_pool(peer).await {
match self.try_setup_rpc_pool(&peer).await {
Ok(true) => {
if let Err(e) = self.notify_pending_requests().await {
warn!(target: LOG_TARGET, "Error notifying pending RPC requests: {}", e);
@@ -345,6 +342,7 @@
};
self.disconnect_base_node(peer_id).await;
self.set_online_status(OnlineStatus::Offline);
continue;
},
Err(WalletConnectivityError::DialError(DialError::Aborted)) => {
debug!(target: LOG_TARGET, "Dial was cancelled.");
@@ -359,14 +357,17 @@
}

// Select the next peer (if available)
let next_peer = peer_manager.select_next_peer();
let next_peer = peer_manager.select_next_peer().clone();
// If we only have one peer in the list, wait a bit before retrying
if peer_id == next_peer.peer_id() {
debug!(target: LOG_TARGET,
"Only single peer in base node peer list. Waiting {}s before retrying again ...",
CONNECTIVITY_WAIT.as_secs()
);
time::sleep(CONNECTIVITY_WAIT).await;
} else {
// Ensure that all services are aware of the next peer being attempted
self.base_node_watch.mark_changed();
}
peer = next_peer;
}
@@ -385,12 +386,12 @@
.network_handle
.dial_peer(
DialOpts::peer_id(peer.peer_id())
.condition(PeerCondition::Disconnected)
.condition(PeerCondition::DisconnectedAndNotDialing)
.addresses(peer.addresses().to_vec())
.build(),
)
.await?;
dial_wait.await?;

let container = ClientPoolContainer {
peer_id,
base_node_sync_rpc_client: self
@@ -403,10 +404,13 @@

// Create the first RPC session to ensure that we can connect.
{
let connect_fut = container.base_node_wallet_rpc_client.get();
futures::pin_mut!(connect_fut);
let bn_changed_fut = self.base_node_watch_receiver.changed();
futures::pin_mut!(bn_changed_fut);
let mut bn_changed_fut = pin!(self.base_node_watch.changed());
match future::select(dial_wait, &mut bn_changed_fut).await {
Either::Left((result, _)) => result?,
Either::Right(_) => return Ok(false),
};
debug!(target: LOG_TARGET, "Dial succeeded for {peer_id}");
let connect_fut = pin!(container.base_node_wallet_rpc_client.get());
let client = match future::select(connect_fut, bn_changed_fut).await {
Either::Left((result, _)) => result?,
Either::Right(_) => return Ok(false),
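In setup_base_node_connection above, both the dial and the first RPC session are now raced against the base-node watch, so selecting a different base node abandons the in-flight attempt instead of waiting for it to resolve. A rough sketch of that race over a plain tokio watch channel; connect_or_abort and the sleep standing in for the dial future are illustrative, not the wallet API:

use std::{pin::pin, time::Duration};

use futures::future::{self, Either};
use tokio::{sync::watch, time};

/// Returns true if the "dial" completed, false if the base node changed first.
async fn connect_or_abort(mut base_node_changed: watch::Receiver<u64>) -> bool {
    // Stand-in for the real dial future returned by the network handle.
    let dial = pin!(time::sleep(Duration::from_millis(50)));
    let mut bn_changed = pin!(base_node_changed.changed());

    match future::select(dial, &mut bn_changed).await {
        // Dial finished first: carry on and open the RPC session.
        Either::Left(((), _)) => true,
        // A different base node was selected: abandon this attempt.
        Either::Right(_) => false,
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = watch::channel(0u64);
    // No new base node is set, so the dial wins the race.
    assert!(connect_or_abort(rx).await);
    drop(tx);
}

The Either::Right arm corresponds to the return Ok(false) in the service, which lets the outer loop start over with the newly selected peer.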
16 changes: 16 additions & 0 deletions base_layer/wallet/src/util/watch.rs
@@ -55,6 +55,22 @@ impl<T> Watch<T> {
&self.1
}

fn receiver_mut(&mut self) -> &mut watch::Receiver<T> {
&mut self.1
}

/// Marks the state as changed.
///
/// After invoking this method [`has_changed()`](Self::has_changed)
/// returns `true` and [`changed()`](Self::changed) returns
/// immediately, regardless of whether a new value has been sent.
///
/// This is useful for triggering an initial change notification after
/// subscribing to synchronize new receivers.
pub fn mark_changed(&mut self) {
self.receiver_mut().mark_changed();
}

pub fn get_receiver(&self) -> watch::Receiver<T> {
self.receiver().clone()
}
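The mark_changed passthrough added above exposes tokio's watch::Receiver::mark_changed on the wallet's Watch wrapper; the connectivity service uses it to re-notify listeners when it rotates to the next base node without publishing a new value. A small usage sketch against the underlying tokio primitive:

use tokio::sync::watch;

#[tokio::main]
async fn main() {
    let (_tx, mut rx) = watch::channel("base-node-a");

    // Nothing new has been sent, so the receiver does not report a change yet.
    assert!(!rx.has_changed().unwrap());

    // mark_changed() makes changed() resolve immediately even though the value
    // itself is untouched; handy for re-notifying after a peer rotation.
    rx.mark_changed();
    assert!(rx.has_changed().unwrap());
    rx.changed().await.unwrap();
    assert_eq!(*rx.borrow_and_update(), "base-node-a");
}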
5 changes: 4 additions & 1 deletion network/core/src/error.rs
@@ -96,7 +96,7 @@ pub enum DialError {
DialPeerConditionFalse(dial_opts::PeerCondition),
#[error("Pending connection attempt has been aborted.")]
Aborted,
#[error("The peer identity obtained ({obtained}) on the connection did not match the one that was expected.")]
#[error(
"The peer ID obtained ({obtained}) on the connection did not match the one that was expected. This is usually \
because a different peer is listening on the provided address."
)]
WrongPeerId { obtained: PeerId, endpoint: ConnectedPoint },
#[error("One of the [`NetworkBehaviour`]s rejected the outbound connection: {cause}.")]
Denied { cause: String },
29 changes: 18 additions & 11 deletions network/core/src/worker.rs
@@ -262,6 +262,18 @@
}
let _ignore = reply.send(Ok(rx_waiter.into()));
},
Err(err @ DialError::DialPeerConditionFalse(_)) => {
debug!(target: LOG_TARGET, "{err}");
if let Some(peer_id) = maybe_peer_id {
if self.active_connections.contains_key(&peer_id) {
let _ignore = tx_waiter.send(Ok(()));
} else {
// We can add to pending because an event will occur
self.pending_dial_requests.entry(peer_id).or_default().push(tx_waiter);
}
}
let _ignore = reply.send(Ok(rx_waiter.into()));
},
Err(err) => {
info!(target: LOG_TARGET, "🚨 Failed to dial peer: {}", err);
let _ignore = reply.send(Err(err.into()));
@@ -678,9 +690,10 @@
peer_id,
endpoint,
cause,
connection_id,
..
} => {
info!(target: LOG_TARGET, "🔌 Connection closed: peer_id={}, endpoint={:?}, cause={:?}", peer_id, endpoint, cause);
info!(target: LOG_TARGET, "🔌 Connection closed: id={}, peer_id={}, endpoint={:?}, cause={:?}", connection_id, peer_id, endpoint, cause);
match self.active_connections.entry(peer_id) {
Entry::Occupied(mut entry) => {
entry.get_mut().retain(|c| c.endpoint != endpoint);
@@ -1072,13 +1085,10 @@
supported_protocols: vec![],
});

let Some(waiters) = self.pending_dial_requests.remove(&peer_id) else {
debug!(target: LOG_TARGET, "No pending dial requests initiated by this service for peer {}", peer_id);
return Ok(());
};

for waiter in waiters {
let _ignore = waiter.send(Ok(()));
if let Some(waiters) = self.pending_dial_requests.remove(&peer_id) {
for waiter in waiters {
let _ignore = waiter.send(Ok(()));
}
}

self.publish_event(NetworkEvent::PeerConnected {
@@ -1274,9 +1284,6 @@
error,
} => {
debug!(target: LOG_TARGET, "Inbound substream failed from peer {peer_id} with stream id {stream_id}: {error}");
if let Some(waiting_reply) = self.pending_substream_requests.remove(&stream_id) {
let _ignore = waiting_reply.send(Err(NetworkError::FailedToOpenSubstream(error)));
}
},
OutboundFailure {
error,
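With the wallet now dialling under PeerCondition::DisconnectedAndNotDialing, the worker treats DialError::DialPeerConditionFalse as benign rather than as a failure: if the peer is already connected the waiter is resolved immediately, otherwise it is parked until the dial that is already in flight produces a connection event. A simplified sketch of that branching, with u64 peer IDs and String errors standing in for the real types:

use std::collections::HashMap;

use tokio::sync::oneshot;

type PeerId = u64; // stand-in for the real libp2p PeerId
type DialWaiter = oneshot::Sender<Result<(), String>>;

fn handle_dial_condition_false(
    peer_id: PeerId,
    waiter: DialWaiter,
    active_connections: &HashMap<PeerId, Vec<()>>,
    pending_dial_requests: &mut HashMap<PeerId, Vec<DialWaiter>>,
) {
    if active_connections.contains_key(&peer_id) {
        // Already connected: the failed dial condition is a success for the caller.
        let _ignore = waiter.send(Ok(()));
    } else {
        // A dial is already in flight, so a connection (or failure) event will
        // arrive later; park the waiter until then.
        pending_dial_requests.entry(peer_id).or_default().push(waiter);
    }
}

fn main() {
    let active: HashMap<PeerId, Vec<()>> = HashMap::new();
    let mut pending: HashMap<PeerId, Vec<DialWaiter>> = HashMap::new();
    let (tx, mut rx) = oneshot::channel();

    handle_dial_condition_false(1, tx, &active, &mut pending);

    // Peer 1 is not connected yet, so the waiter was parked rather than resolved.
    assert_eq!(pending.get(&1).map(Vec::len), Some(1));
    assert!(rx.try_recv().is_err());
}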