Skip to content

Commit

Permalink
add functionality to periodically update routing scores from an exter…
Browse files Browse the repository at this point in the history
…nal http source
  • Loading branch information
joostjager committed Feb 3, 2025
1 parent c64d8c5 commit 619c9c9
Show file tree
Hide file tree
Showing 8 changed files with 192 additions and 12 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ log = { version = "0.4.22", default-features = false, features = ["std"]}

vss-client = "0.3"
prost = { version = "0.11.6", default-features = false}
bytes = "1.9.0"

[target.'cfg(windows)'.dependencies]
winapi = { version = "0.3", features = ["winbase"] }
Expand Down
2 changes: 2 additions & 0 deletions bindings/ldk_node.udl
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ interface Builder {
void set_chain_source_bitcoind_rpc(string rpc_host, u16 rpc_port, string rpc_user, string rpc_password);
void set_gossip_source_p2p();
void set_gossip_source_rgs(string rgs_server_url);
void set_scoring_source(string url);
void set_liquidity_source_lsps2(SocketAddress address, PublicKey node_id, string? token);
void set_storage_dir_path(string storage_dir_path);
void set_filesystem_logger(string? log_file_path, LogLevel? log_level);
Expand Down Expand Up @@ -273,6 +274,7 @@ dictionary NodeStatus {
u64? latest_onchain_wallet_sync_timestamp;
u64? latest_fee_rate_cache_update_timestamp;
u64? latest_rgs_snapshot_timestamp;
u64? latest_scores_sync_timestamp;
u64? latest_node_announcement_broadcast_timestamp;
u32? latest_channel_monitor_archival_height;
};
Expand Down
52 changes: 44 additions & 8 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::logger::{log_error, log_info, LdkLogger, LogLevel, LogWriter, Logger}
use crate::message_handler::NodeCustomMessageHandler;
use crate::payment::store::PaymentStore;
use crate::peer_store::PeerStore;
use crate::scoring::ScoringSource;
use crate::tx_broadcaster::TransactionBroadcaster;
use crate::types::{
ChainMonitor, ChannelManager, DynStore, GossipSync, Graph, KeysManager, MessageRouter,
Expand All @@ -41,7 +42,8 @@ use lightning::ln::peer_handler::{IgnoringMessageHandler, MessageHandler};
use lightning::routing::gossip::NodeAlias;
use lightning::routing::router::DefaultRouter;
use lightning::routing::scoring::{
ProbabilisticScorer, ProbabilisticScoringDecayParameters, ProbabilisticScoringFeeParameters,
CombinedScorer, ProbabilisticScorer, ProbabilisticScoringDecayParameters,
ProbabilisticScoringFeeParameters,
};
use lightning::sign::EntropySource;

Expand Down Expand Up @@ -97,6 +99,11 @@ enum GossipSourceConfig {
RapidGossipSync(String),
}

#[derive(Debug, Clone)]
struct ScoringSourceConfig {
url: String,
}

#[derive(Debug, Clone)]
struct LiquiditySourceConfig {
// LSPS2 service's (address, node_id, token)
Expand Down Expand Up @@ -229,6 +236,7 @@ pub struct NodeBuilder {
gossip_source_config: Option<GossipSourceConfig>,
liquidity_source_config: Option<LiquiditySourceConfig>,
log_writer_config: Option<LogWriterConfig>,
scoring_source_config: Option<ScoringSourceConfig>,
}

impl NodeBuilder {
Expand All @@ -245,13 +253,15 @@ impl NodeBuilder {
let gossip_source_config = None;
let liquidity_source_config = None;
let log_writer_config = None;
let scoring_source_config = None;
Self {
config,
entropy_source_config,
chain_data_source_config,
gossip_source_config,
liquidity_source_config,
log_writer_config,
scoring_source_config,
}
}

Expand Down Expand Up @@ -322,6 +332,13 @@ impl NodeBuilder {
self
}

/// Configures the [`Node`] instance to source its external scores from the given URL. These scores are used to
/// augment the internal pathfinding scoring system to improve routing.
pub fn set_scoring_source(&mut self, url: String) -> &mut Self {
self.scoring_source_config = Some(ScoringSourceConfig { url });
self
}

/// Configures the [`Node`] instance to source its inbound liquidity from the given
/// [LSPS2](https://github.com/BitcoinAndLightningLayerSpecs/lsp/blob/main/LSPS2/README.md)
/// service.
Expand Down Expand Up @@ -540,6 +557,7 @@ impl NodeBuilder {
config,
self.chain_data_source_config.as_ref(),
self.gossip_source_config.as_ref(),
self.scoring_source_config.as_ref(),
self.liquidity_source_config.as_ref(),
seed_bytes,
logger,
Expand All @@ -562,6 +580,7 @@ impl NodeBuilder {
config,
self.chain_data_source_config.as_ref(),
self.gossip_source_config.as_ref(),
self.scoring_source_config.as_ref(),
self.liquidity_source_config.as_ref(),
seed_bytes,
logger,
Expand Down Expand Up @@ -654,6 +673,12 @@ impl ArcedNodeBuilder {
self.inner.write().unwrap().set_gossip_source_rgs(rgs_server_url);
}

/// Configures the [`Node`] instance to source its external scores from the given URL. These scores are used to
/// augment the internal pathfinding scoring system to improve routing.
pub fn set_scoring_source(&self, url: String) {
self.inner.write().unwrap().scoring_source_config = Some(ScoringSourceConfig { url });
}

/// Configures the [`Node`] instance to source its inbound liquidity from the given
/// [LSPS2](https://github.com/BitcoinAndLightningLayerSpecs/lsp/blob/main/LSPS2/README.md)
/// service.
Expand Down Expand Up @@ -806,6 +831,7 @@ impl ArcedNodeBuilder {
fn build_with_store_internal(
config: Arc<Config>, chain_data_source_config: Option<&ChainDataSourceConfig>,
gossip_source_config: Option<&GossipSourceConfig>,
scoring_source_config: Option<&ScoringSourceConfig>,
liquidity_source_config: Option<&LiquiditySourceConfig>, seed_bytes: [u8; 64],
logger: Arc<Logger>, kv_store: Arc<DynStore>,
) -> Result<Node, BuildError> {
Expand Down Expand Up @@ -950,26 +976,24 @@ fn build_with_store_internal(
},
};

let scorer = match io::utils::read_scorer(
let local_scorer = match io::utils::read_scorer(
Arc::clone(&kv_store),
Arc::clone(&network_graph),
Arc::clone(&logger),
) {
Ok(scorer) => Arc::new(Mutex::new(scorer)),
Ok(scorer) => scorer,
Err(e) => {
if e.kind() == std::io::ErrorKind::NotFound {
let params = ProbabilisticScoringDecayParameters::default();
Arc::new(Mutex::new(ProbabilisticScorer::new(
params,
Arc::clone(&network_graph),
Arc::clone(&logger),
)))
ProbabilisticScorer::new(params, Arc::clone(&network_graph), Arc::clone(&logger))
} else {
return Err(BuildError::ReadFailed);
}
},
};

let scorer = Arc::new(Mutex::new(CombinedScorer::new(local_scorer)));

let scoring_fee_params = ProbabilisticScoringFeeParameters::default();
let router = Arc::new(DefaultRouter::new(
Arc::clone(&network_graph),
Expand Down Expand Up @@ -1129,6 +1153,17 @@ fn build_with_store_internal(
},
};

let scoring_source = if let Some(config) = scoring_source_config {
Some(Arc::new(ScoringSource::new(
config.url.clone(),
Arc::clone(&scorer),
Arc::clone(&node_metrics),
Arc::clone(&logger),
)))
} else {
None
};

let liquidity_source = liquidity_source_config.as_ref().and_then(|lsc| {
lsc.lsps2_service.as_ref().map(|(address, node_id, token)| {
let lsps2_client_config = Some(LSPS2ClientConfig {});
Expand Down Expand Up @@ -1303,6 +1338,7 @@ fn build_with_store_internal(
keys_manager,
network_graph,
gossip_source,
scoring_source,
liquidity_source,
kv_store,
logger,
Expand Down
6 changes: 6 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ pub(crate) const PEER_RECONNECTION_INTERVAL: Duration = Duration::from_secs(10);
// The time in-between RGS sync attempts.
pub(crate) const RGS_SYNC_INTERVAL: Duration = Duration::from_secs(60 * 60);

// The time in-between external scores sync attempts.
pub(crate) const EXTERNAL_SCORES_SYNC_INTERVAL: Duration = Duration::from_secs(60 * 60);

// The time in-between node announcement broadcast attempts.
pub(crate) const NODE_ANN_BCAST_INTERVAL: Duration = Duration::from_secs(60 * 60);

Expand All @@ -78,6 +81,9 @@ pub(crate) const TX_BROADCAST_TIMEOUT_SECS: u64 = 5;
// The timeout after which we abort a RGS sync operation.
pub(crate) const RGS_SYNC_TIMEOUT_SECS: u64 = 5;

// The timeout after which we abort a external scores sync operation.
pub(crate) const EXTERNAL_SCORES_SYNC_TIMEOUT_SECS: u64 = 5;

// The length in bytes of our wallets' keys seed.
pub(crate) const WALLET_KEYS_SEED_LEN: usize = 64;

Expand Down
11 changes: 10 additions & 1 deletion src/io/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@ use crate::types::{Broadcaster, DynStore, KeysManager, Sweeper};
use crate::wallet::ser::{ChangeSetDeserWrapper, ChangeSetSerWrapper};
use crate::{Error, EventQueue, NodeMetrics, PaymentDetails};

use bytes::Bytes;
use lightning::io::Cursor;
use lightning::ln::msgs::DecodeError;
use lightning::routing::gossip::NetworkGraph;
use lightning::routing::scoring::{ProbabilisticScorer, ProbabilisticScoringDecayParameters};
use lightning::routing::scoring::{
ChannelLiquidities, ProbabilisticScorer, ProbabilisticScoringDecayParameters,
};
use lightning::util::persist::{
KVSTORE_NAMESPACE_KEY_ALPHABET, KVSTORE_NAMESPACE_KEY_MAX_LEN, NETWORK_GRAPH_PERSISTENCE_KEY,
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
Expand Down Expand Up @@ -166,6 +169,12 @@ where
})
}

/// Read external scores from a Bytes object.
pub(crate) fn read_external_scores(bytes: Bytes) -> Result<ChannelLiquidities, DecodeError> {
let mut reader = Cursor::new(bytes);
ChannelLiquidities::read(&mut reader)
}

/// Read previously persisted events from the store.
pub(crate) fn read_event_queue<L: Deref + Clone>(
kv_store: Arc<DynStore>, logger: L,
Expand Down
53 changes: 52 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ pub mod logger;
mod message_handler;
pub mod payment;
mod peer_store;
mod scoring;
mod sweep;
mod tx_broadcaster;
mod types;
Expand Down Expand Up @@ -122,7 +123,8 @@ pub use builder::NodeBuilder as Builder;

use chain::ChainSource;
use config::{
default_user_config, may_announce_channel, ChannelConfig, Config, NODE_ANN_BCAST_INTERVAL,
default_user_config, may_announce_channel, ChannelConfig, Config,
EXTERNAL_SCORES_SYNC_INTERVAL, EXTERNAL_SCORES_SYNC_TIMEOUT_SECS, NODE_ANN_BCAST_INTERVAL,
PEER_RECONNECTION_INTERVAL, RGS_SYNC_INTERVAL,
};
use connection::ConnectionManager;
Expand All @@ -137,6 +139,7 @@ use payment::{
UnifiedQrPayment,
};
use peer_store::{PeerInfo, PeerStore};
use scoring::ScoringSource;
use types::{
Broadcaster, BumpTransactionEventHandler, ChainMonitor, ChannelManager, DynStore, Graph,
KeysManager, OnionMessenger, PeerManager, Router, Scorer, Sweeper, Wallet,
Expand Down Expand Up @@ -190,6 +193,7 @@ pub struct Node {
keys_manager: Arc<KeysManager>,
network_graph: Arc<Graph>,
gossip_source: Arc<GossipSource>,
scoring_source: Option<Arc<ScoringSource>>,
liquidity_source: Option<Arc<LiquiditySource<Arc<Logger>>>>,
kv_store: Arc<DynStore>,
logger: Arc<Logger>,
Expand Down Expand Up @@ -304,6 +308,10 @@ impl Node {
});
}

if self.scoring_source.is_some() {
self.setup_external_scores_syncing(&runtime);
}

if let Some(listening_addresses) = &self.config.listening_addresses {
// Setup networking
let peer_manager_connection_handler = Arc::clone(&self.peer_manager);
Expand Down Expand Up @@ -634,6 +642,42 @@ impl Node {
Ok(())
}

/// Spawn a background task to sync external scores.
fn setup_external_scores_syncing(&self, runtime: &tokio::runtime::Runtime) {
let scoring_source = Arc::clone(&self.scoring_source.as_ref().unwrap());
log_info!(
self.logger,
"External scores background syncing from {} enabled",
scoring_source.get_url()
);

let external_scores_sync_logger = Arc::clone(&self.logger);
let mut stop_sync = self.stop_sender.subscribe();

runtime.spawn(async move {
let mut interval = tokio::time::interval(EXTERNAL_SCORES_SYNC_INTERVAL);
loop {
tokio::select! {
_ = stop_sync.changed() => {
log_trace!(
external_scores_sync_logger,
"Stopping background syncing external scores.",
);
return;
}
_ = interval.tick() => {
log_trace!(
external_scores_sync_logger,
"Background sync of external scores started.",
);

scoring_source.sync_external_scores().await;
}
}
}
});
}

/// Disconnects all peers, stops all running background tasks, and shuts down [`Node`].
///
/// After this returns most API methods will return [`Error::NotRunning`].
Expand Down Expand Up @@ -725,6 +769,7 @@ impl Node {
locked_node_metrics.latest_fee_rate_cache_update_timestamp;
let latest_rgs_snapshot_timestamp =
locked_node_metrics.latest_rgs_snapshot_timestamp.map(|val| val as u64);
let latest_scores_sync_timestamp = locked_node_metrics.latest_scores_sync_timestamp;
let latest_node_announcement_broadcast_timestamp =
locked_node_metrics.latest_node_announcement_broadcast_timestamp;
let latest_channel_monitor_archival_height =
Expand All @@ -738,6 +783,7 @@ impl Node {
latest_onchain_wallet_sync_timestamp,
latest_fee_rate_cache_update_timestamp,
latest_rgs_snapshot_timestamp,
latest_scores_sync_timestamp,
latest_node_announcement_broadcast_timestamp,
latest_channel_monitor_archival_height,
}
Expand Down Expand Up @@ -1547,6 +1593,8 @@ pub struct NodeStatus {
///
/// Will be `None` if RGS isn't configured or the snapshot hasn't been updated yet.
pub latest_rgs_snapshot_timestamp: Option<u64>,
/// The timestamp, in seconds since start of the UNIX epoch, when we last successfully merged external scores.
pub latest_scores_sync_timestamp: Option<u64>,
/// The timestamp, in seconds since start of the UNIX epoch, when we last broadcasted a node
/// announcement.
///
Expand All @@ -1565,6 +1613,7 @@ pub(crate) struct NodeMetrics {
latest_onchain_wallet_sync_timestamp: Option<u64>,
latest_fee_rate_cache_update_timestamp: Option<u64>,
latest_rgs_snapshot_timestamp: Option<u32>,
latest_scores_sync_timestamp: Option<u64>,
latest_node_announcement_broadcast_timestamp: Option<u64>,
latest_channel_monitor_archival_height: Option<u32>,
}
Expand All @@ -1576,6 +1625,7 @@ impl Default for NodeMetrics {
latest_onchain_wallet_sync_timestamp: None,
latest_fee_rate_cache_update_timestamp: None,
latest_rgs_snapshot_timestamp: None,
latest_scores_sync_timestamp: None,
latest_node_announcement_broadcast_timestamp: None,
latest_channel_monitor_archival_height: None,
}
Expand All @@ -1589,6 +1639,7 @@ impl_writeable_tlv_based!(NodeMetrics, {
(6, latest_rgs_snapshot_timestamp, option),
(8, latest_node_announcement_broadcast_timestamp, option),
(10, latest_channel_monitor_archival_height, option),
(12, latest_scores_sync_timestamp, option),
});

pub(crate) fn total_anchor_channels_reserve_sats(
Expand Down
Loading

0 comments on commit 619c9c9

Please sign in to comment.