From 619c9c9209b8c6796f4cd7f2289c11fdf8264fb1 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Thu, 30 Jan 2025 14:10:03 +0100 Subject: [PATCH] add functionality to periodically update routing scores from an external http source --- Cargo.toml | 1 + bindings/ldk_node.udl | 2 ++ src/builder.rs | 52 +++++++++++++++++++++++++----- src/config.rs | 6 ++++ src/io/utils.rs | 11 ++++++- src/lib.rs | 53 ++++++++++++++++++++++++++++++- src/scoring.rs | 74 +++++++++++++++++++++++++++++++++++++++++++ src/types.rs | 5 +-- 8 files changed, 192 insertions(+), 12 deletions(-) create mode 100644 src/scoring.rs diff --git a/Cargo.toml b/Cargo.toml index 39b684d6d..8545112de 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -84,6 +84,7 @@ log = { version = "0.4.22", default-features = false, features = ["std"]} vss-client = "0.3" prost = { version = "0.11.6", default-features = false} +bytes = "1.9.0" [target.'cfg(windows)'.dependencies] winapi = { version = "0.3", features = ["winbase"] } diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index b0ff44b0a..bffda9b55 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -58,6 +58,7 @@ interface Builder { void set_chain_source_bitcoind_rpc(string rpc_host, u16 rpc_port, string rpc_user, string rpc_password); void set_gossip_source_p2p(); void set_gossip_source_rgs(string rgs_server_url); + void set_scoring_source(string url); void set_liquidity_source_lsps2(SocketAddress address, PublicKey node_id, string? token); void set_storage_dir_path(string storage_dir_path); void set_filesystem_logger(string? log_file_path, LogLevel? log_level); @@ -273,6 +274,7 @@ dictionary NodeStatus { u64? latest_onchain_wallet_sync_timestamp; u64? latest_fee_rate_cache_update_timestamp; u64? latest_rgs_snapshot_timestamp; + u64? latest_scores_sync_timestamp; u64? latest_node_announcement_broadcast_timestamp; u32? latest_channel_monitor_archival_height; }; diff --git a/src/builder.rs b/src/builder.rs index bcd91eeb8..fd7536dec 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -23,6 +23,7 @@ use crate::logger::{log_error, log_info, LdkLogger, LogLevel, LogWriter, Logger} use crate::message_handler::NodeCustomMessageHandler; use crate::payment::store::PaymentStore; use crate::peer_store::PeerStore; +use crate::scoring::ScoringSource; use crate::tx_broadcaster::TransactionBroadcaster; use crate::types::{ ChainMonitor, ChannelManager, DynStore, GossipSync, Graph, KeysManager, MessageRouter, @@ -41,7 +42,8 @@ use lightning::ln::peer_handler::{IgnoringMessageHandler, MessageHandler}; use lightning::routing::gossip::NodeAlias; use lightning::routing::router::DefaultRouter; use lightning::routing::scoring::{ - ProbabilisticScorer, ProbabilisticScoringDecayParameters, ProbabilisticScoringFeeParameters, + CombinedScorer, ProbabilisticScorer, ProbabilisticScoringDecayParameters, + ProbabilisticScoringFeeParameters, }; use lightning::sign::EntropySource; @@ -97,6 +99,11 @@ enum GossipSourceConfig { RapidGossipSync(String), } +#[derive(Debug, Clone)] +struct ScoringSourceConfig { + url: String, +} + #[derive(Debug, Clone)] struct LiquiditySourceConfig { // LSPS2 service's (address, node_id, token) @@ -229,6 +236,7 @@ pub struct NodeBuilder { gossip_source_config: Option, liquidity_source_config: Option, log_writer_config: Option, + scoring_source_config: Option, } impl NodeBuilder { @@ -245,6 +253,7 @@ impl NodeBuilder { let gossip_source_config = None; let liquidity_source_config = None; let log_writer_config = None; + let scoring_source_config = None; Self { config, entropy_source_config, @@ -252,6 +261,7 @@ impl NodeBuilder { gossip_source_config, liquidity_source_config, log_writer_config, + scoring_source_config, } } @@ -322,6 +332,13 @@ impl NodeBuilder { self } + /// Configures the [`Node`] instance to source its external scores from the given URL. These scores are used to + /// augment the internal pathfinding scoring system to improve routing. + pub fn set_scoring_source(&mut self, url: String) -> &mut Self { + self.scoring_source_config = Some(ScoringSourceConfig { url }); + self + } + /// Configures the [`Node`] instance to source its inbound liquidity from the given /// [LSPS2](https://github.com/BitcoinAndLightningLayerSpecs/lsp/blob/main/LSPS2/README.md) /// service. @@ -540,6 +557,7 @@ impl NodeBuilder { config, self.chain_data_source_config.as_ref(), self.gossip_source_config.as_ref(), + self.scoring_source_config.as_ref(), self.liquidity_source_config.as_ref(), seed_bytes, logger, @@ -562,6 +580,7 @@ impl NodeBuilder { config, self.chain_data_source_config.as_ref(), self.gossip_source_config.as_ref(), + self.scoring_source_config.as_ref(), self.liquidity_source_config.as_ref(), seed_bytes, logger, @@ -654,6 +673,12 @@ impl ArcedNodeBuilder { self.inner.write().unwrap().set_gossip_source_rgs(rgs_server_url); } + /// Configures the [`Node`] instance to source its external scores from the given URL. These scores are used to + /// augment the internal pathfinding scoring system to improve routing. + pub fn set_scoring_source(&self, url: String) { + self.inner.write().unwrap().scoring_source_config = Some(ScoringSourceConfig { url }); + } + /// Configures the [`Node`] instance to source its inbound liquidity from the given /// [LSPS2](https://github.com/BitcoinAndLightningLayerSpecs/lsp/blob/main/LSPS2/README.md) /// service. @@ -806,6 +831,7 @@ impl ArcedNodeBuilder { fn build_with_store_internal( config: Arc, chain_data_source_config: Option<&ChainDataSourceConfig>, gossip_source_config: Option<&GossipSourceConfig>, + scoring_source_config: Option<&ScoringSourceConfig>, liquidity_source_config: Option<&LiquiditySourceConfig>, seed_bytes: [u8; 64], logger: Arc, kv_store: Arc, ) -> Result { @@ -950,26 +976,24 @@ fn build_with_store_internal( }, }; - let scorer = match io::utils::read_scorer( + let local_scorer = match io::utils::read_scorer( Arc::clone(&kv_store), Arc::clone(&network_graph), Arc::clone(&logger), ) { - Ok(scorer) => Arc::new(Mutex::new(scorer)), + Ok(scorer) => scorer, Err(e) => { if e.kind() == std::io::ErrorKind::NotFound { let params = ProbabilisticScoringDecayParameters::default(); - Arc::new(Mutex::new(ProbabilisticScorer::new( - params, - Arc::clone(&network_graph), - Arc::clone(&logger), - ))) + ProbabilisticScorer::new(params, Arc::clone(&network_graph), Arc::clone(&logger)) } else { return Err(BuildError::ReadFailed); } }, }; + let scorer = Arc::new(Mutex::new(CombinedScorer::new(local_scorer))); + let scoring_fee_params = ProbabilisticScoringFeeParameters::default(); let router = Arc::new(DefaultRouter::new( Arc::clone(&network_graph), @@ -1129,6 +1153,17 @@ fn build_with_store_internal( }, }; + let scoring_source = if let Some(config) = scoring_source_config { + Some(Arc::new(ScoringSource::new( + config.url.clone(), + Arc::clone(&scorer), + Arc::clone(&node_metrics), + Arc::clone(&logger), + ))) + } else { + None + }; + let liquidity_source = liquidity_source_config.as_ref().and_then(|lsc| { lsc.lsps2_service.as_ref().map(|(address, node_id, token)| { let lsps2_client_config = Some(LSPS2ClientConfig {}); @@ -1303,6 +1338,7 @@ fn build_with_store_internal( keys_manager, network_graph, gossip_source, + scoring_source, liquidity_source, kv_store, logger, diff --git a/src/config.rs b/src/config.rs index f38d7f63d..695ea1cda 100644 --- a/src/config.rs +++ b/src/config.rs @@ -57,6 +57,9 @@ pub(crate) const PEER_RECONNECTION_INTERVAL: Duration = Duration::from_secs(10); // The time in-between RGS sync attempts. pub(crate) const RGS_SYNC_INTERVAL: Duration = Duration::from_secs(60 * 60); +// The time in-between external scores sync attempts. +pub(crate) const EXTERNAL_SCORES_SYNC_INTERVAL: Duration = Duration::from_secs(60 * 60); + // The time in-between node announcement broadcast attempts. pub(crate) const NODE_ANN_BCAST_INTERVAL: Duration = Duration::from_secs(60 * 60); @@ -78,6 +81,9 @@ pub(crate) const TX_BROADCAST_TIMEOUT_SECS: u64 = 5; // The timeout after which we abort a RGS sync operation. pub(crate) const RGS_SYNC_TIMEOUT_SECS: u64 = 5; +// The timeout after which we abort a external scores sync operation. +pub(crate) const EXTERNAL_SCORES_SYNC_TIMEOUT_SECS: u64 = 5; + // The length in bytes of our wallets' keys seed. pub(crate) const WALLET_KEYS_SEED_LEN: usize = 64; diff --git a/src/io/utils.rs b/src/io/utils.rs index b5537ed7d..50feefd4d 100644 --- a/src/io/utils.rs +++ b/src/io/utils.rs @@ -20,10 +20,13 @@ use crate::types::{Broadcaster, DynStore, KeysManager, Sweeper}; use crate::wallet::ser::{ChangeSetDeserWrapper, ChangeSetSerWrapper}; use crate::{Error, EventQueue, NodeMetrics, PaymentDetails}; +use bytes::Bytes; use lightning::io::Cursor; use lightning::ln::msgs::DecodeError; use lightning::routing::gossip::NetworkGraph; -use lightning::routing::scoring::{ProbabilisticScorer, ProbabilisticScoringDecayParameters}; +use lightning::routing::scoring::{ + ChannelLiquidities, ProbabilisticScorer, ProbabilisticScoringDecayParameters, +}; use lightning::util::persist::{ KVSTORE_NAMESPACE_KEY_ALPHABET, KVSTORE_NAMESPACE_KEY_MAX_LEN, NETWORK_GRAPH_PERSISTENCE_KEY, NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, @@ -166,6 +169,12 @@ where }) } +/// Read external scores from a Bytes object. +pub(crate) fn read_external_scores(bytes: Bytes) -> Result { + let mut reader = Cursor::new(bytes); + ChannelLiquidities::read(&mut reader) +} + /// Read previously persisted events from the store. pub(crate) fn read_event_queue( kv_store: Arc, logger: L, diff --git a/src/lib.rs b/src/lib.rs index 9c5b9dc8d..e6b0f5923 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -89,6 +89,7 @@ pub mod logger; mod message_handler; pub mod payment; mod peer_store; +mod scoring; mod sweep; mod tx_broadcaster; mod types; @@ -122,7 +123,8 @@ pub use builder::NodeBuilder as Builder; use chain::ChainSource; use config::{ - default_user_config, may_announce_channel, ChannelConfig, Config, NODE_ANN_BCAST_INTERVAL, + default_user_config, may_announce_channel, ChannelConfig, Config, + EXTERNAL_SCORES_SYNC_INTERVAL, EXTERNAL_SCORES_SYNC_TIMEOUT_SECS, NODE_ANN_BCAST_INTERVAL, PEER_RECONNECTION_INTERVAL, RGS_SYNC_INTERVAL, }; use connection::ConnectionManager; @@ -137,6 +139,7 @@ use payment::{ UnifiedQrPayment, }; use peer_store::{PeerInfo, PeerStore}; +use scoring::ScoringSource; use types::{ Broadcaster, BumpTransactionEventHandler, ChainMonitor, ChannelManager, DynStore, Graph, KeysManager, OnionMessenger, PeerManager, Router, Scorer, Sweeper, Wallet, @@ -190,6 +193,7 @@ pub struct Node { keys_manager: Arc, network_graph: Arc, gossip_source: Arc, + scoring_source: Option>, liquidity_source: Option>>>, kv_store: Arc, logger: Arc, @@ -304,6 +308,10 @@ impl Node { }); } + if self.scoring_source.is_some() { + self.setup_external_scores_syncing(&runtime); + } + if let Some(listening_addresses) = &self.config.listening_addresses { // Setup networking let peer_manager_connection_handler = Arc::clone(&self.peer_manager); @@ -634,6 +642,42 @@ impl Node { Ok(()) } + /// Spawn a background task to sync external scores. + fn setup_external_scores_syncing(&self, runtime: &tokio::runtime::Runtime) { + let scoring_source = Arc::clone(&self.scoring_source.as_ref().unwrap()); + log_info!( + self.logger, + "External scores background syncing from {} enabled", + scoring_source.get_url() + ); + + let external_scores_sync_logger = Arc::clone(&self.logger); + let mut stop_sync = self.stop_sender.subscribe(); + + runtime.spawn(async move { + let mut interval = tokio::time::interval(EXTERNAL_SCORES_SYNC_INTERVAL); + loop { + tokio::select! { + _ = stop_sync.changed() => { + log_trace!( + external_scores_sync_logger, + "Stopping background syncing external scores.", + ); + return; + } + _ = interval.tick() => { + log_trace!( + external_scores_sync_logger, + "Background sync of external scores started.", + ); + + scoring_source.sync_external_scores().await; + } + } + } + }); + } + /// Disconnects all peers, stops all running background tasks, and shuts down [`Node`]. /// /// After this returns most API methods will return [`Error::NotRunning`]. @@ -725,6 +769,7 @@ impl Node { locked_node_metrics.latest_fee_rate_cache_update_timestamp; let latest_rgs_snapshot_timestamp = locked_node_metrics.latest_rgs_snapshot_timestamp.map(|val| val as u64); + let latest_scores_sync_timestamp = locked_node_metrics.latest_scores_sync_timestamp; let latest_node_announcement_broadcast_timestamp = locked_node_metrics.latest_node_announcement_broadcast_timestamp; let latest_channel_monitor_archival_height = @@ -738,6 +783,7 @@ impl Node { latest_onchain_wallet_sync_timestamp, latest_fee_rate_cache_update_timestamp, latest_rgs_snapshot_timestamp, + latest_scores_sync_timestamp, latest_node_announcement_broadcast_timestamp, latest_channel_monitor_archival_height, } @@ -1547,6 +1593,8 @@ pub struct NodeStatus { /// /// Will be `None` if RGS isn't configured or the snapshot hasn't been updated yet. pub latest_rgs_snapshot_timestamp: Option, + /// The timestamp, in seconds since start of the UNIX epoch, when we last successfully merged external scores. + pub latest_scores_sync_timestamp: Option, /// The timestamp, in seconds since start of the UNIX epoch, when we last broadcasted a node /// announcement. /// @@ -1565,6 +1613,7 @@ pub(crate) struct NodeMetrics { latest_onchain_wallet_sync_timestamp: Option, latest_fee_rate_cache_update_timestamp: Option, latest_rgs_snapshot_timestamp: Option, + latest_scores_sync_timestamp: Option, latest_node_announcement_broadcast_timestamp: Option, latest_channel_monitor_archival_height: Option, } @@ -1576,6 +1625,7 @@ impl Default for NodeMetrics { latest_onchain_wallet_sync_timestamp: None, latest_fee_rate_cache_update_timestamp: None, latest_rgs_snapshot_timestamp: None, + latest_scores_sync_timestamp: None, latest_node_announcement_broadcast_timestamp: None, latest_channel_monitor_archival_height: None, } @@ -1589,6 +1639,7 @@ impl_writeable_tlv_based!(NodeMetrics, { (6, latest_rgs_snapshot_timestamp, option), (8, latest_node_announcement_broadcast_timestamp, option), (10, latest_channel_monitor_archival_height, option), + (12, latest_scores_sync_timestamp, option), }); pub(crate) fn total_anchor_channels_reserve_sats( diff --git a/src/scoring.rs b/src/scoring.rs new file mode 100644 index 000000000..5c153796c --- /dev/null +++ b/src/scoring.rs @@ -0,0 +1,74 @@ +use std::{ + sync::{Arc, Mutex, RwLock}, + time::{Duration, SystemTime}, +}; + +use crate::{io::utils::read_external_scores, Graph, Logger, EXTERNAL_SCORES_SYNC_TIMEOUT_SECS}; +use crate::{logger::LdkLogger, NodeMetrics}; +use lightning::{log_error, log_trace, routing::scoring::CombinedScorer}; + +pub struct ScoringSource { + logger: Arc, + scorer: Arc, Arc>>>, + metrics: Arc>, + url: String, +} + +impl ScoringSource { + pub fn new( + url: String, scorer: Arc, Arc>>>, + metrics: Arc>, logger: Arc, + ) -> Self { + Self { logger, scorer, metrics, url } + } + + pub fn get_url(&self) -> String { + return self.url.clone(); + } + + pub async fn sync_external_scores(&self) -> () { + let response = tokio::time::timeout( + Duration::from_secs(EXTERNAL_SCORES_SYNC_TIMEOUT_SECS), + reqwest::get(&self.url), + ) + .await; + + match response { + Ok(Ok(response)) => { + let body = response.bytes().await; + match body { + Err(e) => { + log_error!(self.logger, "Failed to read external scores update: {}", e); + return; + }, + Ok(body) => match read_external_scores(body) { + Ok(liquidities) => { + let duration_since_epoch = + SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap(); + self.scorer.lock().unwrap().merge(liquidities, duration_since_epoch); + self.metrics.write().unwrap().latest_scores_sync_timestamp = + Some(duration_since_epoch.as_secs()); + log_trace!(self.logger, "External scores merged successfully"); + }, + Err(e) => { + log_error!( + self.logger, + "Failed to parse external scores update: {}", + e + ); + return; + }, + }, + } + }, + Err(e) => { + log_error!(self.logger, "Retrieving external scores timed out: {}", e); + return; + }, + Ok(Err(e)) => { + log_error!(self.logger, "Failed to retrieve external scores update: {}", e); + return; + }, + } + } +} diff --git a/src/types.rs b/src/types.rs index 1c9ab64b9..738203127 100644 --- a/src/types.rs +++ b/src/types.rs @@ -21,7 +21,8 @@ use lightning::ln::peer_handler::IgnoringMessageHandler; use lightning::ln::types::ChannelId; use lightning::routing::gossip; use lightning::routing::router::DefaultRouter; -use lightning::routing::scoring::{ProbabilisticScorer, ProbabilisticScoringFeeParameters}; +use lightning::routing::scoring::CombinedScorer; +use lightning::routing::scoring::ProbabilisticScoringFeeParameters; use lightning::sign::InMemorySigner; use lightning::util::persist::KVStore; use lightning::util::ser::{Readable, Writeable, Writer}; @@ -88,7 +89,7 @@ pub(crate) type Router = DefaultRouter< ProbabilisticScoringFeeParameters, Scorer, >; -pub(crate) type Scorer = ProbabilisticScorer, Arc>; +pub(crate) type Scorer = CombinedScorer, Arc>; pub(crate) type Graph = gossip::NetworkGraph>;