Skip to content

Commit

Permalink
Merge pull request #272 from tnull/2024-03-add-node-status
Browse files Browse the repository at this point in the history
Introduce `status` method allowing to query the `Node`'s status
  • Loading branch information
tnull authored Mar 8, 2024
2 parents 1cf74c6 + b052f15 commit cdeeb7f
Show file tree
Hide file tree
Showing 6 changed files with 186 additions and 23 deletions.
19 changes: 18 additions & 1 deletion bindings/ldk_node.udl
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ interface LDKNode {
void start();
[Throws=NodeError]
void stop();
NodeStatus status();
Config config();
Event? next_event();
Event wait_next_event();
[Async]
Expand Down Expand Up @@ -97,7 +99,6 @@ interface LDKNode {
[Throws=NodeError]
string sign_message([ByRef]sequence<u8> msg);
boolean verify_signature([ByRef]sequence<u8> msg, [ByRef]string sig, [ByRef]PublicKey pkey);
boolean is_running();
};

[Error]
Expand Down Expand Up @@ -137,6 +138,22 @@ enum NodeError {
"LiquidityFeeTooHigh",
};

dictionary NodeStatus {
boolean is_running;
boolean is_listening;
BestBlock current_best_block;
u64? latest_wallet_sync_timestamp;
u64? latest_onchain_wallet_sync_timestamp;
u64? latest_fee_rate_cache_update_timestamp;
u64? latest_rgs_snapshot_timestamp;
u64? latest_node_announcement_broadcast_timestamp;
};

dictionary BestBlock {
BlockHash block_hash;
u32 height;
};

[Error]
enum BuildError {
"InvalidSeedBytes",
Expand Down
14 changes: 14 additions & 0 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ use std::fmt;
use std::fs;
use std::io::Cursor;
use std::path::PathBuf;
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, Mutex, RwLock};
use std::time::SystemTime;

Expand Down Expand Up @@ -945,6 +946,13 @@ fn build_with_store_internal<K: KVStore + Sync + Send + 'static>(

let (stop_sender, _) = tokio::sync::watch::channel(());

let is_listening = Arc::new(AtomicBool::new(false));
let latest_wallet_sync_timestamp = Arc::new(RwLock::new(None));
let latest_onchain_wallet_sync_timestamp = Arc::new(RwLock::new(None));
let latest_fee_rate_cache_update_timestamp = Arc::new(RwLock::new(None));
let latest_rgs_snapshot_timestamp = Arc::new(RwLock::new(None));
let latest_node_announcement_broadcast_timestamp = Arc::new(RwLock::new(None));

Ok(Node {
runtime,
stop_sender,
Expand All @@ -968,6 +976,12 @@ fn build_with_store_internal<K: KVStore + Sync + Send + 'static>(
scorer,
peer_store,
payment_store,
is_listening,
latest_wallet_sync_timestamp,
latest_onchain_wallet_sync_timestamp,
latest_fee_rate_cache_update_timestamp,
latest_rgs_snapshot_timestamp,
latest_node_announcement_broadcast_timestamp,
})
}

Expand Down
148 changes: 128 additions & 20 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ pub use error::Error as NodeError;
use error::Error;

pub use event::Event;
pub use types::ChannelConfig;
pub use types::{BestBlock, ChannelConfig};

pub use io::utils::generate_entropy_mnemonic;

Expand Down Expand Up @@ -167,8 +167,9 @@ use rand::Rng;

use std::default::Default;
use std::net::ToSocketAddrs;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::time::{Duration, Instant, SystemTime};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

#[cfg(feature = "uniffi")]
uniffi::include_scaffolding!("ldk_node");
Expand Down Expand Up @@ -199,6 +200,12 @@ pub struct Node<K: KVStore + Sync + Send + 'static> {
scorer: Arc<Mutex<Scorer>>,
peer_store: Arc<PeerStore<K, Arc<FilesystemLogger>>>,
payment_store: Arc<PaymentStore<K, Arc<FilesystemLogger>>>,
is_listening: Arc<AtomicBool>,
latest_wallet_sync_timestamp: Arc<RwLock<Option<u64>>>,
latest_onchain_wallet_sync_timestamp: Arc<RwLock<Option<u64>>>,
latest_fee_rate_cache_update_timestamp: Arc<RwLock<Option<u64>>>,
latest_rgs_snapshot_timestamp: Arc<RwLock<Option<u64>>>,
latest_node_announcement_broadcast_timestamp: Arc<RwLock<Option<u64>>>,
}

