diff --git a/Cargo.lock b/Cargo.lock index aa1bcdb443..34d0bff44a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3901,7 +3901,7 @@ dependencies = [ [[package]] name = "libp2p-messaging" -version = "1.7.0-pre.0" +version = "1.7.0-pre.2" dependencies = [ "async-trait", "futures-bounded", @@ -4058,7 +4058,7 @@ dependencies = [ [[package]] name = "libp2p-substream" -version = "1.7.0-pre.0" +version = "1.7.0-pre.2" dependencies = [ "libp2p", "smallvec 1.13.2", @@ -6218,7 +6218,7 @@ dependencies = [ [[package]] name = "proto_builder" -version = "1.7.0-pre.0" +version = "1.7.0-pre.2" dependencies = [ "prost-build 0.13.3", "sha2 0.10.8", @@ -8143,7 +8143,7 @@ dependencies = [ [[package]] name = "tari_network" -version = "1.7.0-pre.0" +version = "1.7.0-pre.2" dependencies = [ "anyhow", "humantime 2.1.0", @@ -8198,7 +8198,7 @@ dependencies = [ [[package]] name = "tari_rpc_framework" -version = "1.7.0-pre.0" +version = "1.7.0-pre.2" dependencies = [ "async-trait", "bitflags 2.6.0", @@ -8223,7 +8223,7 @@ dependencies = [ [[package]] name = "tari_rpc_macros" -version = "1.7.0-pre.0" +version = "1.7.0-pre.2" dependencies = [ "proc-macro2", "quote", @@ -8288,7 +8288,7 @@ dependencies = [ [[package]] name = "tari_swarm" -version = "1.7.0-pre.0" +version = "1.7.0-pre.2" dependencies = [ "libp2p", "libp2p-messaging", diff --git a/Cargo.toml b/Cargo.toml index 72df515efc..ba8a5ab5c3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,7 @@ authors = ["The Tari Development Community"] repository = "https://github.com/tari-project/tari" license = "BSD-3-Clause" -version = "1.7.0-pre.0" +version = "1.7.0-pre.2" edition = "2021" [workspace] diff --git a/applications/minotari_node/src/bootstrap.rs b/applications/minotari_node/src/bootstrap.rs index d996263e3a..add0b2fe73 100644 --- a/applications/minotari_node/src/bootstrap.rs +++ b/applications/minotari_node/src/bootstrap.rs @@ -100,7 +100,7 @@ where B: BlockchainBackend + 'static let dispatcher = Dispatcher::new(); let user_agent = format!("tari/basenode/{}", consts::APP_VERSION_NUMBER); - let mut handles = StackBuilder::new(self.interrupt_signal) + let handles = StackBuilder::new(self.interrupt_signal) .add_initializer(P2pInitializer::new( p2p_config.clone(), user_agent, diff --git a/applications/minotari_node/src/builder.rs b/applications/minotari_node/src/builder.rs index 70400ceb4f..86b28e4e83 100644 --- a/applications/minotari_node/src/builder.rs +++ b/applications/minotari_node/src/builder.rs @@ -42,12 +42,12 @@ use tari_core::{ }, OutputSmt, }; -use tari_network::{identity, NetworkHandle}; -use tari_p2p::{auto_update::SoftwareUpdaterHandle, services::liveness::LivenessHandle}; +use tari_network::{identity, NetworkError, NetworkHandle}; +use tari_p2p::{auto_update::SoftwareUpdaterHandle, initialization::TaskHandle, services::liveness::LivenessHandle}; use tari_rpc_framework::RpcServerHandle; use tari_service_framework::ServiceHandles; use tari_shutdown::ShutdownSignal; -use tokio::sync::watch; +use tokio::{sync::watch, task}; use crate::{bootstrap::BaseNodeBootstrapper, ApplicationConfig, DatabaseType}; @@ -91,6 +91,12 @@ impl BaseNodeContext { &self.network } + /// Returns the task JoinHandle for the network worker. This will only return Some once. + pub fn take_network_join_handle(&self) -> Option>> { + let handle = self.base_node_handles.take_handle::>()?; + Some(handle.into_inner()) + } + /// Returns the liveness service handle pub fn liveness(&self) -> LivenessHandle { self.base_node_handles.expect_handle() diff --git a/applications/minotari_node/src/commands/cli_loop.rs b/applications/minotari_node/src/commands/cli_loop.rs index a3cd47b7f5..34c382c93f 100644 --- a/applications/minotari_node/src/commands/cli_loop.rs +++ b/applications/minotari_node/src/commands/cli_loop.rs @@ -1,7 +1,7 @@ // Copyright 2022 The Tari Project // SPDX-License-Identifier: BSD-3-Clause -use std::{io, time::Duration}; +use std::{future, io, time::Duration}; use crossterm::{ cursor, @@ -9,9 +9,11 @@ use crossterm::{ terminal, }; use futures::{FutureExt, StreamExt}; +use log::{error, info}; use rustyline::{config::OutputStreamType, error::ReadlineError, CompletionType, Config, EditMode, Editor}; +use tari_network::NetworkError; use tari_shutdown::ShutdownSignal; -use tokio::{signal, time}; +use tokio::{signal, task::JoinError, time}; use crate::{ commands::{ @@ -119,6 +121,19 @@ impl CliLoop { if let Some(command) = self.watch_task.take() { let mut interrupt = signal::ctrl_c().fuse().boxed(); let mut software_update_notif = self.context.software_updater.update_notifier().clone(); + let mut network_handle = self.context.take_network_join_handle(); + // Need to check this before we do anything in handle_command_str + if let Some(handle) = network_handle { + if handle.is_finished() { + log_networking_handle_result(handle.await); + return; + } + network_handle = Some(handle); + } + let mut network = network_handle + .map(|fut| fut.boxed()) + .unwrap_or_else(|| future::pending().boxed()); + let config = self.context.config.clone(); let line = command.line(); let interval = command @@ -146,6 +161,10 @@ impl CliLoop { break; } } + result = &mut network => { + log_networking_handle_result(result); + break; + }, Ok(_) = software_update_notif.changed() => { if let Some(ref update) = *software_update_notif.borrow() { println!( @@ -249,3 +268,21 @@ impl CliLoop { } } } + +fn log_networking_handle_result(result: Result, JoinError>) { + match result { + Ok(Ok(_)) => { + info!(target: LOG_TARGET, "ℹ️ Networking exited cleanly"); + println!("ℹ️ Networking exited cleanly"); + }, + Ok(Err(err)) => { + error!(target: LOG_TARGET, "❗️ Networking exited with an error {err}"); + eprintln!("❗️ Networking exited with an error {err}"); + }, + Err(err) => { + // Panic + error!(target: LOG_TARGET, "❗️ Networking panicked {err}"); + eprintln!("❗️ Networking panicked {err}"); + }, + } +} diff --git a/applications/minotari_node/src/commands/command/mod.rs b/applications/minotari_node/src/commands/command/mod.rs index adfd5ba57f..1686fb0c5f 100644 --- a/applications/minotari_node/src/commands/command/mod.rs +++ b/applications/minotari_node/src/commands/command/mod.rs @@ -76,7 +76,7 @@ use tari_network::{BannedPeer, NetworkError, NetworkHandle}; use tari_p2p::{auto_update::SoftwareUpdaterHandle, services::liveness::LivenessHandle}; use tari_rpc_framework::RpcServerHandle; use tari_shutdown::Shutdown; -use tokio::{sync::watch, time}; +use tokio::{sync::watch, task, time}; pub use watch_command::WatchCommand; use crate::{ @@ -150,6 +150,7 @@ pub struct CommandContext { blockchain_db: AsyncBlockchainDb, rpc_server: RpcServerHandle, network: NetworkHandle, + network_join_handle: Option>>, liveness: LivenessHandle, node_service: LocalNodeCommsInterface, mempool_service: LocalMempoolService, @@ -168,6 +169,7 @@ impl CommandContext { blockchain_db: ctx.blockchain_db().into(), rpc_server: ctx.rpc_server(), network: ctx.network().clone(), + network_join_handle: ctx.take_network_join_handle(), liveness: ctx.liveness(), node_service: ctx.local_node(), mempool_service: ctx.local_mempool(), @@ -179,6 +181,10 @@ impl CommandContext { } } + pub fn take_network_join_handle(&mut self) -> Option>> { + self.network_join_handle.take() + } + pub async fn handle_command_str(&mut self, line: &str) -> Result, Error> { let args: Args = line.parse()?; if let Command::Watch(command) = args.command { diff --git a/base_layer/p2p/src/config.rs b/base_layer/p2p/src/config.rs index 7a7a0bdc77..1075694966 100644 --- a/base_layer/p2p/src/config.rs +++ b/base_layer/p2p/src/config.rs @@ -34,10 +34,7 @@ use tari_common::{ }, SubConfigPath, }; -use tari_network::{ - multiaddr::{multiaddr, Multiaddr}, - ReachabilityMode, -}; +use tari_network::ReachabilityMode; /// Peer seed configuration #[derive(Clone, Debug, Serialize, Deserialize)] @@ -99,8 +96,8 @@ pub struct P2pConfig { /// manually set. pub public_addresses: MultiaddrList, /// The multiaddrs to listen on. - /// Default: ["/ip4/0.0.0.0/tcp/0"] - pub listen_addresses: Vec, + /// Default: ["/ip4/0.0.0.0/tcp/0", "/ip4/0.0.0.0/udp/0/quic"] + pub listen_addresses: MultiaddrList, #[serde( default, deserialize_with = "deserialize_from_str", @@ -122,7 +119,7 @@ impl Default for P2pConfig { Self { override_from: None, public_addresses: MultiaddrList::default(), - listen_addresses: vec![multiaddr!(Ip4([0, 0, 0, 0]), Tcp(0u16))], + listen_addresses: tari_network::Config::default_listen_addrs().into(), reachability_mode: Default::default(), rpc_max_simultaneous_sessions: 100, rpc_max_sessions_per_peer: 10, diff --git a/base_layer/p2p/src/initialization.rs b/base_layer/p2p/src/initialization.rs index 341dcc7bd3..12f987ae7a 100644 --- a/base_layer/p2p/src/initialization.rs +++ b/base_layer/p2p/src/initialization.rs @@ -53,8 +53,8 @@ use tokio::{sync::mpsc, task::JoinHandle}; use crate::{ config::{P2pConfig, PeerSeedsConfig}, connector::InboundMessaging, - message::TariNodeMessageSpec, dns::DnsClientError, + message::TariNodeMessageSpec, peer_seeds::{DnsSeedResolver, SeedPeer}, }; @@ -120,6 +120,7 @@ where MessagingMode::Enabled { tx_messages }, config, seed_peers.into_iter().map(Into::into).collect(), + vec![], shutdown_signal, )?; @@ -137,6 +138,7 @@ pub type P2pHandles = ( pub fn spawn_network( identity: Arc, seed_peers: Vec, + known_relay_peers: Vec, config: tari_network::Config, shutdown_signal: ShutdownSignal, ) -> Result, CommsInitializationError> @@ -151,6 +153,7 @@ where MessagingMode::Enabled { tx_messages }, config, seed_peers.into_iter().map(Into::into).collect(), + known_relay_peers.into_iter().map(Into::into).collect(), shutdown_signal, )?; @@ -317,11 +320,6 @@ impl ServiceInitializer for P2pInitializer { Vec::new() }); - let mut listener_addrs = self.config.listen_addresses.clone(); - if listener_addrs.is_empty() { - listener_addrs = tari_network::Config::default_listen_addrs(); - } - let config = tari_network::Config { swarm: SwarmConfig { protocol_version: format!("/minotari/{}/1.0.0", self.network.as_key_str()).parse()?, @@ -330,20 +328,22 @@ impl ServiceInitializer for P2pInitializer { enable_relay: self.config.enable_relay, ..Default::default() }, - listener_addrs, + listener_addrs: self.config.listen_addresses.to_vec(), reachability_mode: self.config.reachability_mode, check_connections_interval: Duration::from_secs(2 * 60 * 60), known_local_public_address: self.config.public_addresses.to_vec(), }; let shutdown = context.get_shutdown_signal(); - let (network, outbound_messaging, inbound_messaging, _join_handle) = spawn_network::( + let (network, outbound_messaging, inbound_messaging, join_handle) = spawn_network::( self.identity.clone(), seed_peers.into_iter().chain(dns_peers).collect(), + vec![], config, shutdown, )?; + context.register_handle(TaskHandle(join_handle)); context.register_handle(network); context.register_handle(outbound_messaging); context.register_handle(inbound_messaging); @@ -351,3 +351,13 @@ impl ServiceInitializer for P2pInitializer { Ok(()) } } + +/// Wrapper that makes use a join handle with the service framework easier +#[derive(Debug)] +pub struct TaskHandle(JoinHandle>); + +impl TaskHandle { + pub fn into_inner(self) -> JoinHandle> { + self.0 + } +} diff --git a/base_layer/service_framework/src/context/handles.rs b/base_layer/service_framework/src/context/handles.rs index 1eb4bfce10..7df755b7a8 100644 --- a/base_layer/service_framework/src/context/handles.rs +++ b/base_layer/service_framework/src/context/handles.rs @@ -174,7 +174,7 @@ impl ServiceHandles { } /// Take ownership of a handle - pub fn take_handle(&mut self) -> Option { + pub fn take_handle(&self) -> Option { acquire_lock!(self.handles) .remove(&TypeId::of::()) .and_then(|handle| handle.downcast::().ok().map(|h| *h)) diff --git a/base_layer/wallet/src/wallet.rs b/base_layer/wallet/src/wallet.rs index 42dbc64519..0dc6e7a2ab 100644 --- a/base_layer/wallet/src/wallet.rs +++ b/base_layer/wallet/src/wallet.rs @@ -248,7 +248,7 @@ where stack }; - let mut handles = stack.build().await?; + let handles = stack.build().await?; let inbound = handles .take_handle::>() diff --git a/integration_tests/src/base_node_process.rs b/integration_tests/src/base_node_process.rs index 099b8f6b5d..bac1fc658a 100644 --- a/integration_tests/src/base_node_process.rs +++ b/integration_tests/src/base_node_process.rs @@ -32,10 +32,7 @@ use std::{ use minotari_app_utilities::identity_management::save_identity; use minotari_node::{run_base_node, BaseNodeConfig, GrpcMethod, MetricsConfig}; use minotari_node_grpc_client::BaseNodeGrpcClient; -use tari_common::{ - configuration::{CommonConfig, MultiaddrList}, - network_check::set_network_if_choice_valid, -}; +use tari_common::{configuration::CommonConfig, network_check::set_network_if_choice_valid}; use tari_crypto::ristretto::RistrettoPublicKey; use tari_network::{identity, multiaddr::Multiaddr}; use tari_p2p::{auto_update::AutoUpdateConfig, Network, PeerSeedsConfig}; @@ -182,13 +179,9 @@ pub async fn spawn_base_node_with_config( base_node_config.base_node.max_randomx_vms = 1; base_node_config.base_node.lmdb_path = temp_dir_path.to_path_buf(); - base_node_config - .base_node - .p2p - .listen_addresses - .push(format!("/ip4/127.0.0.1/tcp/{}", port).parse().unwrap()); - base_node_config.base_node.p2p.public_addresses = - MultiaddrList::from(base_node_config.base_node.p2p.listen_addresses.clone()); + base_node_config.base_node.p2p.listen_addresses = + vec![format!("/ip4/127.0.0.1/tcp/{}", port).parse().unwrap()].into(); + base_node_config.base_node.p2p.public_addresses = base_node_config.base_node.p2p.listen_addresses.clone(); base_node_config.base_node.storage.orphan_storage_capacity = 10; if base_node_config.base_node.storage.pruning_horizon != 0 { base_node_config.base_node.storage.pruning_interval = 1; diff --git a/integration_tests/src/wallet_process.rs b/integration_tests/src/wallet_process.rs index ee35b37844..c1d89a58ba 100644 --- a/integration_tests/src/wallet_process.rs +++ b/integration_tests/src/wallet_process.rs @@ -143,7 +143,7 @@ pub async fn spawn_wallet( .wallet .base_node_service_config .base_node_monitor_max_refresh_interval = Duration::from_secs(15); - wallet_app_config.wallet.p2p.listen_addresses = vec![listen_addr.clone()]; + wallet_app_config.wallet.p2p.listen_addresses = MultiaddrList::from(vec![listen_addr.clone()]); wallet_app_config.wallet.p2p.public_addresses = MultiaddrList::from(vec![listen_addr]); if let Some(mech) = routing_mechanism { wallet_app_config diff --git a/network/core/src/spawn.rs b/network/core/src/spawn.rs index fbee0bcfe0..178251e8e4 100644 --- a/network/core/src/spawn.rs +++ b/network/core/src/spawn.rs @@ -34,6 +34,7 @@ pub fn spawn( messaging_mode: MessagingMode, mut config: crate::Config, seed_peers: Vec, + known_relay_peers: Vec, shutdown_signal: ShutdownSignal, ) -> Result, NetworkError> where @@ -81,7 +82,7 @@ where swarm, config, seed_peers, - vec![], + known_relay_peers, shutdown_signal, ) .run(), diff --git a/network/core/src/worker.rs b/network/core/src/worker.rs index b4566e930c..20a835d5af 100644 --- a/network/core/src/worker.rs +++ b/network/core/src/worker.rs @@ -155,6 +155,9 @@ where } fn listen(&mut self) -> Result<(), NetworkError> { + if self.config.listener_addrs.is_empty() { + info!(target: LOG_TARGET, "ℹ️ No listener addressed specified. The node will not be able to receive inbound connections."); + } for addr in &self.config.listener_addrs { debug!("listening on {addr}"); self.swarm.listen_on(addr.clone())?; @@ -214,6 +217,8 @@ where } } } + warn!(target: LOG_TARGET, "💤 Networking service shutdown"); + Ok(()) }