From bb03359179631a4b55c515bb37be28ea58b655b3 Mon Sep 17 00:00:00 2001 From: Stan Bondi Date: Thu, 14 Nov 2024 08:53:14 +0400 Subject: [PATCH] fixes --- network/core/src/worker.rs | 6 +++++- network/libp2p-messaging/src/behaviour.rs | 19 +++++++++---------- network/libp2p-peersync/src/behaviour.rs | 19 +++++-------------- network/libp2p-peersync/src/peer_record.rs | 10 +++++++--- network/libp2p-substream/src/behaviour.rs | 4 ++++ 5 files changed, 30 insertions(+), 28 deletions(-) diff --git a/network/core/src/worker.rs b/network/core/src/worker.rs index fa6e87ef8ea..f2824229105 100644 --- a/network/core/src/worker.rs +++ b/network/core/src/worker.rs @@ -975,8 +975,12 @@ where use autonat::Event::*; match event { StatusChanged { old, new } => { - if let Some(public_address) = self.swarm.behaviour().autonat.public_address() { + if let Some(public_address) = self.swarm.behaviour().autonat.public_address().cloned() { info!(target: LOG_TARGET, "🌍️ Autonat: Our public address is {public_address}"); + self.swarm + .behaviour_mut() + .peer_sync + .add_known_local_public_addresses(vec![public_address]); } self.autonat_status_sender.send_if_modified(|prev| { diff --git a/network/libp2p-messaging/src/behaviour.rs b/network/libp2p-messaging/src/behaviour.rs index b450eb89105..bad7560f8a8 100644 --- a/network/libp2p-messaging/src/behaviour.rs +++ b/network/libp2p-messaging/src/behaviour.rs @@ -15,6 +15,7 @@ use libp2p::{ ConnectionDenied, ConnectionHandler, ConnectionId, + DialError, DialFailure, FromSwarm, NetworkBehaviour, @@ -128,12 +129,12 @@ where TCodec: Codec + Send + Clone + 'static }, None => { let stream_id = self.next_outbound_stream_id(); - tracing::debug!("create a new outbound dial {stream_id}"); let (sink, stream) = stream::channel(stream_id, peer_id); - self.pending_events.push_back(ToSwarm::Dial { - opts: DialOpts::peer_id(peer_id).build(), - }); + let opts = DialOpts::peer_id(peer_id).build(); + let connection_id = opts.connection_id(); + self.pending_events.push_back(ToSwarm::Dial { opts }); + tracing::debug!("create a new outbound dial (conn_id={connection_id}, stream {stream_id})"); self.pending_outbound_dials.insert(peer_id, (sink.clone(), stream)); sink @@ -198,13 +199,11 @@ where TCodec: Codec + Send + Clone + 'static } fn on_dial_failure(&mut self, DialFailure { peer_id, error, .. }: DialFailure) { + if matches!(error, DialError::DialPeerConditionFalse(_)) { + return; + } + if let Some(peer) = peer_id { - // If there are pending outgoing messages when a dial failure occurs, - // it is implied that we are not connected to the peer, since pending - // outgoing messages are drained when a connection is established and - // only created when a peer is not connected when a request is made. - // Thus these requests must be considered failed, even if there is - // another, concurrent dialing attempt ongoing. if let Some((_sink, stream)) = self.pending_outbound_dials.remove(&peer) { self.pending_events .push_back(ToSwarm::GenerateEvent(Event::OutboundFailure { diff --git a/network/libp2p-peersync/src/behaviour.rs b/network/libp2p-peersync/src/behaviour.rs index 9bde2df6825..20f5dc574c4 100644 --- a/network/libp2p-peersync/src/behaviour.rs +++ b/network/libp2p-peersync/src/behaviour.rs @@ -113,11 +113,14 @@ where TPeerStore: PeerStore return; } + let mut is_any_new = false; for addr in addrs { - self.local_peer_record.add_address(addr.clone()); + is_any_new |= self.local_peer_record.add_address(addr.clone()); } - self.handle_update_local_record(); + if is_any_new { + self.handle_update_local_record(); + } } pub async fn want_peers>(&mut self, peers: I) -> Result<(), Error> { @@ -309,18 +312,6 @@ where TPeerStore: PeerStore fn poll(&mut self, cx: &mut Context<'_>) -> Poll>> { if let Some(event) = self.pending_events.pop_front() { - // if let - // ToSwarm::GenerateEvent(event) = - // &event { - // match event { - // Event::InboundFailure { peer_id, .. } => {} - // Event::OutboundFailure { peer_id, .. } => {} - // Event::InboundStreamInterrupted { peer_id, .. } => {} - // Event::OutboundStreamInterrupted { peer_id, .. } => {} - // Event::ResponseStreamComplete { peer_id, .. } => {} - // Event::Error(_) => {} - // } - // } return Poll::Ready(event); } if self.pending_events.capacity() > EMPTY_QUEUE_SHRINK_THRESHOLD { diff --git a/network/libp2p-peersync/src/peer_record.rs b/network/libp2p-peersync/src/peer_record.rs index 6a9a372ccea..347a7227a32 100644 --- a/network/libp2p-peersync/src/peer_record.rs +++ b/network/libp2p-peersync/src/peer_record.rs @@ -124,9 +124,13 @@ impl LocalPeerRecord { self.keypair.public().to_peer_id() } - pub fn add_address(&mut self, address: Multiaddr) { - self.addresses.insert(address); - self.sign(); + pub fn add_address(&mut self, address: Multiaddr) -> bool { + if self.addresses.insert(address) { + // Sign only if the address was not already there + self.sign(); + return true; + } + false } pub fn remove_address(&mut self, address: &Multiaddr) { diff --git a/network/libp2p-substream/src/behaviour.rs b/network/libp2p-substream/src/behaviour.rs index d604cf349ff..f5759fcbf07 100644 --- a/network/libp2p-substream/src/behaviour.rs +++ b/network/libp2p-substream/src/behaviour.rs @@ -164,6 +164,10 @@ impl Behaviour { } fn on_dial_failure(&mut self, DialFailure { peer_id, error, .. }: DialFailure) { + if matches!(error, DialError::DialPeerConditionFalse(_)) { + return; + } + if let Some(peer) = peer_id { // If there are pending outgoing stream requests when a dial failure occurs, // it is implied that we are not connected to the peer, since pending