Skip to content

Commit

Permalink
fix: mempool sync handles multiple connections to same peer
Browse files Browse the repository at this point in the history
  • Loading branch information
sdbondi committed Nov 3, 2024
1 parent eeb3dca commit da8ca40
Showing 1 changed file with 21 additions and 9 deletions.
30 changes: 21 additions & 9 deletions base_layer/core/src/mempool/sync_protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,11 @@
//! ```
use std::{
collections::HashSet,
convert::TryFrom,
iter,
sync::{
atomic::{AtomicUsize, Ordering},
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
Expand Down Expand Up @@ -113,7 +114,8 @@ pub struct MempoolSyncProtocol {
config: MempoolServiceConfig,
protocol_notifier: mpsc::UnboundedReceiver<ProtocolNotification<Substream>>,
mempool: Mempool,
num_synched: Arc<AtomicUsize>,
peers_attempted: HashSet<PeerId>,
is_done: Arc<AtomicBool>,
permits: Arc<Semaphore>,
network: NetworkHandle,
block_event_stream: BlockEventReceiver,
Expand All @@ -131,7 +133,8 @@ impl MempoolSyncProtocol {
config,
protocol_notifier,
mempool,
num_synched: Arc::new(AtomicUsize::new(0)),
peers_attempted: HashSet::new(),
is_done: Arc::new(AtomicBool::new(false)),
permits: Arc::new(Semaphore::new(1)),
network,
block_event_stream,
Expand Down Expand Up @@ -162,7 +165,7 @@ impl MempoolSyncProtocol {
match event {
// If this node is connecting to a peer
NetworkEvent::PeerConnected { peer_id, direction } if direction.is_outbound() => {
if !self.is_synched() {
if !self.is_synched() && !self.has_attempted_peer(peer_id) {
self.spawn_initiator_protocol(peer_id).await;
}
},
Expand Down Expand Up @@ -191,7 +194,7 @@ impl MempoolSyncProtocol {
// we want to at least sync initial_sync_num_peers, so we reset the num_synced to 0, so it can run till
// initial_sync_num_peers again. This is made to run as a best effort in that it will at least run the
// initial_sync_num_peers
self.num_synched.store(0, Ordering::SeqCst);
self.peers_attempted.clear();
let connections = match self
.network
.select_random_connections(self.config.initial_sync_num_peers, Default::default())
Expand All @@ -218,7 +221,11 @@ impl MempoolSyncProtocol {
}

fn is_synched(&self) -> bool {
self.num_synched.load(Ordering::SeqCst) >= self.config.initial_sync_num_peers
self.is_done.load(Ordering::SeqCst)
}

fn has_attempted_peer(&self, peer_id: PeerId) -> bool {
self.peers_attempted.contains(&peer_id)
}

fn handle_protocol_notification(&mut self, notification: ProtocolNotification<Substream>) {
Expand All @@ -232,20 +239,23 @@ impl MempoolSyncProtocol {
async fn spawn_initiator_protocol(&mut self, peer_id: PeerId) {
let mempool = self.mempool.clone();
let permits = self.permits.clone();
let num_synched = self.num_synched.clone();
let is_done = self.is_done.clone();
let config = self.config.clone();
let network = self.network.clone();
let num_synced = self.peers_attempted.len();
self.peers_attempted.insert(peer_id);
task::spawn(async move {
// Only initiate this protocol with a single peer at a time
let _permit = permits.acquire().await;
if num_synched.load(Ordering::SeqCst) >= config.initial_sync_num_peers {
if is_done.load(Ordering::SeqCst) {
return;
}
match network
.open_framed_substream(peer_id, &MEMPOOL_SYNC_PROTOCOL, MAX_FRAME_SIZE)
.await
{
Ok(framed) => {
let initial_sync_num_peers = config.initial_sync_num_peers;
let protocol = MempoolPeerProtocol::new(config, framed, peer_id, mempool);
match protocol.start_initiator().await {
Ok(_) => {
Expand All @@ -254,7 +264,9 @@ impl MempoolSyncProtocol {
"Mempool initiator protocol completed successfully for peer `{}`",
peer_id,
);
num_synched.fetch_add(1, Ordering::SeqCst);
if num_synced >= initial_sync_num_peers {
is_done.store(true, Ordering::SeqCst);
}
},
Err(err) => {
debug!(
Expand Down

0 comments on commit da8ca40

Please sign in to comment.