From 84ecc24bc871b7411d4c279ddef21f2062c7a48b Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Wed, 6 Mar 2024 14:04:46 +0100 Subject: [PATCH 1/3] Introduce `status` method allowing to query the `Node`'s status .. we replace the simple `is_running` with a more verbose `status` method returning a `NodeStatus` struct, giving more information on syncing states etc. --- bindings/ldk_node.udl | 18 +++++- src/builder.rs | 14 +++++ src/lib.rs | 143 ++++++++++++++++++++++++++++++++++++------ src/types.rs | 20 +++++- tests/common/mod.rs | 2 + 5 files changed, 175 insertions(+), 22 deletions(-) diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index 4f09b7ced..5e9c98053 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -42,6 +42,7 @@ interface LDKNode { void start(); [Throws=NodeError] void stop(); + NodeStatus status(); Event? next_event(); Event wait_next_event(); [Async] @@ -97,7 +98,6 @@ interface LDKNode { [Throws=NodeError] string sign_message([ByRef]sequence msg); boolean verify_signature([ByRef]sequence msg, [ByRef]string sig, [ByRef]PublicKey pkey); - boolean is_running(); }; [Error] @@ -137,6 +137,22 @@ enum NodeError { "LiquidityFeeTooHigh", }; +dictionary NodeStatus { + boolean is_running; + boolean is_listening; + BestBlock current_best_block; + u64? latest_wallet_sync_timestamp; + u64? latest_onchain_wallet_sync_timestamp; + u64? latest_fee_rate_cache_update_timestamp; + u64? latest_rgs_snapshot_timestamp; + u64? latest_node_announcement_broadcast_timestamp; +}; + +dictionary BestBlock { + BlockHash block_hash; + u32 height; +}; + [Error] enum BuildError { "InvalidSeedBytes", diff --git a/src/builder.rs b/src/builder.rs index a09b2563f..161c3bbbb 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -65,6 +65,7 @@ use std::fmt; use std::fs; use std::io::Cursor; use std::path::PathBuf; +use std::sync::atomic::AtomicBool; use std::sync::{Arc, Mutex, RwLock}; use std::time::SystemTime; @@ -945,6 +946,13 @@ fn build_with_store_internal( let (stop_sender, _) = tokio::sync::watch::channel(()); + let is_listening = Arc::new(AtomicBool::new(false)); + let latest_wallet_sync_timestamp = Arc::new(RwLock::new(None)); + let latest_onchain_wallet_sync_timestamp = Arc::new(RwLock::new(None)); + let latest_fee_rate_cache_update_timestamp = Arc::new(RwLock::new(None)); + let latest_rgs_snapshot_timestamp = Arc::new(RwLock::new(None)); + let latest_node_announcement_broadcast_timestamp = Arc::new(RwLock::new(None)); + Ok(Node { runtime, stop_sender, @@ -968,6 +976,12 @@ fn build_with_store_internal( scorer, peer_store, payment_store, + is_listening, + latest_wallet_sync_timestamp, + latest_onchain_wallet_sync_timestamp, + latest_fee_rate_cache_update_timestamp, + latest_rgs_snapshot_timestamp, + latest_node_announcement_broadcast_timestamp, }) } diff --git a/src/lib.rs b/src/lib.rs index 24b2123f5..2b0e796ca 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -107,7 +107,7 @@ pub use error::Error as NodeError; use error::Error; pub use event::Event; -pub use types::ChannelConfig; +pub use types::{BestBlock, ChannelConfig}; pub use io::utils::generate_entropy_mnemonic; @@ -167,8 +167,9 @@ use rand::Rng; use std::default::Default; use std::net::ToSocketAddrs; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex, RwLock}; -use std::time::{Duration, Instant, SystemTime}; +use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; #[cfg(feature = "uniffi")] uniffi::include_scaffolding!("ldk_node"); @@ -199,6 +200,12 @@ pub struct Node { scorer: Arc>, peer_store: Arc>>, payment_store: Arc>>, + is_listening: Arc, + latest_wallet_sync_timestamp: Arc>>, + latest_onchain_wallet_sync_timestamp: Arc>>, + latest_fee_rate_cache_update_timestamp: Arc>>, + latest_rgs_snapshot_timestamp: Arc>>, + latest_node_announcement_broadcast_timestamp: Arc>>, } impl Node { @@ -222,6 +229,8 @@ impl Node { // Block to ensure we update our fee rate cache once on startup let fee_estimator = Arc::clone(&self.fee_estimator); let sync_logger = Arc::clone(&self.logger); + let sync_fee_rate_update_timestamp = + Arc::clone(&self.latest_fee_rate_cache_update_timestamp); let runtime_ref = &runtime; tokio::task::block_in_place(move || { runtime_ref.block_on(async move { @@ -233,6 +242,9 @@ impl Node { "Initial fee rate cache update finished in {}ms.", now.elapsed().as_millis() ); + let unix_time_secs_opt = + SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); + *sync_fee_rate_update_timestamp.write().unwrap() = unix_time_secs_opt; Ok(()) }, Err(e) => { @@ -246,6 +258,7 @@ impl Node { // Setup wallet sync let wallet = Arc::clone(&self.wallet); let sync_logger = Arc::clone(&self.logger); + let sync_onchain_wallet_timestamp = Arc::clone(&self.latest_onchain_wallet_sync_timestamp); let mut stop_sync = self.stop_sender.subscribe(); let onchain_wallet_sync_interval_secs = self .config @@ -267,11 +280,16 @@ impl Node { _ = onchain_wallet_sync_interval.tick() => { let now = Instant::now(); match wallet.sync().await { - Ok(()) => log_trace!( + Ok(()) => { + log_trace!( sync_logger, "Background sync of on-chain wallet finished in {}ms.", now.elapsed().as_millis() - ), + ); + let unix_time_secs_opt = + SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); + *sync_onchain_wallet_timestamp.write().unwrap() = unix_time_secs_opt; + } Err(err) => { log_error!( sync_logger, @@ -289,6 +307,7 @@ impl Node { let mut stop_fee_updates = self.stop_sender.subscribe(); let fee_update_logger = Arc::clone(&self.logger); + let fee_update_timestamp = Arc::clone(&self.latest_fee_rate_cache_update_timestamp); let fee_estimator = Arc::clone(&self.fee_estimator); let fee_rate_cache_update_interval_secs = self.config.fee_rate_cache_update_interval_secs.max(WALLET_SYNC_INTERVAL_MINIMUM_SECS); @@ -307,11 +326,16 @@ impl Node { _ = fee_rate_update_interval.tick() => { let now = Instant::now(); match fee_estimator.update_fee_estimates().await { - Ok(()) => log_trace!( + Ok(()) => { + log_trace!( fee_update_logger, "Background update of fee rate cache finished in {}ms.", now.elapsed().as_millis() - ), + ); + let unix_time_secs_opt = + SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); + *fee_update_timestamp.write().unwrap() = unix_time_secs_opt; + } Err(err) => { log_error!( fee_update_logger, @@ -330,6 +354,7 @@ impl Node { let sync_cmon = Arc::clone(&self.chain_monitor); let sync_sweeper = Arc::clone(&self.output_sweeper); let sync_logger = Arc::clone(&self.logger); + let sync_wallet_timestamp = Arc::clone(&self.latest_wallet_sync_timestamp); let mut stop_sync = self.stop_sender.subscribe(); let wallet_sync_interval_secs = self.config.wallet_sync_interval_secs.max(WALLET_SYNC_INTERVAL_MINIMUM_SECS); @@ -350,11 +375,16 @@ impl Node { ]; let now = Instant::now(); match tx_sync.sync(confirmables).await { - Ok(()) => log_trace!( + Ok(()) => { + log_trace!( sync_logger, "Background sync of Lightning wallet finished in {}ms.", now.elapsed().as_millis() - ), + ); + let unix_time_secs_opt = + SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); + *sync_wallet_timestamp.write().unwrap() = unix_time_secs_opt; + } Err(e) => { log_error!(sync_logger, "Background sync of Lightning wallet failed: {}", e) } @@ -368,6 +398,7 @@ impl Node { let gossip_source = Arc::clone(&self.gossip_source); let gossip_sync_store = Arc::clone(&self.kv_store); let gossip_sync_logger = Arc::clone(&self.logger); + let gossip_rgs_sync_timestamp = Arc::clone(&self.latest_rgs_snapshot_timestamp); let mut stop_gossip_sync = self.stop_sender.subscribe(); runtime.spawn(async move { let mut interval = tokio::time::interval(RGS_SYNC_INTERVAL); @@ -395,6 +426,7 @@ impl Node { log_error!(gossip_sync_logger, "Persistence failed: {}", e); panic!("Persistence failed"); }); + *gossip_rgs_sync_timestamp.write().unwrap() = Some(updated_timestamp as u64); } Err(e) => log_error!( gossip_sync_logger, @@ -413,6 +445,7 @@ impl Node { let peer_manager_connection_handler = Arc::clone(&self.peer_manager); let mut stop_listen = self.stop_sender.subscribe(); let listening_logger = Arc::clone(&self.logger); + let listening_indicator = Arc::clone(&self.is_listening); let mut bind_addrs = Vec::with_capacity(listening_addresses.len()); @@ -431,6 +464,7 @@ impl Node { } runtime.spawn(async move { + { let listener = tokio::net::TcpListener::bind(&*bind_addrs).await .unwrap_or_else(|e| { @@ -440,11 +474,13 @@ impl Node { ); }); + listening_indicator.store(true, Ordering::Release); + loop { let peer_mgr = Arc::clone(&peer_manager_connection_handler); tokio::select! { _ = stop_listen.changed() => { - return; + break; } res = listener.accept() => { let tcp_stream = res.unwrap().0; @@ -458,6 +494,9 @@ impl Node { } } } + } + + listening_indicator.store(false, Ordering::Release); }); } @@ -508,6 +547,7 @@ impl Node { let bcast_config = Arc::clone(&self.config); let bcast_store = Arc::clone(&self.kv_store); let bcast_logger = Arc::clone(&self.logger); + let bcast_ann_timestamp = Arc::clone(&self.latest_node_announcement_broadcast_timestamp); let mut stop_bcast = self.stop_sender.subscribe(); runtime.spawn(async move { // We check every 30 secs whether our last broadcast is NODE_ANN_BCAST_INTERVAL away. @@ -553,12 +593,17 @@ impl Node { bcast_pm.broadcast_node_announcement([0; 3], [0; 32], addresses); - let unix_time_secs = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs(); - io::utils::write_latest_node_ann_bcast_timestamp(unix_time_secs, Arc::clone(&bcast_store), Arc::clone(&bcast_logger)) - .unwrap_or_else(|e| { - log_error!(bcast_logger, "Persistence failed: {}", e); - panic!("Persistence failed"); - }); + let unix_time_secs_opt = + SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); + *bcast_ann_timestamp.write().unwrap() = unix_time_secs_opt; + + if let Some(unix_time_secs) = unix_time_secs_opt { + io::utils::write_latest_node_ann_bcast_timestamp(unix_time_secs, Arc::clone(&bcast_store), Arc::clone(&bcast_logger)) + .unwrap_or_else(|e| { + log_error!(bcast_logger, "Persistence failed: {}", e); + panic!("Persistence failed"); + }); + } } } } @@ -662,11 +707,6 @@ impl Node { Ok(()) } - /// Returns whether the [`Node`] is running. - pub fn is_running(&self) -> bool { - self.runtime.read().unwrap().is_some() - } - /// Disconnects all peers, stops all running background tasks, and shuts down [`Node`]. /// /// After this returns most API methods will return [`Error::NotRunning`]. @@ -697,6 +737,32 @@ impl Node { Ok(()) } + /// Returns the status of the [`Node`]. + pub fn status(&self) -> NodeStatus { + let is_running = self.runtime.read().unwrap().is_some(); + let is_listening = self.is_listening.load(Ordering::Acquire); + let current_best_block = self.channel_manager.current_best_block().into(); + let latest_wallet_sync_timestamp = *self.latest_wallet_sync_timestamp.read().unwrap(); + let latest_onchain_wallet_sync_timestamp = + *self.latest_onchain_wallet_sync_timestamp.read().unwrap(); + let latest_fee_rate_cache_update_timestamp = + *self.latest_fee_rate_cache_update_timestamp.read().unwrap(); + let latest_rgs_snapshot_timestamp = *self.latest_rgs_snapshot_timestamp.read().unwrap(); + let latest_node_announcement_broadcast_timestamp = + *self.latest_node_announcement_broadcast_timestamp.read().unwrap(); + + NodeStatus { + is_running, + is_listening, + current_best_block, + latest_wallet_sync_timestamp, + latest_onchain_wallet_sync_timestamp, + latest_fee_rate_cache_update_timestamp, + latest_rgs_snapshot_timestamp, + latest_node_announcement_broadcast_timestamp, + } + } + /// Returns the next event in the event queue, if currently available. /// /// Will return `Some(..)` if an event is available and `None` otherwise. @@ -1746,6 +1812,43 @@ impl Drop for Node { } } +/// Represents the status of the [`Node`]. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct NodeStatus { + /// Indicates whether the [`Node`] is running. + pub is_running: bool, + /// Indicates whether the [`Node`] is listening for incoming connections on the addresses + /// configured via [`Config::listening_addresses`]. + pub is_listening: bool, + /// The best block to which our Lightning wallet is currently synced. + pub current_best_block: BestBlock, + /// The timestamp, in seconds since start of the UNIX epoch, when we last successfully synced + /// our Lightning wallet to the chain tip. + /// + /// Will be `None` if the wallet hasn't been synced since the [`Node`] was initialized. + pub latest_wallet_sync_timestamp: Option, + /// The timestamp, in seconds since start of the UNIX epoch, when we last successfully synced + /// our on-chain wallet to the chain tip. + /// + /// Will be `None` if the wallet hasn't been synced since the [`Node`] was initialized. + pub latest_onchain_wallet_sync_timestamp: Option, + /// The timestamp, in seconds since start of the UNIX epoch, when we last successfully update + /// our fee rate cache. + /// + /// Will be `None` if the cache hasn't been updated since the [`Node`] was initialized. + pub latest_fee_rate_cache_update_timestamp: Option, + /// The timestamp, in seconds since start of the UNIX epoch, when the last rapid gossip sync + /// (RGS) snapshot we successfully applied was generated. + /// + /// Will be `None` if RGS isn't configured or the snapshot hasn't been updated since the [`Node`] was initialized. + pub latest_rgs_snapshot_timestamp: Option, + /// The timestamp, in seconds since start of the UNIX epoch, when we last broadcasted a node + /// announcement. + /// + /// Will be `None` if we have no public channels or we haven't broadcasted since the [`Node`] was initialized. + pub latest_node_announcement_broadcast_timestamp: Option, +} + async fn connect_peer_if_necessary( node_id: PublicKey, addr: SocketAddress, peer_manager: Arc>, logger: Arc, diff --git a/src/types.rs b/src/types.rs index 6269b3ddf..4e082498e 100644 --- a/src/types.rs +++ b/src/types.rs @@ -4,6 +4,7 @@ use crate::sweep::OutputSweeper; use lightning::blinded_path::BlindedPath; use lightning::chain::chainmonitor; +use lightning::chain::BestBlock as LdkBestBlock; use lightning::ln::channelmanager::ChannelDetails as LdkChannelDetails; use lightning::ln::msgs::RoutingMessageHandler; use lightning::ln::msgs::SocketAddress; @@ -20,7 +21,7 @@ use lightning_net_tokio::SocketDescriptor; use lightning_transaction_sync::EsploraSyncClient; use bitcoin::secp256k1::{self, PublicKey, Secp256k1}; -use bitcoin::OutPoint; +use bitcoin::{BlockHash, OutPoint}; use std::sync::{Arc, Mutex, RwLock}; @@ -456,3 +457,20 @@ impl Default for ChannelConfig { LdkChannelConfig::default().into() } } + +/// The best known block as identified by its hash and height. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct BestBlock { + /// The block's hash + pub block_hash: BlockHash, + /// The height at which the block was confirmed. + pub height: u32, +} + +impl From for BestBlock { + fn from(value: LdkBestBlock) -> Self { + let block_hash = value.block_hash(); + let height = value.height(); + Self { block_hash, height } + } +} diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 474ac14a8..3be36869d 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -220,6 +220,8 @@ pub(crate) fn setup_node(electrsd: &ElectrsD, config: Config) -> TestNode Date: Wed, 6 Mar 2024 14:09:08 +0100 Subject: [PATCH 2/3] Allow to retrieve the `Node`'s `Config` --- bindings/ldk_node.udl | 1 + src/lib.rs | 5 +++++ 2 files changed, 6 insertions(+) diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index 5e9c98053..992899000 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -43,6 +43,7 @@ interface LDKNode { [Throws=NodeError] void stop(); NodeStatus status(); + Config config(); Event? next_event(); Event wait_next_event(); [Async] diff --git a/src/lib.rs b/src/lib.rs index 2b0e796ca..b9508ad60 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -763,6 +763,11 @@ impl Node { } } + /// Returns the config with which the [`Node`] was initialized. + pub fn config(&self) -> Config { + self.config.as_ref().clone() + } + /// Returns the next event in the event queue, if currently available. /// /// Will return `Some(..)` if an event is available and `None` otherwise. From b052f15d8239fa0d60ff8b21a058db1df239fefd Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Wed, 6 Mar 2024 14:22:29 +0100 Subject: [PATCH 3/3] Use `is_listening` flag in `connection_restart_behavior` test --- tests/integration_tests_rust.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index d9562c565..0f1689ecd 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -289,7 +289,11 @@ fn do_connection_restart_behavior(persist: bool) { let node_id_b = node_b.node_id(); let node_addr_b = node_b.listening_addresses().unwrap().first().unwrap().clone(); - std::thread::sleep(std::time::Duration::from_secs(1)); + + while !node_b.status().is_listening { + std::thread::sleep(std::time::Duration::from_millis(10)); + } + node_a.connect(node_id_b, node_addr_b, persist).unwrap(); let peer_details_a = node_a.list_peers().first().unwrap().clone();