Skip to content

Commit

Permalink
fix(p2p): periodically re-dial to disconnected bootstrap nodes (#3999)
Browse files Browse the repository at this point in the history
  • Loading branch information
hanabi1224 authored Feb 29, 2024
1 parent d408f57 commit a7eaeb1
Showing 1 changed file with 48 additions and 4 deletions.
52 changes: 48 additions & 4 deletions src/libp2p/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ pub enum NetRPCMethods {
/// The `Libp2pService` listens to events from the libp2p swarm.
pub struct Libp2pService<DB> {
swarm: Swarm<ForestBehaviour>,
bootstrap_peers: HashMap<PeerId, Multiaddr>,
cs: Arc<ChainStore<DB>>,
peer_manager: Arc<PeerManager>,
network_receiver_in: flume::Receiver<NetworkMessage>,
Expand Down Expand Up @@ -247,8 +248,18 @@ where
anyhow::bail!("p2p peer failed to listen on any network endpoints");
}

let bootstrap_peers = config
.bootstrap_peers
.iter()
.filter_map(|ma| match ma.iter().last() {
Some(Protocol::P2p(peer)) => Some((peer, ma.clone())),
_ => None,
})
.collect();

Ok(Libp2pService {
swarm,
bootstrap_peers,
cs,
peer_manager,
network_receiver_in,
Expand Down Expand Up @@ -285,6 +296,15 @@ where
bitswap_request_manager.outbound_request_stream().fuse();
let mut peer_ops_rx_stream = self.peer_manager.peer_ops_rx().stream().fuse();
let metrics = Metrics::new(&mut crate::metrics::default_registry());

const BOOTSTRAP_PEER_DIALER_INTERVAL: tokio::time::Duration =
tokio::time::Duration::from_secs(60);
let mut bootstrap_peer_dialer_interval_stream =
IntervalStream::new(tokio::time::interval_at(
tokio::time::Instant::now() + BOOTSTRAP_PEER_DIALER_INTERVAL,
BOOTSTRAP_PEER_DIALER_INTERVAL,
))
.fuse();
loop {
select! {
swarm_event = swarm_stream.next() => match swarm_event {
Expand Down Expand Up @@ -339,9 +359,12 @@ where
}
peer_ops_opt = peer_ops_rx_stream.next() => {
if let Some(peer_ops) = peer_ops_opt {
handle_peer_ops(swarm_stream.get_mut(), peer_ops);
handle_peer_ops(swarm_stream.get_mut(), peer_ops, &self.bootstrap_peers);
}
},
_ = bootstrap_peer_dialer_interval_stream.next() => {
dial_to_bootstrap_peers_if_needed(swarm_stream.get_mut(), &self.bootstrap_peers);
}
};
}
Ok(())
Expand All @@ -358,12 +381,33 @@ where
}
}

fn handle_peer_ops(swarm: &mut Swarm<ForestBehaviour>, peer_ops: PeerOperation) {
fn dial_to_bootstrap_peers_if_needed(
swarm: &mut Swarm<ForestBehaviour>,
bootstrap_peers: &HashMap<PeerId, Multiaddr>,
) {
for (peer, ma) in bootstrap_peers {
if !swarm.behaviour().peers().contains(peer) {
info!("Re-dialing to bootstrap peer at {ma}");
if let Err(e) = swarm.dial(ma.clone()) {
warn!("{e}");
}
}
}
}

fn handle_peer_ops(
swarm: &mut Swarm<ForestBehaviour>,
peer_ops: PeerOperation,
bootstrap_peers: &HashMap<PeerId, Multiaddr>,
) {
use PeerOperation::*;
match peer_ops {
Ban(peer_id, reason) => {
warn!("Banning {peer_id}, reason: {reason}");
swarm.behaviour_mut().blocked_peers.block_peer(peer_id);
// Do not ban bootstrap nodes
if !bootstrap_peers.contains_key(&peer_id) {
warn!("Banning {peer_id}, reason: {reason}");
swarm.behaviour_mut().blocked_peers.block_peer(peer_id);
}
}
Unban(peer_id) => {
info!("Unbanning {peer_id}");
Expand Down

0 comments on commit a7eaeb1

Please sign in to comment.