Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Periodical external pathfinding scores merge #449

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 24 additions & 11 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,16 @@ panic = 'abort' # Abort on panic
default = []

[dependencies]
lightning = { version = "0.1.0", features = ["std"] }
lightning-types = { version = "0.2.0" }
lightning-invoice = { version = "0.33.0", features = ["std"] }
lightning-net-tokio = { version = "0.1.0" }
lightning-persister = { version = "0.1.0" }
lightning-background-processor = { version = "0.1.0", features = ["futures"] }
lightning-rapid-gossip-sync = { version = "0.1.0" }
lightning-block-sync = { version = "0.1.0", features = ["rpc-client", "tokio"] }
lightning-transaction-sync = { version = "0.1.0", features = ["esplora-async-https", "time"] }
lightning-liquidity = { version = "0.1.0", features = ["std"] }
#lightning = { version = "0.1.0", features = ["std"] }
#lightning-types = { version = "0.2.0" }
#lightning-invoice = { version = "0.33.0", features = ["std"] }
#lightning-net-tokio = { version = "0.1.0" }
#lightning-persister = { version = "0.1.0" }
#lightning-background-processor = { version = "0.1.0", features = ["futures"] }
#lightning-rapid-gossip-sync = { version = "0.1.0" }
#lightning-block-sync = { version = "0.1.0", features = ["rpc-client", "tokio"] }
#lightning-transaction-sync = { version = "0.1.0", features = ["esplora-async-https", "time"] }
#lightning-liquidity = { version = "0.1.0", features = ["std"] }

#lightning = { git = "https://github.com/lightningdevkit/rust-lightning", branch = "main", features = ["std"] }
#lightning-types = { git = "https://github.com/lightningdevkit/rust-lightning", branch = "main" }
Expand All @@ -50,6 +50,18 @@ lightning-liquidity = { version = "0.1.0", features = ["std"] }
#lightning-transaction-sync = { git = "https://github.com/lightningdevkit/rust-lightning", branch = "main", features = ["esplora-async-https", "time"] }
#lightning-liquidity = { git = "https://github.com/lightningdevkit/rust-lightning", branch = "main" }

lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "f866e2c0404d495f8bc8b16588c8b8a62bba8deb", features = ["std"] }
lightning-types = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "f866e2c0404d495f8bc8b16588c8b8a62bba8deb" }
lightning-invoice = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "f866e2c0404d495f8bc8b16588c8b8a62bba8deb", features = ["std"] }
lightning-net-tokio = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "f866e2c0404d495f8bc8b16588c8b8a62bba8deb" }
lightning-persister = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "f866e2c0404d495f8bc8b16588c8b8a62bba8deb" }
lightning-background-processor = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "f866e2c0404d495f8bc8b16588c8b8a62bba8deb", features = ["futures"] }
lightning-rapid-gossip-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "f866e2c0404d495f8bc8b16588c8b8a62bba8deb" }
lightning-block-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "f866e2c0404d495f8bc8b16588c8b8a62bba8deb", features = ["rpc-client", "tokio"] }
lightning-transaction-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "f866e2c0404d495f8bc8b16588c8b8a62bba8deb", features = ["esplora-async-https", "time"] }
lightning-liquidity = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "f866e2c0404d495f8bc8b16588c8b8a62bba8deb" }
lightning-macros = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "f866e2c0404d495f8bc8b16588c8b8a62bba8deb" }

#lightning = { path = "../rust-lightning/lightning", features = ["std"] }
#lightning-types = { path = "../rust-lightning/lightning-types" }
#lightning-invoice = { path = "../rust-lightning/lightning-invoice", features = ["std"] }
Expand Down Expand Up @@ -89,8 +101,9 @@ prost = { version = "0.11.6", default-features = false}
winapi = { version = "0.3", features = ["winbase"] }

