diff --git a/Cargo.lock b/Cargo.lock index f1b90da5a3..cc6e0830f6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3901,7 +3901,7 @@ dependencies = [ [[package]] name = "libp2p-messaging" -version = "1.7.0-pre.2" +version = "1.8.0-pre.0" dependencies = [ "async-trait", "futures-bounded", @@ -4058,7 +4058,7 @@ dependencies = [ [[package]] name = "libp2p-substream" -version = "1.7.0-pre.2" +version = "1.8.0-pre.0" dependencies = [ "libp2p", "smallvec 1.13.2", @@ -6219,7 +6219,7 @@ dependencies = [ [[package]] name = "proto_builder" -version = "1.7.0-pre.2" +version = "1.8.0-pre.0" dependencies = [ "prost-build 0.13.3", "sha2 0.10.8", @@ -8144,7 +8144,7 @@ dependencies = [ [[package]] name = "tari_network" -version = "1.7.0-pre.2" +version = "1.8.0-pre.0" dependencies = [ "anyhow", "humantime 2.1.0", @@ -8198,7 +8198,7 @@ dependencies = [ [[package]] name = "tari_rpc_framework" -version = "1.7.0-pre.2" +version = "1.8.0-pre.0" dependencies = [ "async-trait", "bitflags 2.6.0", @@ -8223,7 +8223,7 @@ dependencies = [ [[package]] name = "tari_rpc_macros" -version = "1.7.0-pre.2" +version = "1.8.0-pre.0" dependencies = [ "proc-macro2", "quote", @@ -8288,7 +8288,7 @@ dependencies = [ [[package]] name = "tari_swarm" -version = "1.7.0-pre.2" +version = "1.8.0-pre.0" dependencies = [ "libp2p", "libp2p-messaging", diff --git a/Cargo.toml b/Cargo.toml index ba8a5ab5c3..7b5819f882 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,7 @@ authors = ["The Tari Development Community"] repository = "https://github.com/tari-project/tari" license = "BSD-3-Clause" -version = "1.7.0-pre.2" +version = "1.8.0-pre.0" edition = "2021" [workspace] diff --git a/applications/minotari_console_wallet/log4rs_sample.yml b/applications/minotari_console_wallet/log4rs_sample.yml index db31adea0d..7efb92e618 100644 --- a/applications/minotari_console_wallet/log4rs_sample.yml +++ b/applications/minotari_console_wallet/log4rs_sample.yml @@ -97,6 +97,13 @@ appenders: encoder: pattern: "{d(%Y-%m-%d %H:%M:%S.%f)} [{t}] [Thread:{I}] {l:5} {m}{n}" + # An appender named "fail2ban" that writes to a file with a custom pattern encoder + fail2ban: + kind: file + path: "{{log_dir}}/log/wallet/fail2ban.log" + encoder: + pattern: "{d(%Y-%m-%d %H:%M:%S.%f)} [{t}] [Thread:{I}] {l:5} {m}{n}" + # root (to base_layer) root: level: debug @@ -174,3 +181,10 @@ loggers: appenders: - other additive: false + # fail2ban logs + fail2ban: + level: info + appenders: + - fail2ban + additive: false + diff --git a/applications/minotari_console_wallet/src/ui/components/network_tab.rs b/applications/minotari_console_wallet/src/ui/components/network_tab.rs index d8b4a0ba3d..5c15efe0ce 100644 --- a/applications/minotari_console_wallet/src/ui/components/network_tab.rs +++ b/applications/minotari_console_wallet/src/ui/components/network_tab.rs @@ -126,7 +126,7 @@ impl NetworkTab { .heading_style(Style::default().fg(Color::Magenta)) .max_width(MAX_WIDTH) .add_column(Some("Type"), Some(17), column0_items) - .add_column(Some("NodeID"), Some(57), column1_items) + .add_column(Some("PeerID"), Some(57), column1_items) .add_column(Some("Public Key"), Some(65), column2_items); column_list.render(f, areas[1], &mut base_node_list_state); } @@ -215,7 +215,7 @@ impl NetworkTab { let column_list = MultiColumnList::new() .heading_style(Style::default().fg(Color::Magenta)) .max_width(MAX_WIDTH) - .add_column(Some("NodeID"), Some(27), column0_items) + .add_column(Some("PeerID"), Some(53), column0_items) .add_column(Some("Public Key"), Some(65), column1_items) .add_column(Some("User Agent"), Some(MAX_WIDTH.saturating_sub(93)), column2_items); column_list.render(f, list_areas[0], &mut ListState::default()); diff --git a/applications/minotari_node/log4rs_sample.yml b/applications/minotari_node/log4rs_sample.yml index 0fb98ea70b..8005e3867d 100644 --- a/applications/minotari_node/log4rs_sample.yml +++ b/applications/minotari_node/log4rs_sample.yml @@ -104,6 +104,13 @@ appenders: encoder: pattern: "{d(%Y-%m-%d %H:%M:%S.%f)} [{t}] [Thread:{I}] {l:5} {m}{n} // {f}:{L} " + # An appender named "fail2ban" that writes to a file with a custom pattern encoder + fail2ban: + kind: file + path: "{{log_dir}}/log/base_node/fail2ban.log" + encoder: + pattern: "{d(%Y-%m-%d %H:%M:%S.%f)} [{t}] [Thread:{I}] {l:5} {m}{n}" + # Set the default logging level to "info" root: level: warn @@ -195,3 +202,9 @@ loggers: level: warn appenders: - message_logging + # fail2ban logs + fail2ban: + level: info + appenders: + - fail2ban + additive: false \ No newline at end of file diff --git a/applications/minotari_node/src/commands/command/add_peer.rs b/applications/minotari_node/src/commands/command/add_peer.rs index d55fd33314..01cc4bb2b7 100644 --- a/applications/minotari_node/src/commands/command/add_peer.rs +++ b/applications/minotari_node/src/commands/command/add_peer.rs @@ -20,6 +20,8 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +use std::time::Instant; + use anyhow::Error; use async_trait::async_trait; use clap::Parser; @@ -45,10 +47,18 @@ impl HandleCommand for CommandContext { if *self.network.local_peer_id() == peer_id { return Err(Error::msg("Cannot add self as peer")); } - self.network + let timer = Instant::now(); + let dial = self + .network .dial_peer(DialOpts::peer_id(peer_id).addresses(vec![args.address]).build()) .await?; - println!("Peer with node id '{}' was added to the base node.", peer_id); + println!("Peer with node id '{}' was added to the base node. Dialing...", peer_id); + + match dial.await { + Ok(_) => println!("⚑️ Peer connected in {}ms!", timer.elapsed().as_millis()), + Err(err) => println!("☠️ {}", err), + } + Ok(()) } } diff --git a/applications/minotari_node/src/commands/command/whoami.rs b/applications/minotari_node/src/commands/command/whoami.rs index 3c9e7fb157..3b1684b680 100644 --- a/applications/minotari_node/src/commands/command/whoami.rs +++ b/applications/minotari_node/src/commands/command/whoami.rs @@ -24,46 +24,66 @@ use anyhow::Error; use async_trait::async_trait; use clap::Parser; use qrcode::{render::unicode, QrCode}; -use tari_network::multiaddr::{Multiaddr, Protocol}; +use tari_network::GlobalIp; use super::{CommandContext, HandleCommand}; /// Display identity information about this node, /// including: public key, node ID and the public address #[derive(Debug, Parser)] -pub struct Args {} +pub struct Args { + /// Number of addresses to show + #[clap(default_value_t = 5)] + num_show_addrs: usize, +} #[async_trait] impl HandleCommand for CommandContext { - async fn handle_command(&mut self, _: Args) -> Result<(), Error> { - self.whoami().await + async fn handle_command(&mut self, args: Args) -> Result<(), Error> { + self.whoami(args).await } } impl CommandContext { /// Function to process the whoami command - pub async fn whoami(&self) -> Result<(), Error> { + pub async fn whoami(&self, args: Args) -> Result<(), Error> { let peer_info = self.network.get_local_peer_info().await?; - let peer = format!( - "{}::{}", - peer_info.public_key.try_into_sr25519()?.inner_key(), - peer_info - .listen_addrs - .iter() - .filter(|addr| !is_loopback(addr)) - .map(|addr| addr.to_string()) - .collect::>() - .join("::") - ); - println!("{}", peer); + let pk = peer_info.public_key.try_into_sr25519()?; + let num_addrs = peer_info.listen_addrs.len(); + let (global, local) = peer_info + .listen_addrs + .iter() + .partition::, _>(|addr| addr.is_global_ip()); + let qr_addresses = global + .iter() + .chain(Some(&local).filter(|_| !global.is_empty()).into_iter().flatten()) + .take(args.num_show_addrs) + .map(ToString::to_string) + .collect::>() + .join("::"); + + let peer_str = format!("{}::{}", pk.inner_key(), qr_addresses); + + println!("πŸ”‘ Public Key: {}", pk.inner_key()); + println!("πŸͺͺ Peer ID: {}", peer_info.peer_id); + println!("🏠️ Addresses ({num_addrs})"); + for addr in global.into_iter().chain(local).take(args.num_show_addrs) { + println!("- {addr}"); + } + if num_addrs > 0 && num_addrs > args.num_show_addrs { + println!("{} more...", num_addrs - args.num_show_addrs); + } + + println!(); + println!("{peer_str}"); println!(); let network = self.config.network(); let qr_link = format!( - "tari://{}/base_nodes/add?name={}&peer={}", - network, peer_info.peer_id, peer + "tari://{}/base_nodes/add?name={}&peer_str={}", + network, peer_info.peer_id, peer_str ); - let code = QrCode::new(qr_link).unwrap(); + let code = QrCode::new(qr_link)?; let image = code .render::() .dark_color(unicode::Dense1x2::Dark) @@ -76,11 +96,3 @@ impl CommandContext { Ok(()) } } - -fn is_loopback(addr: &Multiaddr) -> bool { - match addr.iter().next() { - Some(Protocol::Ip4(ip)) => ip.is_loopback(), - Some(Protocol::Ip6(ip)) => ip.is_loopback(), - _ => false, - } -} diff --git a/base_layer/wallet_ffi/src/error.rs b/base_layer/wallet_ffi/src/error.rs index 65538c55e0..7df7941035 100644 --- a/base_layer/wallet_ffi/src/error.rs +++ b/base_layer/wallet_ffi/src/error.rs @@ -559,6 +559,10 @@ impl From for LibWalletError { code: 912, message: value.to_string(), }, + NetworkError::RefuseDialPeerBanned { .. } => Self { + code: 913, + message: value.to_string(), + }, } } } diff --git a/network/core/src/error.rs b/network/core/src/error.rs index e680e09cb3..5667d25e4d 100644 --- a/network/core/src/error.rs +++ b/network/core/src/error.rs @@ -65,6 +65,8 @@ pub enum NetworkError { MessagingDisabled, #[error("Failed to add peer: {details}")] FailedToAddPeer { details: String }, + #[error("Refusing to dial peer {peer_id} because it is banned")] + RefuseDialPeerBanned { peer_id: PeerId }, } impl From for NetworkError { diff --git a/network/core/src/global_ip.rs b/network/core/src/global_ip.rs index 16fbd1c60c..079e8759d0 100644 --- a/network/core/src/global_ip.rs +++ b/network/core/src/global_ip.rs @@ -5,7 +5,7 @@ use tari_swarm::libp2p::{multiaddr::Protocol, Multiaddr}; -pub(crate) trait GlobalIp { +pub trait GlobalIp { fn is_global_ip(&self) -> bool; } diff --git a/network/core/src/handle.rs b/network/core/src/handle.rs index 95f3dca79a..c2b711c39e 100644 --- a/network/core/src/handle.rs +++ b/network/core/src/handle.rs @@ -68,7 +68,7 @@ pub(super) type Reply = oneshot::Sender>; pub enum NetworkingRequest { DialPeer { dial_opts: DialOpts, - reply_tx: Reply>, + reply: Reply>, }, DisconnectPeer { peer_id: PeerId, @@ -77,7 +77,7 @@ pub enum NetworkingRequest { PublishGossip { topic: IdentTopic, message: Vec, - reply_tx: Reply<()>, + reply: Reply<()>, }, SubscribeTopic { topic: IdentTopic, @@ -86,16 +86,16 @@ pub enum NetworkingRequest { }, UnsubscribeTopic { topic: IdentTopic, - reply_tx: Reply<()>, + reply: Reply<()>, }, IsSubscribedTopic { topic: IdentTopic, - reply_tx: Reply, + reply: Reply, }, OpenSubstream { peer_id: PeerId, protocol_id: StreamProtocol, - reply_tx: Reply>, + reply: Reply>, }, AddProtocolNotifier { protocols: HashSet, @@ -106,13 +106,13 @@ pub enum NetworkingRequest { limit: Option, randomize: bool, exclude_peers: HashSet, - reply_tx: Reply>, + reply: Reply>, }, GetAveragePeerLatency { reply: Reply, }, GetLocalPeerInfo { - reply_tx: Reply, + reply: Reply, }, SetWantPeers(HashSet), AddPeer { @@ -206,7 +206,7 @@ impl NetworkHandle { self.tx_request .send(NetworkingRequest::IsSubscribedTopic { topic: IdentTopic::new(topic), - reply_tx: tx, + reply: tx, }) .await .map_err(|_| NetworkingHandleError::ServiceHasShutdown)?; @@ -235,12 +235,12 @@ impl NetworkHandle { peer_id: PeerId, protocol_id: &StreamProtocol, ) -> Result, NetworkError> { - let (reply_tx, reply_rx) = oneshot::channel(); + let (reply, reply_rx) = oneshot::channel(); self.tx_request .send(NetworkingRequest::OpenSubstream { peer_id, protocol_id: protocol_id.clone(), - reply_tx, + reply, }) .await?; reply_rx @@ -293,7 +293,7 @@ impl NetworkHandle { limit, randomize, exclude_peers, - reply_tx: tx, + reply: tx, }) .await?; rx.await? @@ -302,7 +302,7 @@ impl NetworkHandle { pub async fn get_local_peer_info(&self) -> Result { let (tx, rx) = oneshot::channel(); self.tx_request - .send(NetworkingRequest::GetLocalPeerInfo { reply_tx: tx }) + .send(NetworkingRequest::GetLocalPeerInfo { reply: tx }) .await .map_err(|_| NetworkingHandleError::ServiceHasShutdown)?; rx.await? @@ -336,7 +336,7 @@ impl NetworkHandle { .send(NetworkingRequest::PublishGossip { topic: IdentTopic::new(topic), message, - reply_tx: tx, + reply: tx, }) .await .map_err(|_| NetworkingHandleError::ServiceHasShutdown)?; @@ -373,7 +373,7 @@ impl NetworkHandle { self.tx_request .send(NetworkingRequest::UnsubscribeTopic { topic: IdentTopic::new(topic), - reply_tx: tx, + reply: tx, }) .await .map_err(|_| NetworkingHandleError::ServiceHasShutdown)?; @@ -485,7 +485,7 @@ impl NetworkingService for NetworkHandle { self.tx_request .send(NetworkingRequest::DialPeer { dial_opts: dial_opts.into(), - reply_tx: tx, + reply: tx, }) .await .map_err(|_| NetworkingHandleError::ServiceHasShutdown)?; diff --git a/network/core/src/lib.rs b/network/core/src/lib.rs index 9e08e06e21..a9b2c81f88 100644 --- a/network/core/src/lib.rs +++ b/network/core/src/lib.rs @@ -26,6 +26,7 @@ pub use autonat::*; pub use config::*; pub use connection::*; pub use event::*; +pub use global_ip::*; pub use gossip::*; pub use handle::*; pub use message::*; diff --git a/network/core/src/messaging.rs b/network/core/src/messaging.rs index 02ad24376d..2a85157c52 100644 --- a/network/core/src/messaging.rs +++ b/network/core/src/messaging.rs @@ -16,12 +16,12 @@ pub enum MessagingRequest { SendMessage { peer: PeerId, message: TMsg::Message, - reply_tx: Reply<()>, + reply: Reply<()>, }, SendMulticast { destination: MulticastDestination, message: TMsg::Message, - reply_tx: Reply, + reply: Reply, }, } @@ -47,7 +47,7 @@ impl OutboundMessager for OutboundMessaging { .send(MessagingRequest::SendMessage { peer, message: message.into(), - reply_tx: tx, + reply: tx, }) .await .map_err(|_| NetworkingHandleError::ServiceHasShutdown)?; @@ -64,7 +64,7 @@ impl OutboundMessager for OutboundMessaging { .send(MessagingRequest::SendMulticast { destination: dest.into(), message: message.into(), - reply_tx: tx, + reply: tx, }) .await .map_err(|_| NetworkingHandleError::ServiceHasShutdown)?; diff --git a/network/core/src/worker.rs b/network/core/src/worker.rs index f1df3e3380..3744ec8fbc 100644 --- a/network/core/src/worker.rs +++ b/network/core/src/worker.rs @@ -72,6 +72,7 @@ use crate::{ }; const LOG_TARGET: &str = "network::service::worker"; +const LOG_TARGET_FAIL2BAN: &str = "fail2ban"; type ReplyTx = oneshot::Sender>; @@ -187,9 +188,7 @@ where loop { tokio::select! { Some(request) = self.rx_request.recv() => { - if let Err(err) = self.handle_request(request).await { - error!(target: LOG_TARGET, "Error handling request: {err}"); - } + self.handle_request(request).await; } Some(request) = self.rx_msg_request.recv() => { self.handle_messaging_request(request).await; @@ -228,9 +227,17 @@ where } #[allow(clippy::too_many_lines)] - async fn handle_request(&mut self, request: NetworkingRequest) -> Result<(), NetworkError> { + async fn handle_request(&mut self, request: NetworkingRequest) { match request { - NetworkingRequest::DialPeer { dial_opts, reply_tx } => { + NetworkingRequest::DialPeer { dial_opts, reply } => { + if let Some(peer_id) = dial_opts.get_peer_id() { + if self.is_banned(&peer_id) { + warn!(target: LOG_TARGET, "⚠️ Attempt to dial banned peer {peer_id}. Refusing request."); + let _ignore = reply.send(Err(NetworkError::RefuseDialPeerBanned { peer_id })); + return; + } + } + let (tx_waiter, rx_waiter) = oneshot::channel(); let maybe_peer_id = dial_opts.get_peer_id(); info!(target: LOG_TARGET, "🀝 Dialing peer {:?}", dial_opts); @@ -240,69 +247,67 @@ where if let Some(peer_id) = maybe_peer_id { self.pending_dial_requests.entry(peer_id).or_default().push(tx_waiter); } - let _ignore = reply_tx.send(Ok(rx_waiter.into())); + let _ignore = reply.send(Ok(rx_waiter.into())); }, Err(err) => { info!(target: LOG_TARGET, "🚨 Failed to dial peer: {}", err); - let _ignore = reply_tx.send(Err(err.into())); + let _ignore = reply.send(Err(err.into())); }, } }, NetworkingRequest::DisconnectPeer { peer_id, reply } => { let _ignore = reply.send(Ok(self.swarm.disconnect_peer_id(peer_id).is_ok())); }, - NetworkingRequest::PublishGossip { - topic, - message, - reply_tx, - } => match self.swarm.behaviour_mut().gossipsub.publish(topic, message) { - Ok(msg_id) => { - debug!(target: LOG_TARGET, "πŸ“’ Published gossipsub message: {}", msg_id); - let _ignore = reply_tx.send(Ok(())); - }, - Err(err) => { - if matches!(err, PublishError::Duplicate) { - debug!(target: LOG_TARGET, "Published duplicate: {}", err); - } else { - error!(target: LOG_TARGET, "🚨 Failed to publish gossip message: {}", err); - } - let _ignore = reply_tx.send(Err(err.into())); - }, + NetworkingRequest::PublishGossip { topic, message, reply } => { + match self.swarm.behaviour_mut().gossipsub.publish(topic, message) { + Ok(msg_id) => { + debug!(target: LOG_TARGET, "πŸ“’ Published gossipsub message: {}", msg_id); + let _ignore = reply.send(Ok(())); + }, + Err(err) => { + if matches!(err, PublishError::Duplicate) { + debug!(target: LOG_TARGET, "Published duplicate: {}", err); + } else { + error!(target: LOG_TARGET, "🚨 Failed to publish gossip message: {}", err); + } + let _ignore = reply.send(Err(err.into())); + }, + } }, NetworkingRequest::SubscribeTopic { topic, inbound, reply } => { let result = self.gossipsub_subscribe_topic(topic, inbound); let _ignore = reply.send(result.map(|_| self.gossipsub_outbound_tx.clone())); }, - NetworkingRequest::UnsubscribeTopic { topic, reply_tx } => { + NetworkingRequest::UnsubscribeTopic { topic, reply } => { self.gossipsub_subscriptions.remove(&topic.hash()); match self.swarm.behaviour_mut().gossipsub.unsubscribe(&topic) { Ok(_) => { debug!(target: LOG_TARGET, "πŸ“’ Unsubscribed from gossipsub topic: {}", topic); - let _ignore = reply_tx.send(Ok(())); + let _ignore = reply.send(Ok(())); }, Err(err) => { error!(target: LOG_TARGET, "🚨 Failed to unsubscribe from gossipsub topic: {}", err); - let _ignore = reply_tx.send(Err(err.into())); + let _ignore = reply.send(Err(err.into())); }, } }, - NetworkingRequest::IsSubscribedTopic { topic, reply_tx } => { + NetworkingRequest::IsSubscribedTopic { topic, reply } => { let hash = topic.hash(); let found = self.swarm.behaviour_mut().gossipsub.topics().any(|t| *t == hash); - let _ignore = reply_tx.send(Ok(found)); + let _ignore = reply.send(Ok(found)); }, NetworkingRequest::OpenSubstream { peer_id, protocol_id, - reply_tx, + reply, } => { let stream_id = self .swarm .behaviour_mut() .substream .open_substream(peer_id, protocol_id.clone()); - self.pending_substream_requests.insert(stream_id, reply_tx); + self.pending_substream_requests.insert(stream_id, reply); }, NetworkingRequest::AddProtocolNotifier { protocols, tx_notifier } => { for protocol in protocols { @@ -315,7 +320,7 @@ where limit, randomize, exclude_peers: excluded_peers, - reply_tx, + reply, } => { let iter = self .active_connections @@ -332,9 +337,9 @@ where iter.collect() }; - let _ignore = reply_tx.send(Ok(connections)); + let _ignore = reply.send(Ok(connections)); }, - NetworkingRequest::GetLocalPeerInfo { reply_tx } => { + NetworkingRequest::GetLocalPeerInfo { reply } => { let peer = crate::peer::PeerInfo { peer_id: *self.swarm.local_peer_id(), public_key: self.keypair.public(), @@ -345,7 +350,7 @@ where protocols: self.swarm.behaviour_mut().substream.supported_protocols().to_vec(), // observed_addr: (), }; - let _ignore = reply_tx.send(Ok(peer)); + let _ignore = reply.send(Ok(peer)); }, NetworkingRequest::GetAveragePeerLatency { reply } => { let iter = self @@ -359,7 +364,9 @@ where }, NetworkingRequest::SetWantPeers(peers) => { info!(target: LOG_TARGET, "🧭 Setting want peers to {:?}", peers); - self.swarm.behaviour_mut().peer_sync.want_peers(peers).await?; + if let Err(err) = self.swarm.behaviour_mut().peer_sync.want_peers(peers).await { + error!(target: LOG_TARGET, "Failed to set want peers: {err}"); + } }, NetworkingRequest::AddPeer { peer, reply } => { info!(target: LOG_TARGET, "Adding {peer}"); @@ -367,7 +374,7 @@ where let _ignore = reply.send(Err(NetworkError::FailedToAddPeer { details: format!("AddPeer: No addresses provided for peer {}. Nothing to do.", peer), })); - return Ok(()); + return; } let num_addresses = peer.addresses().len(); let peer_id = peer.peer_id(); @@ -405,9 +412,10 @@ where reply, } => { info!(target: LOG_TARGET, "🎯Banning peer {peer_id} for {ban_duration:?}: {reason}"); + self.fail2ban_ban(&peer_id, ban_duration, &reason); if self.allow_list.contains(&peer_id) { info!(target: LOG_TARGET, "Not banning peer because it is on the allow list"); - return Ok(()); + return; } // TODO: mark the peer as banned and prevent connections,messages from coming through @@ -427,6 +435,7 @@ where }, NetworkingRequest::UnbanPeer { peer_id, reply } => match self.ban_list.remove(&peer_id) { Some(peer) => { + self.fail2ban_unban(&peer_id); let _ignore = reply.send(Ok(peer.is_banned())); shrink_hashmap_if_required(&mut self.ban_list); }, @@ -490,17 +499,11 @@ where let _ignore = reply.send(Ok(self.seed_peers.clone())); }, } - - Ok(()) } async fn handle_messaging_request(&mut self, request: MessagingRequest) { match request { - MessagingRequest::SendMessage { - peer, - message, - reply_tx, - } => { + MessagingRequest::SendMessage { peer, message, reply } => { match self .swarm .behaviour_mut() @@ -510,27 +513,27 @@ where { Some(Ok(_)) => { debug!(target: LOG_TARGET, "πŸ“’ Queued message to peer {}", peer); - let _ignore = reply_tx.send(Ok(())); + let _ignore = reply.send(Ok(())); }, Some(Err(err)) => { debug!(target: LOG_TARGET, "🚨 Failed to queue message to peer {}: {}", peer, err); - let _ignore = reply_tx.send(Err(err.into())); + let _ignore = reply.send(Err(err.into())); }, None => { warn!(target: LOG_TARGET, "Sent message but messaging is disabled"); - let _ignore = reply_tx.send(Err(NetworkError::MessagingDisabled)); + let _ignore = reply.send(Err(NetworkError::MessagingDisabled)); }, } }, MessagingRequest::SendMulticast { destination, message, - reply_tx, + reply, } => { let len = destination.len(); let Some(messaging_mut) = &mut self.swarm.behaviour_mut().messaging.as_mut() else { warn!(target: LOG_TARGET, "Sent multicast message but messaging is disabled"); - let _ignore = reply_tx.send(Err(NetworkError::MessagingDisabled)); + let _ignore = reply.send(Err(NetworkError::MessagingDisabled)); return; }; @@ -546,7 +549,7 @@ where } } debug!(target: LOG_TARGET, "πŸ“’ Queued message to {num_sent} out of {len} peers"); - let _ignore = reply_tx.send(Ok(num_sent)); + let _ignore = reply.send(Ok(num_sent)); }, } } @@ -999,6 +1002,14 @@ where established_in ); + if self.is_banned(&peer_id) { + warn!(target: LOG_TARGET, "Banned peer {peer_id} reconnected. Disconnecting."); + if self.swarm.disconnect_peer_id(peer_id).is_err() { + debug!(target: LOG_TARGET, "Banned peer not connected when disconnecting it due to ban"); + } + return Ok(()); + } + if let Some(relay) = self.relays.selected_relay_mut() { if endpoint.is_dialer() && relay.peer_id == peer_id { relay.dialled_address = Some(endpoint.get_remote_address().clone()); @@ -1237,6 +1248,30 @@ where debug!(target: LOG_TARGET, "πŸ“’ Published networking event to {num} subscribers"); } } + + fn is_banned(&self, peer_id: &PeerId) -> bool { + self.ban_list.get(peer_id).map_or(false, |p| p.is_banned()) + } + + fn fail2ban_ban(&self, peer_id: &PeerId, duration: Option, reason: &str) { + if let Some(conns) = self.active_connections.get(peer_id) { + for conn in conns { + let remote_addr = conn.endpoint.get_remote_address(); + if let Some((protocol, port)) = extract_protocol_and_port(remote_addr) { + let params = duration + // NOTE: not sure if fail2ban can be configured with a dynamic bantime + .map(|ban_time| format!(r#", bantime="{}s""#, ban_time.as_secs())) + .unwrap_or_default(); + // NOTE: Not sure if the "name" and "chain" should be configurable + info!(target: LOG_TARGET_FAIL2BAN, r#"FAIL2BAN: actionban[name="tarinetwork", peer={peer_id}, addr={remote_addr}, port="{port}", protocol="{protocol}", chain="INPUT"{params}, reason="{reason}"]"#); + } + } + } + } + + fn fail2ban_unban(&self, peer_id: &PeerId) { + info!(target: LOG_TARGET_FAIL2BAN, r#"FAIL2BAN: actionunban[name="tarinetwork", peer={peer_id}, chain="INPUT"]"#); + } } fn is_p2p_address(address: &Multiaddr) -> bool { @@ -1274,3 +1309,19 @@ where K: Eq + Hash { map.shrink_to_fit(); } } + +fn extract_protocol_and_port(addr: &Multiaddr) -> Option<(&'static str, u16)> { + let mut iter = addr.iter(); + match iter.next()? { + Protocol::Ip4(_) | Protocol::Ip6(_) => { + // Continue + }, + _ => return None, + } + + match iter.next()? { + Protocol::Tcp(port) => Some(("tcp", port)), + Protocol::Udp(port) => Some(("udp", port)), + _ => None, + } +}