From da8ca405ce91b6d1cf8f529279cc20072efa1080 Mon Sep 17 00:00:00 2001 From: Stan Bondi Date: Sun, 3 Nov 2024 08:48:07 +0400 Subject: [PATCH] fix: mempool sync handles multiple connections to same peer --- .../core/src/mempool/sync_protocol/mod.rs | 30 +++++++++++++------ 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/base_layer/core/src/mempool/sync_protocol/mod.rs b/base_layer/core/src/mempool/sync_protocol/mod.rs index e75b4ab6df..d40c157910 100644 --- a/base_layer/core/src/mempool/sync_protocol/mod.rs +++ b/base_layer/core/src/mempool/sync_protocol/mod.rs @@ -64,10 +64,11 @@ //! ``` use std::{ + collections::HashSet, convert::TryFrom, iter, sync::{ - atomic::{AtomicUsize, Ordering}, + atomic::{AtomicBool, Ordering}, Arc, }, time::Duration, @@ -113,7 +114,8 @@ pub struct MempoolSyncProtocol { config: MempoolServiceConfig, protocol_notifier: mpsc::UnboundedReceiver>, mempool: Mempool, - num_synched: Arc, + peers_attempted: HashSet, + is_done: Arc, permits: Arc, network: NetworkHandle, block_event_stream: BlockEventReceiver, @@ -131,7 +133,8 @@ impl MempoolSyncProtocol { config, protocol_notifier, mempool, - num_synched: Arc::new(AtomicUsize::new(0)), + peers_attempted: HashSet::new(), + is_done: Arc::new(AtomicBool::new(false)), permits: Arc::new(Semaphore::new(1)), network, block_event_stream, @@ -162,7 +165,7 @@ impl MempoolSyncProtocol { match event { // If this node is connecting to a peer NetworkEvent::PeerConnected { peer_id, direction } if direction.is_outbound() => { - if !self.is_synched() { + if !self.is_synched() && !self.has_attempted_peer(peer_id) { self.spawn_initiator_protocol(peer_id).await; } }, @@ -191,7 +194,7 @@ impl MempoolSyncProtocol { // we want to at least sync initial_sync_num_peers, so we reset the num_synced to 0, so it can run till // initial_sync_num_peers again. This is made to run as a best effort in that it will at least run the // initial_sync_num_peers - self.num_synched.store(0, Ordering::SeqCst); + self.peers_attempted.clear(); let connections = match self .network .select_random_connections(self.config.initial_sync_num_peers, Default::default()) @@ -218,7 +221,11 @@ impl MempoolSyncProtocol { } fn is_synched(&self) -> bool { - self.num_synched.load(Ordering::SeqCst) >= self.config.initial_sync_num_peers + self.is_done.load(Ordering::SeqCst) + } + + fn has_attempted_peer(&self, peer_id: PeerId) -> bool { + self.peers_attempted.contains(&peer_id) } fn handle_protocol_notification(&mut self, notification: ProtocolNotification) { @@ -232,13 +239,15 @@ impl MempoolSyncProtocol { async fn spawn_initiator_protocol(&mut self, peer_id: PeerId) { let mempool = self.mempool.clone(); let permits = self.permits.clone(); - let num_synched = self.num_synched.clone(); + let is_done = self.is_done.clone(); let config = self.config.clone(); let network = self.network.clone(); + let num_synced = self.peers_attempted.len(); + self.peers_attempted.insert(peer_id); task::spawn(async move { // Only initiate this protocol with a single peer at a time let _permit = permits.acquire().await; - if num_synched.load(Ordering::SeqCst) >= config.initial_sync_num_peers { + if is_done.load(Ordering::SeqCst) { return; } match network @@ -246,6 +255,7 @@ impl MempoolSyncProtocol { .await { Ok(framed) => { + let initial_sync_num_peers = config.initial_sync_num_peers; let protocol = MempoolPeerProtocol::new(config, framed, peer_id, mempool); match protocol.start_initiator().await { Ok(_) => { @@ -254,7 +264,9 @@ impl MempoolSyncProtocol { "Mempool initiator protocol completed successfully for peer `{}`", peer_id, ); - num_synched.fetch_add(1, Ordering::SeqCst); + if num_synced >= initial_sync_num_peers { + is_done.store(true, Ordering::SeqCst); + } }, Err(err) => { debug!(