impl<K: KVStore + Sync + Send + 'static> Node<K> {
Expand All @@ -222,6 +229,8 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
// Block to ensure we update our fee rate cache once on startup
let fee_estimator = Arc::clone(&self.fee_estimator);
let sync_logger = Arc::clone(&self.logger);
let sync_fee_rate_update_timestamp =
Arc::clone(&self.latest_fee_rate_cache_update_timestamp);
let runtime_ref = &runtime;
tokio::task::block_in_place(move || {
runtime_ref.block_on(async move {
Expand All @@ -233,6 +242,9 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
"Initial fee rate cache update finished in {}ms.",
now.elapsed().as_millis()
);
let unix_time_secs_opt =
SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs());
*sync_fee_rate_update_timestamp.write().unwrap() = unix_time_secs_opt;
Ok(())
},
Err(e) => {
Expand All @@ -246,6 +258,7 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
// Setup wallet sync
let wallet = Arc::clone(&self.wallet);
let sync_logger = Arc::clone(&self.logger);
let sync_onchain_wallet_timestamp = Arc::clone(&self.latest_onchain_wallet_sync_timestamp);
let mut stop_sync = self.stop_sender.subscribe();
let onchain_wallet_sync_interval_secs = self
.config
Expand All @@ -267,11 +280,16 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
_ = onchain_wallet_sync_interval.tick() => {
let now = Instant::now();
match wallet.sync().await {
Ok(()) => log_trace!(
Ok(()) => {
log_trace!(
sync_logger,
"Background sync of on-chain wallet finished in {}ms.",
now.elapsed().as_millis()
),
);
let unix_time_secs_opt =
SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs());
*sync_onchain_wallet_timestamp.write().unwrap() = unix_time_secs_opt;
}
Err(err) => {
log_error!(
sync_logger,
Expand All @@ -289,6 +307,7 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {

let mut stop_fee_updates = self.stop_sender.subscribe();
let fee_update_logger = Arc::clone(&self.logger);
let fee_update_timestamp = Arc::clone(&self.latest_fee_rate_cache_update_timestamp);
let fee_estimator = Arc::clone(&self.fee_estimator);
let fee_rate_cache_update_interval_secs =
self.config.fee_rate_cache_update_interval_secs.max(WALLET_SYNC_INTERVAL_MINIMUM_SECS);
Expand All @@ -307,11 +326,16 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
_ = fee_rate_update_interval.tick() => {
let now = Instant::now();
match fee_estimator.update_fee_estimates().await {
Ok(()) => log_trace!(
Ok(()) => {
log_trace!(
fee_update_logger,
"Background update of fee rate cache finished in {}ms.",
now.elapsed().as_millis()
),
);
let unix_time_secs_opt =
SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs());
*fee_update_timestamp.write().unwrap() = unix_time_secs_opt;
}
Err(err) => {
log_error!(
fee_update_logger,
Expand All @@ -330,6 +354,7 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
let sync_cmon = Arc::clone(&self.chain_monitor);
let sync_sweeper = Arc::clone(&self.output_sweeper);
let sync_logger = Arc::clone(&self.logger);
let sync_wallet_timestamp = Arc::clone(&self.latest_wallet_sync_timestamp);
let mut stop_sync = self.stop_sender.subscribe();
let wallet_sync_interval_secs =
self.config.wallet_sync_interval_secs.max(WALLET_SYNC_INTERVAL_MINIMUM_SECS);
Expand All @@ -350,11 +375,16 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
];
let now = Instant::now();
match tx_sync.sync(confirmables).await {
Ok(()) => log_trace!(
Ok(()) => {
log_trace!(
sync_logger,
"Background sync of Lightning wallet finished in {}ms.",
now.elapsed().as_millis()
),
);
let unix_time_secs_opt =
SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs());
*sync_wallet_timestamp.write().unwrap() = unix_time_secs_opt;
}
Err(e) => {
log_error!(sync_logger, "Background sync of Lightning wallet failed: {}", e)
}
Expand All @@ -368,6 +398,7 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
let gossip_source = Arc::clone(&self.gossip_source);
let gossip_sync_store = Arc::clone(&self.kv_store);
let gossip_sync_logger = Arc::clone(&self.logger);
let gossip_rgs_sync_timestamp = Arc::clone(&self.latest_rgs_snapshot_timestamp);
let mut stop_gossip_sync = self.stop_sender.subscribe();
runtime.spawn(async move {
let mut interval = tokio::time::interval(RGS_SYNC_INTERVAL);
Expand Down Expand Up @@ -395,6 +426,7 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
log_error!(gossip_sync_logger, "Persistence failed: {}", e);
panic!("Persistence failed");
});
*gossip_rgs_sync_timestamp.write().unwrap() = Some(updated_timestamp as u64);
}
Err(e) => log_error!(
gossip_sync_logger,
Expand All @@ -413,6 +445,7 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
let peer_manager_connection_handler = Arc::clone(&self.peer_manager);
let mut stop_listen = self.stop_sender.subscribe();
let listening_logger = Arc::clone(&self.logger);
let listening_indicator = Arc::clone(&self.is_listening);

let mut bind_addrs = Vec::with_capacity(listening_addresses.len());

Expand All @@ -431,6 +464,7 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
}

runtime.spawn(async move {
{
let listener =
tokio::net::TcpListener::bind(&*bind_addrs).await
.unwrap_or_else(|e| {
Expand All @@ -440,11 +474,13 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
);
});

listening_indicator.store(true, Ordering::Release);

loop {
let peer_mgr = Arc::clone(&peer_manager_connection_handler);
tokio::select! {
_ = stop_listen.changed() => {
return;
break;
}
res = listener.accept() => {
let tcp_stream = res.unwrap().0;
Expand All @@ -458,6 +494,9 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
}
}
}
}

listening_indicator.store(false, Ordering::Release);
});
}

Expand Down Expand Up @@ -508,6 +547,7 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
let bcast_config = Arc::clone(&self.config);
let bcast_store = Arc::clone(&self.kv_store);
let bcast_logger = Arc::clone(&self.logger);
let bcast_ann_timestamp = Arc::clone(&self.latest_node_announcement_broadcast_timestamp);
let mut stop_bcast = self.stop_sender.subscribe();
runtime.spawn(async move {
// We check every 30 secs whether our last broadcast is NODE_ANN_BCAST_INTERVAL away.
Expand Down Expand Up @@ -553,12 +593,17 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {

bcast_pm.broadcast_node_announcement([0; 3], [0; 32], addresses);

let unix_time_secs = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs();
io::utils::write_latest_node_ann_bcast_timestamp(unix_time_secs, Arc::clone(&bcast_store), Arc::clone(&bcast_logger))
.unwrap_or_else(|e| {
log_error!(bcast_logger, "Persistence failed: {}", e);
panic!("Persistence failed");
});
let unix_time_secs_opt =
SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs());
*bcast_ann_timestamp.write().unwrap() = unix_time_secs_opt;

if let Some(unix_time_secs) = unix_time_secs_opt {
io::utils::write_latest_node_ann_bcast_timestamp(unix_time_secs, Arc::clone(&bcast_store), Arc::clone(&bcast_logger))
.unwrap_or_else(|e| {
log_error!(bcast_logger, "Persistence failed: {}", e);
panic!("Persistence failed");
});
}
}
}
}
Expand Down Expand Up @@ -662,11 +707,6 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
Ok(())
}

/// Returns whether the [`Node`] is running.
pub fn is_running(&self) -> bool {
self.runtime.read().unwrap().is_some()
}

/// Disconnects all peers, stops all running background tasks, and shuts down [`Node`].
///
/// After this returns most API methods will return [`Error::NotRunning`].
Expand Down Expand Up @@ -697,6 +737,37 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
Ok(())
}

/// Returns the status of the [`Node`].
pub fn status(&self) -> NodeStatus {
let is_running = self.runtime.read().unwrap().is_some();
let is_listening = self.is_listening.load(Ordering::Acquire);
let current_best_block = self.channel_manager.current_best_block().into();
let latest_wallet_sync_timestamp = *self.latest_wallet_sync_timestamp.read().unwrap();
let latest_onchain_wallet_sync_timestamp =
*self.latest_onchain_wallet_sync_timestamp.read().unwrap();
let latest_fee_rate_cache_update_timestamp =
*self.latest_fee_rate_cache_update_timestamp.read().unwrap();
let latest_rgs_snapshot_timestamp = *self.latest_rgs_snapshot_timestamp.read().unwrap();
let latest_node_announcement_broadcast_timestamp =
*self.latest_node_announcement_broadcast_timestamp.read().unwrap();

NodeStatus {
is_running,
is_listening,
current_best_block,
latest_wallet_sync_timestamp,
latest_onchain_wallet_sync_timestamp,
latest_fee_rate_cache_update_timestamp,
latest_rgs_snapshot_timestamp,
latest_node_announcement_broadcast_timestamp,
}
}

/// Returns the config with which the [`Node`] was initialized.
pub fn config(&self) -> Config {
self.config.as_ref().clone()
}

/// Returns the next event in the event queue, if currently available.
///
/// Will return `Some(..)` if an event is available and `None` otherwise.
Expand Down Expand Up @@ -1746,6 +1817,43 @@ impl<K: KVStore + Sync + Send + 'static> Drop for Node<K> {
}
}

