Skip to content

Commit

Permalink
improve error logging if networking fails in base node
Browse files Browse the repository at this point in the history
  • Loading branch information
sdbondi committed Oct 16, 2024
1 parent 1f6d9be commit bb470d3
Show file tree
Hide file tree
Showing 14 changed files with 100 additions and 45 deletions.
14 changes: 7 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
authors = ["The Tari Development Community"]
repository = "https://github.com/tari-project/tari"
license = "BSD-3-Clause"
version = "1.7.0-pre.0"
version = "1.7.0-pre.2"
edition = "2021"

[workspace]
Expand Down
2 changes: 1 addition & 1 deletion applications/minotari_node/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ where B: BlockchainBackend + 'static

let dispatcher = Dispatcher::new();
let user_agent = format!("tari/basenode/{}", consts::APP_VERSION_NUMBER);
let mut handles = StackBuilder::new(self.interrupt_signal)
let handles = StackBuilder::new(self.interrupt_signal)
.add_initializer(P2pInitializer::new(
p2p_config.clone(),
user_agent,
Expand Down
12 changes: 9 additions & 3 deletions applications/minotari_node/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ use tari_core::{
},
OutputSmt,
};
use tari_network::{identity, NetworkHandle};
use tari_p2p::{auto_update::SoftwareUpdaterHandle, services::liveness::LivenessHandle};
use tari_network::{identity, NetworkError, NetworkHandle};
use tari_p2p::{auto_update::SoftwareUpdaterHandle, initialization::TaskHandle, services::liveness::LivenessHandle};
use tari_rpc_framework::RpcServerHandle;
use tari_service_framework::ServiceHandles;
use tari_shutdown::ShutdownSignal;
use tokio::sync::watch;
use tokio::{sync::watch, task};

use crate::{bootstrap::BaseNodeBootstrapper, ApplicationConfig, DatabaseType};

Expand Down Expand Up @@ -91,6 +91,12 @@ impl BaseNodeContext {
&self.network
}

/// Returns the task JoinHandle for the network worker. This will only return Some once.
pub fn take_network_join_handle(&self) -> Option<task::JoinHandle<Result<(), NetworkError>>> {
let handle = self.base_node_handles.take_handle::<TaskHandle<NetworkError>>()?;
Some(handle.into_inner())
}

/// Returns the liveness service handle
pub fn liveness(&self) -> LivenessHandle {
self.base_node_handles.expect_handle()
Expand Down
41 changes: 39 additions & 2 deletions applications/minotari_node/src/commands/cli_loop.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
// Copyright 2022 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

use std::{io, time::Duration};
use std::{future, io, time::Duration};

use crossterm::{
cursor,
event::{Event, EventStream, KeyCode, KeyEvent, KeyModifiers},
terminal,
};
use futures::{FutureExt, StreamExt};
use log::{error, info};
use rustyline::{config::OutputStreamType, error::ReadlineError, CompletionType, Config, EditMode, Editor};
use tari_network::NetworkError;
use tari_shutdown::ShutdownSignal;
use tokio::{signal, time};
use tokio::{signal, task::JoinError, time};

use crate::{
commands::{
Expand Down Expand Up @@ -119,6 +121,19 @@ impl CliLoop {
if let Some(command) = self.watch_task.take() {
let mut interrupt = signal::ctrl_c().fuse().boxed();
let mut software_update_notif = self.context.software_updater.update_notifier().clone();
let mut network_handle = self.context.take_network_join_handle();
// Need to check this before we do anything in handle_command_str
if let Some(handle) = network_handle {
if handle.is_finished() {
log_networking_handle_result(handle.await);
return;
}
network_handle = Some(handle);
}
let mut network = network_handle
.map(|fut| fut.boxed())
.unwrap_or_else(|| future::pending().boxed());

let config = self.context.config.clone();
let line = command.line();
let interval = command
Expand Down Expand Up @@ -146,6 +161,10 @@ impl CliLoop {
break;
}
}
result = &mut network => {
log_networking_handle_result(result);
break;
},
Ok(_) = software_update_notif.changed() => {
if let Some(ref update) = *software_update_notif.borrow() {
println!(
Expand Down Expand Up @@ -249,3 +268,21 @@ impl CliLoop {
}
}
}

fn log_networking_handle_result(result: Result<Result<(), NetworkError>, JoinError>) {
match result {
Ok(Ok(_)) => {
info!(target: LOG_TARGET, "ℹ️ Networking exited cleanly");
println!("ℹ️ Networking exited cleanly");
},
Ok(Err(err)) => {
error!(target: LOG_TARGET, "❗️ Networking exited with an error {err}");
eprintln!("❗️ Networking exited with an error {err}");
},
Err(err) => {
// Panic
error!(target: LOG_TARGET, "❗️ Networking panicked {err}");
eprintln!("❗️ Networking panicked {err}");
},
}
}
8 changes: 7 additions & 1 deletion applications/minotari_node/src/commands/command/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ use tari_network::{BannedPeer, NetworkError, NetworkHandle};
use tari_p2p::{auto_update::SoftwareUpdaterHandle, services::liveness::LivenessHandle};
use tari_rpc_framework::RpcServerHandle;
use tari_shutdown::Shutdown;
use tokio::{sync::watch, time};
use tokio::{sync::watch, task, time};
pub use watch_command::WatchCommand;

use crate::{
Expand Down Expand Up @@ -150,6 +150,7 @@ pub struct CommandContext {
blockchain_db: AsyncBlockchainDb<LMDBDatabase>,
rpc_server: RpcServerHandle,
network: NetworkHandle,
network_join_handle: Option<task::JoinHandle<Result<(), NetworkError>>>,
liveness: LivenessHandle,
node_service: LocalNodeCommsInterface,
mempool_service: LocalMempoolService,
Expand All @@ -168,6 +169,7 @@ impl CommandContext {
blockchain_db: ctx.blockchain_db().into(),
rpc_server: ctx.rpc_server(),
network: ctx.network().clone(),
network_join_handle: ctx.take_network_join_handle(),
liveness: ctx.liveness(),
node_service: ctx.local_node(),
mempool_service: ctx.local_mempool(),
Expand All @@ -179,6 +181,10 @@ impl CommandContext {
}
}

/// Moves the networking task's join handle out of this context, leaving `None` in its place.
/// Returns `None` if the handle was never set or has already been taken.
pub fn take_network_join_handle(&mut self) -> Option<task::JoinHandle<Result<(), NetworkError>>> {
    Option::take(&mut self.network_join_handle)
}

pub async fn handle_command_str(&mut self, line: &str) -> Result<Option<WatchCommand>, Error> {
let args: Args = line.parse()?;
if let Command::Watch(command) = args.command {
Expand Down
11 changes: 4 additions & 7 deletions base_layer/p2p/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,7 @@ use tari_common::{
},
SubConfigPath,
};
use tari_network::{
multiaddr::{multiaddr, Multiaddr},
ReachabilityMode,
};
use tari_network::ReachabilityMode;

/// Peer seed configuration
#[derive(Clone, Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -99,8 +96,8 @@ pub struct P2pConfig {
/// manually set.
pub public_addresses: MultiaddrList,
/// The multiaddrs to listen on.
/// Default: ["/ip4/0.0.0.0/tcp/0"]
pub listen_addresses: Vec<Multiaddr>,
/// Default: ["/ip4/0.0.0.0/tcp/0", "/ip4/0.0.0.0/udp/0/quic"]
pub listen_addresses: MultiaddrList,
#[serde(
default,
deserialize_with = "deserialize_from_str",
Expand All @@ -122,7 +119,7 @@ impl Default for P2pConfig {
Self {
override_from: None,
public_addresses: MultiaddrList::default(),
listen_addresses: vec![multiaddr!(Ip4([0, 0, 0, 0]), Tcp(0u16))],
listen_addresses: tari_network::Config::default_listen_addrs().into(),
reachability_mode: Default::default(),
rpc_max_simultaneous_sessions: 100,
rpc_max_sessions_per_peer: 10,
Expand Down
26 changes: 18 additions & 8 deletions base_layer/p2p/src/initialization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ use tokio::{sync::mpsc, task::JoinHandle};
use crate::{
config::{P2pConfig, PeerSeedsConfig},
connector::InboundMessaging,
message::TariNodeMessageSpec,
dns::DnsClientError,
message::TariNodeMessageSpec,
peer_seeds::{DnsSeedResolver, SeedPeer},
};

Expand Down Expand Up @@ -120,6 +120,7 @@ where
MessagingMode::Enabled { tx_messages },
config,
seed_peers.into_iter().map(Into::into).collect(),
vec![],
shutdown_signal,
)?;

Expand All @@ -137,6 +138,7 @@ pub type P2pHandles<TMsg> = (
pub fn spawn_network<TMsg>(
identity: Arc<identity::Keypair>,
seed_peers: Vec<SeedPeer>,
known_relay_peers: Vec<SeedPeer>,
config: tari_network::Config,
shutdown_signal: ShutdownSignal,
) -> Result<P2pHandles<TMsg>, CommsInitializationError>
Expand All @@ -151,6 +153,7 @@ where
MessagingMode::Enabled { tx_messages },
config,
seed_peers.into_iter().map(Into::into).collect(),
known_relay_peers.into_iter().map(Into::into).collect(),
shutdown_signal,
)?;

Expand Down Expand Up @@ -317,11 +320,6 @@ impl ServiceInitializer for P2pInitializer {
Vec::new()
});

let mut listener_addrs = self.config.listen_addresses.clone();
if listener_addrs.is_empty() {
listener_addrs = tari_network::Config::default_listen_addrs();
}

let config = tari_network::Config {
swarm: SwarmConfig {
protocol_version: format!("/minotari/{}/1.0.0", self.network.as_key_str()).parse()?,
Expand All @@ -330,24 +328,36 @@ impl ServiceInitializer for P2pInitializer {
enable_relay: self.config.enable_relay,
..Default::default()
},
listener_addrs,
listener_addrs: self.config.listen_addresses.to_vec(),
reachability_mode: self.config.reachability_mode,
check_connections_interval: Duration::from_secs(2 * 60 * 60),
known_local_public_address: self.config.public_addresses.to_vec(),
};

let shutdown = context.get_shutdown_signal();
let (network, outbound_messaging, inbound_messaging, _join_handle) = spawn_network::<TariNodeMessageSpec>(
let (network, outbound_messaging, inbound_messaging, join_handle) = spawn_network::<TariNodeMessageSpec>(
self.identity.clone(),
seed_peers.into_iter().chain(dns_peers).collect(),
vec![],
config,
shutdown,
)?;

context.register_handle(TaskHandle(join_handle));
context.register_handle(network);
context.register_handle(outbound_messaging);
context.register_handle(inbound_messaging);
debug!(target: LOG_TARGET, "P2P Initialized");
Ok(())
}
}

/// Newtype around a task `JoinHandle` that makes it easier to use a join handle with the service
/// framework.
#[derive(Debug)]
pub struct TaskHandle<E>(JoinHandle<Result<(), E>>);

impl<E> TaskHandle<E> {
    /// Consumes the wrapper and returns the wrapped `JoinHandle`.
    pub fn into_inner(self) -> JoinHandle<Result<(), E>> {
        let Self(join_handle) = self;
        join_handle
    }
}
2 changes: 1 addition & 1 deletion base_layer/service_framework/src/context/handles.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ impl ServiceHandles {
}

/// Take ownership of a handle
pub fn take_handle<H: 'static>(&mut self) -> Option<H> {
pub fn take_handle<H: 'static>(&self) -> Option<H> {
acquire_lock!(self.handles)
.remove(&TypeId::of::<H>())
.and_then(|handle| handle.downcast::<H>().ok().map(|h| *h))
Expand Down
2 changes: 1 addition & 1 deletion base_layer/wallet/src/wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ where
stack
};

let mut handles = stack.build().await?;
let handles = stack.build().await?;

let inbound = handles
.take_handle::<InboundMessaging<TariNodeMessageSpec>>()
Expand Down
15 changes: 4 additions & 11 deletions integration_tests/src/base_node_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,7 @@ use std::{
use minotari_app_utilities::identity_management::save_identity;
use minotari_node::{run_base_node, BaseNodeConfig, GrpcMethod, MetricsConfig};
use minotari_node_grpc_client::BaseNodeGrpcClient;
use tari_common::{
configuration::{CommonConfig, MultiaddrList},
network_check::set_network_if_choice_valid,
};
use tari_common::{configuration::CommonConfig, network_check::set_network_if_choice_valid};
use tari_crypto::ristretto::RistrettoPublicKey;
use tari_network::{identity, multiaddr::Multiaddr};
use tari_p2p::{auto_update::AutoUpdateConfig, Network, PeerSeedsConfig};
Expand Down Expand Up @@ -182,13 +179,9 @@ pub async fn spawn_base_node_with_config(
base_node_config.base_node.max_randomx_vms = 1;

base_node_config.base_node.lmdb_path = temp_dir_path.to_path_buf();
base_node_config
.base_node
.p2p
.listen_addresses
.push(format!("/ip4/127.0.0.1/tcp/{}", port).parse().unwrap());
base_node_config.base_node.p2p.public_addresses =
MultiaddrList::from(base_node_config.base_node.p2p.listen_addresses.clone());
base_node_config.base_node.p2p.listen_addresses =
vec![format!("/ip4/127.0.0.1/tcp/{}", port).parse().unwrap()].into();
base_node_config.base_node.p2p.public_addresses = base_node_config.base_node.p2p.listen_addresses.clone();
base_node_config.base_node.storage.orphan_storage_capacity = 10;
if base_node_config.base_node.storage.pruning_horizon != 0 {
base_node_config.base_node.storage.pruning_interval = 1;
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/src/wallet_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ pub async fn spawn_wallet(
.wallet
.base_node_service_config
.base_node_monitor_max_refresh_interval = Duration::from_secs(15);
wallet_app_config.wallet.p2p.listen_addresses = vec![listen_addr.clone()];
wallet_app_config.wallet.p2p.listen_addresses = MultiaddrList::from(vec![listen_addr.clone()]);
wallet_app_config.wallet.p2p.public_addresses = MultiaddrList::from(vec![listen_addr]);
if let Some(mech) = routing_mechanism {
wallet_app_config
Expand Down
Loading

0 comments on commit bb470d3

Please sign in to comment.