From 4fb8fd7556cd6cdfffad7bfd8b08f36cd1e091df Mon Sep 17 00:00:00 2001 From: Stan Bondi Date: Thu, 14 Nov 2024 08:53:14 +0400 Subject: [PATCH] fixes --- network/core/src/worker.rs | 6 +++- network/libp2p-messaging/src/behaviour.rs | 19 ++++++------ network/libp2p-peersync/src/behaviour.rs | 35 ++++++++++------------ network/libp2p-peersync/src/peer_record.rs | 19 ++++++++---- network/libp2p-substream/src/behaviour.rs | 4 +++ 5 files changed, 46 insertions(+), 37 deletions(-) diff --git a/network/core/src/worker.rs b/network/core/src/worker.rs index fa6e87ef8e..f282422910 100644 --- a/network/core/src/worker.rs +++ b/network/core/src/worker.rs @@ -975,8 +975,12 @@ where use autonat::Event::*; match event { StatusChanged { old, new } => { - if let Some(public_address) = self.swarm.behaviour().autonat.public_address() { + if let Some(public_address) = self.swarm.behaviour().autonat.public_address().cloned() { info!(target: LOG_TARGET, "🌍️ Autonat: Our public address is {public_address}"); + self.swarm + .behaviour_mut() + .peer_sync + .add_known_local_public_addresses(vec![public_address]); } self.autonat_status_sender.send_if_modified(|prev| { diff --git a/network/libp2p-messaging/src/behaviour.rs b/network/libp2p-messaging/src/behaviour.rs index b450eb8910..bad7560f8a 100644 --- a/network/libp2p-messaging/src/behaviour.rs +++ b/network/libp2p-messaging/src/behaviour.rs @@ -15,6 +15,7 @@ use libp2p::{ ConnectionDenied, ConnectionHandler, ConnectionId, + DialError, DialFailure, FromSwarm, NetworkBehaviour, @@ -128,12 +129,12 @@ where TCodec: Codec + Send + Clone + 'static }, None => { let stream_id = self.next_outbound_stream_id(); - tracing::debug!("create a new outbound dial {stream_id}"); let (sink, stream) = stream::channel(stream_id, peer_id); - self.pending_events.push_back(ToSwarm::Dial { - opts: DialOpts::peer_id(peer_id).build(), - }); + let opts = DialOpts::peer_id(peer_id).build(); + let connection_id = opts.connection_id(); + self.pending_events.push_back(ToSwarm::Dial { opts }); + tracing::debug!("create a new outbound dial (conn_id={connection_id}, stream {stream_id})"); self.pending_outbound_dials.insert(peer_id, (sink.clone(), stream)); sink @@ -198,13 +199,11 @@ where TCodec: Codec + Send + Clone + 'static } fn on_dial_failure(&mut self, DialFailure { peer_id, error, .. }: DialFailure) { + if matches!(error, DialError::DialPeerConditionFalse(_)) { + return; + } + if let Some(peer) = peer_id { - // If there are pending outgoing messages when a dial failure occurs, - // it is implied that we are not connected to the peer, since pending - // outgoing messages are drained when a connection is established and - // only created when a peer is not connected when a request is made. - // Thus these requests must be considered failed, even if there is - // another, concurrent dialing attempt ongoing. if let Some((_sink, stream)) = self.pending_outbound_dials.remove(&peer) { self.pending_events .push_back(ToSwarm::GenerateEvent(Event::OutboundFailure { diff --git a/network/libp2p-peersync/src/behaviour.rs b/network/libp2p-peersync/src/behaviour.rs index 9bde2df682..eeeb880ad1 100644 --- a/network/libp2p-peersync/src/behaviour.rs +++ b/network/libp2p-peersync/src/behaviour.rs @@ -113,11 +113,14 @@ where TPeerStore: PeerStore return; } + let mut is_any_new = false; for addr in addrs { - self.local_peer_record.add_address(addr.clone()); + is_any_new |= self.local_peer_record.add_address(addr.clone()); } - self.handle_update_local_record(); + if is_any_new { + self.handle_update_local_record(); + } } pub async fn want_peers>(&mut self, peers: I) -> Result<(), Error> { @@ -263,14 +266,18 @@ where TPeerStore: PeerStore FromSwarm::ConnectionClosed(connection_closed) => self.on_connection_closed(connection_closed), FromSwarm::AddressChange(_) => {}, FromSwarm::ExternalAddrConfirmed(addr_confirmed) => { - self.local_peer_record.add_address(addr_confirmed.addr.clone()); - self.handle_update_local_record() + if self.local_peer_record.add_address(addr_confirmed.addr.clone()) { + self.handle_update_local_record(); + self.pending_events + .push_back(ToSwarm::GenerateEvent(Event::LocalPeerRecordUpdated)); + } }, FromSwarm::ExternalAddrExpired(addr_expired) => { - self.local_peer_record.remove_address(addr_expired.addr); - self.handle_update_local_record(); - self.pending_events - .push_back(ToSwarm::GenerateEvent(Event::LocalPeerRecordUpdated)); + if self.local_peer_record.remove_address(addr_expired.addr) { + self.handle_update_local_record(); + self.pending_events + .push_back(ToSwarm::GenerateEvent(Event::LocalPeerRecordUpdated)); + } }, _ => {}, } @@ -309,18 +316,6 @@ where TPeerStore: PeerStore fn poll(&mut self, cx: &mut Context<'_>) -> Poll>> { if let Some(event) = self.pending_events.pop_front() { - // if let - // ToSwarm::GenerateEvent(event) = - // &event { - // match event { - // Event::InboundFailure { peer_id, .. } => {} - // Event::OutboundFailure { peer_id, .. } => {} - // Event::InboundStreamInterrupted { peer_id, .. } => {} - // Event::OutboundStreamInterrupted { peer_id, .. } => {} - // Event::ResponseStreamComplete { peer_id, .. } => {} - // Event::Error(_) => {} - // } - // } return Poll::Ready(event); } if self.pending_events.capacity() > EMPTY_QUEUE_SHRINK_THRESHOLD { diff --git a/network/libp2p-peersync/src/peer_record.rs b/network/libp2p-peersync/src/peer_record.rs index 6a9a372cce..77c9ae2272 100644 --- a/network/libp2p-peersync/src/peer_record.rs +++ b/network/libp2p-peersync/src/peer_record.rs @@ -124,14 +124,21 @@ impl LocalPeerRecord { self.keypair.public().to_peer_id() } - pub fn add_address(&mut self, address: Multiaddr) { - self.addresses.insert(address); - self.sign(); + pub fn add_address(&mut self, address: Multiaddr) -> bool { + if self.addresses.insert(address) { + // Sign only if the address was not already there + self.sign(); + return true; + } + false } - pub fn remove_address(&mut self, address: &Multiaddr) { - self.addresses.remove(address); - self.sign(); + pub fn remove_address(&mut self, address: &Multiaddr) -> bool { + if self.addresses.remove(address) { + self.sign(); + return true; + } + false } pub fn addresses(&self) -> &HashSet { diff --git a/network/libp2p-substream/src/behaviour.rs b/network/libp2p-substream/src/behaviour.rs index d604cf349f..f5759fcbf0 100644 --- a/network/libp2p-substream/src/behaviour.rs +++ b/network/libp2p-substream/src/behaviour.rs @@ -164,6 +164,10 @@ impl Behaviour { } fn on_dial_failure(&mut self, DialFailure { peer_id, error, .. }: DialFailure) { + if matches!(error, DialError::DialPeerConditionFalse(_)) { + return; + } + if let Some(peer) = peer_id { // If there are pending outgoing stream requests when a dial failure occurs, // it is implied that we are not connected to the peer, since pending