/// Represents the status of the [`Node`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct NodeStatus {
/// Indicates whether the [`Node`] is running.
pub is_running: bool,
/// Indicates whether the [`Node`] is listening for incoming connections on the addresses
/// configured via [`Config::listening_addresses`].
pub is_listening: bool,
/// The best block to which our Lightning wallet is currently synced.
pub current_best_block: BestBlock,
/// The timestamp, in seconds since start of the UNIX epoch, when we last successfully synced
/// our Lightning wallet to the chain tip.
///
/// Will be `None` if the wallet hasn't been synced since the [`Node`] was initialized.
pub latest_wallet_sync_timestamp: Option<u64>,
/// The timestamp, in seconds since start of the UNIX epoch, when we last successfully synced
/// our on-chain wallet to the chain tip.
///
/// Will be `None` if the wallet hasn't been synced since the [`Node`] was initialized.
pub latest_onchain_wallet_sync_timestamp: Option<u64>,
/// The timestamp, in seconds since start of the UNIX epoch, when we last successfully update
/// our fee rate cache.
///
/// Will be `None` if the cache hasn't been updated since the [`Node`] was initialized.
pub latest_fee_rate_cache_update_timestamp: Option<u64>,
/// The timestamp, in seconds since start of the UNIX epoch, when the last rapid gossip sync
/// (RGS) snapshot we successfully applied was generated.
///
/// Will be `None` if RGS isn't configured or the snapshot hasn't been updated since the [`Node`] was initialized.
pub latest_rgs_snapshot_timestamp: Option<u64>,
/// The timestamp, in seconds since start of the UNIX epoch, when we last broadcasted a node
/// announcement.
///
/// Will be `None` if we have no public channels or we haven't broadcasted since the [`Node`] was initialized.
pub latest_node_announcement_broadcast_timestamp: Option<u64>,
}

async fn connect_peer_if_necessary<K: KVStore + Sync + Send + 'static>(
node_id: PublicKey, addr: SocketAddress, peer_manager: Arc<PeerManager<K>>,
logger: Arc<FilesystemLogger>,
Expand Down
Loading

0 comments on commit cdeeb7f

Please sign in to comment.