[dev-dependencies]
lightning = { version = "0.1.0", features = ["std", "_test_utils"] }
#lightning = { version = "0.1.0", features = ["std", "_test_utils"] }
#lightning = { git = "https://github.com/lightningdevkit/rust-lightning", branch="main", features = ["std", "_test_utils"] }
lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "f866e2c0404d495f8bc8b16588c8b8a62bba8deb", features = ["std", "_test_utils"] }
#lightning = { path = "../rust-lightning/lightning", features = ["std", "_test_utils"] }
electrum-client = { version = "0.21.0", default-features = true }
bitcoincore-rpc = { version = "0.19.0", default-features = false }
Expand Down
2 changes: 2 additions & 0 deletions bindings/ldk_node.udl
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ interface Builder {
void set_chain_source_bitcoind_rpc(string rpc_host, u16 rpc_port, string rpc_user, string rpc_password);
void set_gossip_source_p2p();
void set_gossip_source_rgs(string rgs_server_url);
void set_pathfinding_scores_source(string url);
void set_liquidity_source_lsps2(SocketAddress address, PublicKey node_id, string? token);
void set_storage_dir_path(string storage_dir_path);
void set_filesystem_logger(string? log_file_path, LogLevel? max_log_level);
Expand Down Expand Up @@ -275,6 +276,7 @@ dictionary NodeStatus {
u64? latest_onchain_wallet_sync_timestamp;
u64? latest_fee_rate_cache_update_timestamp;
u64? latest_rgs_snapshot_timestamp;
u64? latest_pathfinding_scores_sync_timestamp;
u64? latest_node_announcement_broadcast_timestamp;
u32? latest_channel_monitor_archival_height;
};
Expand Down
68 changes: 56 additions & 12 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ use crate::event::EventQueue;
use crate::fee_estimator::OnchainFeeEstimator;
use crate::gossip::GossipSource;
use crate::io::sqlite_store::SqliteStore;
use crate::io::utils::{read_node_metrics, write_node_metrics};
use crate::io::utils::{
read_external_pathfinding_scores_from_cache, read_node_metrics, write_node_metrics,
};
use crate::io::vss_store::VssStore;
use crate::liquidity::LiquiditySource;
use crate::logger::{log_error, log_info, LdkLogger, LogLevel, LogWriter, Logger};
use crate::logger::{log_error, log_info, log_trace, LdkLogger, LogLevel, LogWriter, Logger};
use crate::message_handler::NodeCustomMessageHandler;
use crate::payment::store::PaymentStore;
use crate::peer_store::PeerStore;
Expand All @@ -41,7 +43,8 @@ use lightning::ln::peer_handler::{IgnoringMessageHandler, MessageHandler};
use lightning::routing::gossip::NodeAlias;
use lightning::routing::router::DefaultRouter;
use lightning::routing::scoring::{
ProbabilisticScorer, ProbabilisticScoringDecayParameters, ProbabilisticScoringFeeParameters,
CombinedScorer, ProbabilisticScorer, ProbabilisticScoringDecayParameters,
ProbabilisticScoringFeeParameters,
};
use lightning::sign::EntropySource;

Expand Down Expand Up @@ -97,6 +100,11 @@ enum GossipSourceConfig {
RapidGossipSync(String),
}

#[derive(Debug, Clone)]
struct PathfindingScoresSyncConfig {
url: String,
}

#[derive(Debug, Clone)]
struct LiquiditySourceConfig {
// LSPS2 service's (address, node_id, token)
Expand Down Expand Up @@ -211,6 +219,7 @@ pub struct NodeBuilder {
gossip_source_config: Option<GossipSourceConfig>,
liquidity_source_config: Option<LiquiditySourceConfig>,
log_writer_config: Option<LogWriterConfig>,
pathfinding_scores_sync_config: Option<PathfindingScoresSyncConfig>,
}

impl NodeBuilder {
Expand All @@ -227,13 +236,15 @@ impl NodeBuilder {
let gossip_source_config = None;
let liquidity_source_config = None;
let log_writer_config = None;
let pathfinding_scores_sync_config = None;
Self {
config,
entropy_source_config,
chain_data_source_config,
gossip_source_config,
liquidity_source_config,
log_writer_config,
pathfinding_scores_sync_config,
}
}

Expand Down Expand Up @@ -304,6 +315,14 @@ impl NodeBuilder {
self
}

/// Configures the [`Node`] instance to source its external scores from the given URL.
///
/// The external scores are merged into the local scoring system to improve routing.
pub fn set_pathfinding_scores_source(&mut self, url: String) -> &mut Self {
self.pathfinding_scores_sync_config = Some(PathfindingScoresSyncConfig { url });
self
}

/// Configures the [`Node`] instance to source its inbound liquidity from the given
/// [LSPS2](https://github.com/BitcoinAndLightningLayerSpecs/lsp/blob/main/LSPS2/README.md)
/// service.
Expand Down Expand Up @@ -529,6 +548,7 @@ impl NodeBuilder {
config,
self.chain_data_source_config.as_ref(),
self.gossip_source_config.as_ref(),
self.pathfinding_scores_sync_config.as_ref(),
self.liquidity_source_config.as_ref(),
seed_bytes,
logger,
Expand All @@ -551,6 +571,7 @@ impl NodeBuilder {
config,
self.chain_data_source_config.as_ref(),
self.gossip_source_config.as_ref(),
self.pathfinding_scores_sync_config.as_ref(),
self.liquidity_source_config.as_ref(),
seed_bytes,
logger,
Expand Down Expand Up @@ -643,6 +664,13 @@ impl ArcedNodeBuilder {
self.inner.write().unwrap().set_gossip_source_rgs(rgs_server_url);
}

/// Configures the [`Node`] instance to source its external scores from the given URL.
///
/// The external scores are merged into the local scoring system to improve routing.
pub fn set_pathfinding_scores_source(&self, url: String) {
self.inner.write().unwrap().set_pathfinding_scores_source(url);
}

/// Configures the [`Node`] instance to source its inbound liquidity from the given
/// [LSPS2](https://github.com/BitcoinAndLightningLayerSpecs/lsp/blob/main/LSPS2/README.md)
/// service.
Expand Down Expand Up @@ -802,6 +830,7 @@ impl ArcedNodeBuilder {
fn build_with_store_internal(
config: Arc<Config>, chain_data_source_config: Option<&ChainDataSourceConfig>,
gossip_source_config: Option<&GossipSourceConfig>,
pathfinding_scores_sync_config: Option<&PathfindingScoresSyncConfig>,
liquidity_source_config: Option<&LiquiditySourceConfig>, seed_bytes: [u8; 64],
logger: Arc<Logger>, kv_store: Arc<DynStore>,
) -> Result<Node, BuildError> {
Expand Down Expand Up @@ -957,26 +986,38 @@ fn build_with_store_internal(
},
};

let scorer = match io::utils::read_scorer(
let local_scorer = match io::utils::read_scorer(
Arc::clone(&kv_store),
Arc::clone(&network_graph),
Arc::clone(&logger),
) {
Ok(scorer) => Arc::new(Mutex::new(scorer)),
Ok(scorer) => scorer,
Err(e) => {
if e.kind() == std::io::ErrorKind::NotFound {
let params = ProbabilisticScoringDecayParameters::default();
Arc::new(Mutex::new(ProbabilisticScorer::new(
params,
Arc::clone(&network_graph),
Arc::clone(&logger),
)))
ProbabilisticScorer::new(params, Arc::clone(&network_graph), Arc::clone(&logger))
} else {
return Err(BuildError::ReadFailed);
}
},
};

let scorer = Arc::new(Mutex::new(CombinedScorer::new(local_scorer)));

// Restore external pathfinding scores from cache if possible.
match read_external_pathfinding_scores_from_cache(Arc::clone(&kv_store), Arc::clone(&logger)) {
Ok(external_scores) => {
scorer.lock().unwrap().merge(external_scores, cur_time);
log_trace!(logger, "External scores from cache merged successfully");
},
Err(e) => {
if e.kind() != std::io::ErrorKind::NotFound {
log_error!(logger, "Error while reading external scores from cache: {}", e);
return Err(BuildError::ReadFailed);
}
},
}

let scoring_fee_params = ProbabilisticScoringFeeParameters::default();
let router = Arc::new(DefaultRouter::new(
Arc::clone(&network_graph),
Expand Down Expand Up @@ -1078,8 +1119,8 @@ fn build_with_store_internal(

// Give ChannelMonitors to ChainMonitor
for (_blockhash, channel_monitor) in channel_monitors.into_iter() {
let funding_outpoint = channel_monitor.get_funding_txo().0;
chain_monitor.watch_channel(funding_outpoint, channel_monitor).map_err(|e| {
let channel_id = channel_monitor.channel_id();
chain_monitor.watch_channel(channel_id, channel_monitor).map_err(|e| {
log_error!(logger, "Failed to watch channel monitor: {:?}", e);
BuildError::InvalidChannelMonitor
})?;
Expand Down Expand Up @@ -1282,6 +1323,8 @@ fn build_with_store_internal(
let (stop_sender, _) = tokio::sync::watch::channel(());
let (event_handling_stopped_sender, _) = tokio::sync::watch::channel(());

let pathfinding_scores_sync_url = pathfinding_scores_sync_config.map(|c| c.url.clone());

Ok(Node {
runtime,
stop_sender,
Expand All @@ -1300,6 +1343,7 @@ fn build_with_store_internal(
keys_manager,
network_graph,
gossip_source,
pathfinding_scores_sync_url,
liquidity_source,
kv_store,
logger,
Expand Down
2 changes: 1 addition & 1 deletion src/chain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ impl ChainSource {
if let Some(worst_channel_monitor_block_hash) = chain_monitor
.list_monitors()
.iter()
.flat_map(|(txo, _)| chain_monitor.get_monitor(*txo))
.flat_map(|channel_id| chain_monitor.get_monitor(*channel_id))
.map(|m| m.current_best_block())
.min_by_key(|b| b.height)
.map(|b| b.block_hash)
Expand Down
6 changes: 6 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ pub(crate) const PEER_RECONNECTION_INTERVAL: Duration = Duration::from_secs(10);
// The time in-between RGS sync attempts.
pub(crate) const RGS_SYNC_INTERVAL: Duration = Duration::from_secs(60 * 60);

// The time in-between external scores sync attempts.
pub(crate) const EXTERNAL_PATHFINDING_SCORES_SYNC_INTERVAL: Duration = Duration::from_secs(60 * 60);

// The time in-between node announcement broadcast attempts.
pub(crate) const NODE_ANN_BCAST_INTERVAL: Duration = Duration::from_secs(60 * 60);

Expand All @@ -78,6 +81,9 @@ pub(crate) const TX_BROADCAST_TIMEOUT_SECS: u64 = 5;
// The timeout after which we abort a RGS sync operation.
pub(crate) const RGS_SYNC_TIMEOUT_SECS: u64 = 5;

// The timeout after which we abort a external scores sync operation.
pub(crate) const EXTERNAL_PATHFINDING_SCORES_SYNC_TIMEOUT_SECS: u64 = 5;

// The length in bytes of our wallets' keys seed.
pub(crate) const WALLET_KEYS_SEED_LEN: usize = 64;

Expand Down
51 changes: 50 additions & 1 deletion src/io/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ use crate::{Error, EventQueue, NodeMetrics, PaymentDetails};
use lightning::io::Cursor;
use lightning::ln::msgs::DecodeError;
use lightning::routing::gossip::NetworkGraph;
use lightning::routing::scoring::{ProbabilisticScorer, ProbabilisticScoringDecayParameters};
use lightning::routing::scoring::{
ChannelLiquidities, ProbabilisticScorer, ProbabilisticScoringDecayParameters,
};
use lightning::util::persist::{
KVSTORE_NAMESPACE_KEY_ALPHABET, KVSTORE_NAMESPACE_KEY_MAX_LEN, NETWORK_GRAPH_PERSISTENCE_KEY,
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
Expand Down Expand Up @@ -52,6 +54,8 @@ use std::ops::Deref;
use std::path::Path;
use std::sync::Arc;

pub const EXTERNAL_PATHFINDING_SCORES_CACHE_KEY: &str = "external_pathfinding_scores_cache";

/// Generates a random [BIP 39] mnemonic.
///
/// The result may be used to initialize the [`Node`] entropy, i.e., can be given to
Expand Down Expand Up @@ -166,6 +170,51 @@ where
})
}

/// Read previously persisted external pathfinding scores from the cache.
pub(crate) fn read_external_pathfinding_scores_from_cache<L: Deref>(
kv_store: Arc<DynStore>, logger: L,
) -> Result<ChannelLiquidities, std::io::Error>
where
L::Target: LdkLogger,
{
let mut reader = Cursor::new(kv_store.read(
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
EXTERNAL_PATHFINDING_SCORES_CACHE_KEY,
)?);
ChannelLiquidities::read(&mut reader).map_err(|e| {
log_error!(logger, "Failed to deserialize scorer: {}", e);
std::io::Error::new(std::io::ErrorKind::InvalidData, "Failed to deserialize Scorer")
})
}

/// Persist external pathfinding scores to the cache.
pub(crate) fn write_external_pathfinding_scores_to_cache<L: Deref>(
kv_store: Arc<DynStore>, data: &ChannelLiquidities, logger: L,
) -> Result<(), Error>
where
L::Target: LdkLogger,
{
kv_store
.write(
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
EXTERNAL_PATHFINDING_SCORES_CACHE_KEY,
&data.encode(),
)
.map_err(|e| {
log_error!(
logger,
"Writing data to key {}/{}/{} failed due to: {}",
NODE_METRICS_PRIMARY_NAMESPACE,
NODE_METRICS_SECONDARY_NAMESPACE,
EXTERNAL_PATHFINDING_SCORES_CACHE_KEY,
e
);
Error::PersistenceFailed
})
}

/// Read previously persisted events from the store.
pub(crate) fn read_event_queue<L: Deref + Clone>(
kv_store: Arc<DynStore>, logger: L,
Expand Down
Loading