diff --git a/comms/core/src/connection_manager/peer_connection.rs b/comms/core/src/connection_manager/peer_connection.rs index d5a127cdca9..b1e1435d2c0 100644 --- a/comms/core/src/connection_manager/peer_connection.rs +++ b/comms/core/src/connection_manager/peer_connection.rs @@ -532,7 +532,6 @@ impl PeerConnectionActor { self.peer_node_id.short_str(), e ); - return Ok(()); }, e => trace!(target: LOG_TARGET, "On disconnect: ({})", e), } diff --git a/comms/core/src/connectivity/connection_pool.rs b/comms/core/src/connectivity/connection_pool.rs index 6242ad33796..fbbaf47f3dd 100644 --- a/comms/core/src/connectivity/connection_pool.rs +++ b/comms/core/src/connectivity/connection_pool.rs @@ -37,6 +37,12 @@ pub enum ConnectionStatus { Disconnected(Minimized), } +impl ConnectionStatus { + pub fn is_connected(&self) -> bool { + matches!(self, ConnectionStatus::Connected) + } +} + impl fmt::Display for ConnectionStatus { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "{:?}", self) @@ -58,7 +64,7 @@ impl PeerConnectionState { /// Return true if the underlying connection exists and is connected, otherwise false pub fn is_connected(&self) -> bool { - self.connection().filter(|c| c.is_connected()).is_some() + self.status.is_connected() && self.connection().map_or(false, |c| c.is_connected()) } pub fn connection_mut(&mut self) -> Option<&mut PeerConnection> { diff --git a/comms/core/src/connectivity/manager.rs b/comms/core/src/connectivity/manager.rs index 30eced00d46..c916a2ec0fb 100644 --- a/comms/core/src/connectivity/manager.rs +++ b/comms/core/src/connectivity/manager.rs @@ -188,11 +188,9 @@ impl ConnectivityManagerActor { self.handle_request(req).await; }, - event = connection_manager_events.recv() => { - if let Ok(event) = event { - if let Err(err) = self.handle_connection_manager_event(&event).await { - error!(target:LOG_TARGET, "Error handling connection manager event: {:?}", err); - } + Ok(event) = connection_manager_events.recv() => { + if let Err(err) = self.handle_connection_manager_event(&event).await { + error!(target:LOG_TARGET, "Error handling connection manager event: {:?}", err); } }, @@ -738,24 +736,37 @@ impl ConnectivityManagerActor { } async fn on_new_connection(&mut self, new_conn: &PeerConnection) -> TieBreak { - match self.pool.get_connection(new_conn.peer_node_id()).cloned() { - Some(existing_conn) if !existing_conn.is_connected() => { + match self.pool.get(new_conn.peer_node_id()).cloned() { + Some(existing_state) if !existing_state.is_connected() => { debug!( target: LOG_TARGET, "Tie break: Existing connection (id: {}, peer: {}, direction: {}) was not connected, resolving \ tie break by using the new connection. (New: id: {}, peer: {}, direction: {})", - existing_conn.id(), - existing_conn.peer_node_id(), - existing_conn.direction(), + existing_state.connection().map(|c| c.id()).unwrap_or_default(), + existing_state.node_id(), + existing_state.connection().map(|c| c.direction().as_str()).unwrap_or("--"), new_conn.id(), new_conn.peer_node_id(), new_conn.direction(), ); - self.pool.remove(existing_conn.peer_node_id()); + self.pool.remove(existing_state.node_id()); TieBreak::UseNew }, - Some(mut existing_conn) => { - if self.tie_break_existing_connection(&existing_conn, new_conn) { + Some(mut existing_state) => { + let Some(existing_conn) = existing_state.connection_mut() else { + error!( + target: LOG_TARGET, + "INVARIANT ERROR in Tie break: PeerConnection is None but state is CONNECTED: Existing connection (id: {}, peer: {}, direction: {}), new connection. (id: {}, peer: {}, direction: {})", + existing_state.connection().map(|c| c.id()).unwrap_or_default(), + existing_state.node_id(), + existing_state.connection().map(|c| c.direction().as_str()).unwrap_or("--"), + new_conn.id(), + new_conn.peer_node_id(), + new_conn.direction(), + ); + return TieBreak::UseNew; + }; + if self.tie_break_existing_connection(existing_conn, new_conn) { warn!( target: LOG_TARGET, "Tie break: Keep new connection (id: {}, peer: {}, direction: {}). Disconnect existing \