Skip to content

Commit

Permalink
add functionality to periodically update routing scores from an exter…
Browse files Browse the repository at this point in the history
…nal http source
  • Loading branch information
joostjager committed Jan 30, 2025
1 parent b388ee1 commit 58acc12
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 34 deletions.
46 changes: 23 additions & 23 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,27 +28,27 @@ panic = 'abort' # Abort on panic
default = []

[dependencies]
lightning = { version = "0.1.0", features = ["std"] }
lightning-types = { version = "0.2.0" }
lightning-invoice = { version = "0.33.0", features = ["std"] }
lightning-net-tokio = { version = "0.1.0" }
lightning-persister = { version = "0.1.0" }
lightning-background-processor = { version = "0.1.0", features = ["futures"] }
lightning-rapid-gossip-sync = { version = "0.1.0" }
lightning-block-sync = { version = "0.1.0", features = ["rpc-client", "tokio"] }
lightning-transaction-sync = { version = "0.1.0", features = ["esplora-async-https", "time"] }
lightning-liquidity = { version = "0.1.0", features = ["std"] }

#lightning = { git = "https://github.com/lightningdevkit/rust-lightning", branch = "main", features = ["std"] }
#lightning-types = { git = "https://github.com/lightningdevkit/rust-lightning", branch = "main" }
#lightning-invoice = { git = "https://github.com/lightningdevkit/rust-lightning", branch = "main", features = ["std"] }
#lightning-net-tokio = { git = "https://github.com/lightningdevkit/rust-lightning", branch = "main" }
#lightning-persister = { git = "https://github.com/lightningdevkit/rust-lightning", branch = "main" }
#lightning-background-processor = { git = "https://github.com/lightningdevkit/rust-lightning", branch = "main", features = ["futures"] }
#lightning-rapid-gossip-sync = { git = "https://github.com/lightningdevkit/rust-lightning", branch = "main" }
#lightning-block-sync = { git = "https://github.com/lightningdevkit/rust-lightning", branch = "main", features = ["rpc-client", "tokio"] }
#lightning-transaction-sync = { git = "https://github.com/lightningdevkit/rust-lightning", branch = "main", features = ["esplora-async-https", "time"] }
#lightning-liquidity = { git = "https://github.com/lightningdevkit/rust-lightning", branch = "main" }
#lightning = { version = "0.1.0", features = ["std"] }
#lightning-types = { version = "0.2.0" }
#lightning-invoice = { version = "0.33.0", features = ["std"] }
#lightning-net-tokio = { version = "0.1.0" }
#lightning-persister = { version = "0.1.0" }
#lightning-background-processor = { version = "0.1.0", features = ["futures"] }
#lightning-rapid-gossip-sync = { version = "0.1.0" }
#lightning-block-sync = { version = "0.1.0", features = ["rpc-client", "tokio"] }
#lightning-transaction-sync = { version = "0.1.0", features = ["esplora-async-https", "time"] }
#lightning-liquidity = { version = "0.1.0", features = ["std"] }

lightning = { git = "https://github.com/joostjager/rust-lightning", branch = "merge-scores", features = ["std"] }
lightning-types = { git = "https://github.com/joostjager/rust-lightning", branch = "merge-scores" }
lightning-invoice = { git = "https://github.com/joostjager/rust-lightning", branch = "merge-scores", features = ["std"] }
lightning-net-tokio = { git = "https://github.com/joostjager/rust-lightning", branch = "merge-scores" }
lightning-persister = { git = "https://github.com/joostjager/rust-lightning", branch = "merge-scores" }
lightning-background-processor = { git = "https://github.com/joostjager/rust-lightning", branch = "merge-scores", features = ["futures"] }
lightning-rapid-gossip-sync = { git = "https://github.com/joostjager/rust-lightning", branch = "merge-scores" }
lightning-block-sync = { git = "https://github.com/joostjager/rust-lightning", branch = "merge-scores", features = ["rpc-client", "tokio"] }
lightning-transaction-sync = { git = "https://github.com/joostjager/rust-lightning", branch = "merge-scores", features = ["esplora-async-https", "time"] }
lightning-liquidity = { git = "https://github.com/joostjager/rust-lightning", branch = "merge-scores" }

#lightning = { path = "../rust-lightning/lightning", features = ["std"] }
#lightning-types = { path = "../rust-lightning/lightning-types" }
Expand Down Expand Up @@ -88,8 +88,8 @@ prost = { version = "0.11.6", default-features = false}
winapi = { version = "0.3", features = ["winbase"] }

[dev-dependencies]
lightning = { version = "0.1.0", features = ["std", "_test_utils"] }
#lightning = { git = "https://github.com/lightningdevkit/rust-lightning", branch="main", features = ["std", "_test_utils"] }
#lightning = { version = "0.1.0", features = ["std", "_test_utils"] }
lightning = { git = "https://github.com/joostjager/rust-lightning", branch="merge-scores", features = ["std", "_test_utils"] }
#lightning = { path = "../rust-lightning/lightning", features = ["std", "_test_utils"] }
electrum-client = { version = "0.21.0", default-features = true }
bitcoincore-rpc = { version = "0.19.0", default-features = false }
Expand Down
21 changes: 13 additions & 8 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ use lightning::ln::peer_handler::{IgnoringMessageHandler, MessageHandler};
use lightning::routing::gossip::NodeAlias;
use lightning::routing::router::DefaultRouter;
use lightning::routing::scoring::{
ProbabilisticScorer, ProbabilisticScoringDecayParameters, ProbabilisticScoringFeeParameters,
CombinedScorer, ProbabilisticScorer, ProbabilisticScoringDecayParameters,
ProbabilisticScoringFeeParameters,
};
use lightning::sign::EntropySource;

Expand Down Expand Up @@ -273,6 +274,12 @@ impl NodeBuilder {
self
}

/// Configures the [`Node`] instance to source its external scores from the given URL.
pub fn set_external_scores_source(&mut self, url: String) -> &mut Self {
self.config.external_scores_url = Some(url);
self
}

/// Configures the [`Node`] instance to source its inbound liquidity from the given
/// [LSPS2](https://github.com/BitcoinAndLightningLayerSpecs/lsp/blob/main/LSPS2/README.md)
/// service.
Expand Down Expand Up @@ -877,26 +884,24 @@ fn build_with_store_internal(
},
};

let scorer = match io::utils::read_scorer(
let local_scorer = match io::utils::read_scorer(
Arc::clone(&kv_store),
Arc::clone(&network_graph),
Arc::clone(&logger),
) {
Ok(scorer) => Arc::new(Mutex::new(scorer)),
Ok(scorer) => scorer,
Err(e) => {
if e.kind() == std::io::ErrorKind::NotFound {
let params = ProbabilisticScoringDecayParameters::default();
Arc::new(Mutex::new(ProbabilisticScorer::new(
params,
Arc::clone(&network_graph),
Arc::clone(&logger),
)))
ProbabilisticScorer::new(params, Arc::clone(&network_graph), Arc::clone(&logger))
} else {
return Err(BuildError::ReadFailed);
}
},
};

let scorer = Arc::new(Mutex::new(CombinedScorer::new(local_scorer)));

let scoring_fee_params = ProbabilisticScoringFeeParameters::default();
let router = Arc::new(DefaultRouter::new(
Arc::clone(&network_graph),
Expand Down
10 changes: 10 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ pub(crate) const PEER_RECONNECTION_INTERVAL: Duration = Duration::from_secs(10);
// The time in-between RGS sync attempts.
pub(crate) const RGS_SYNC_INTERVAL: Duration = Duration::from_secs(60 * 60);

// The time in-between external scores sync attempts.
pub(crate) const EXTERNAL_SCORES_SYNC_INTERVAL: Duration = Duration::from_secs(60 * 60);

// The time in-between node announcement broadcast attempts.
pub(crate) const NODE_ANN_BCAST_INTERVAL: Duration = Duration::from_secs(60 * 60);

Expand All @@ -71,6 +74,9 @@ pub(crate) const TX_BROADCAST_TIMEOUT_SECS: u64 = 5;
// The timeout after which we abort a RGS sync operation.
pub(crate) const RGS_SYNC_TIMEOUT_SECS: u64 = 5;

// The timeout after which we abort a external scores sync operation.
pub(crate) const EXTERNAL_SCORES_SYNC_TIMEOUT_SECS: u64 = 5;

// The length in bytes of our wallets' keys seed.
pub(crate) const WALLET_KEYS_SEED_LEN: usize = 64;

Expand Down Expand Up @@ -162,6 +168,9 @@ pub struct Config {
/// **Note:** If unset, default parameters will be used, and you will be able to override the
/// parameters on a per-payment basis in the corresponding method calls.
pub sending_parameters: Option<SendingParameters>,

/// Optional URL to periodically fetch external scores from.
pub external_scores_url: Option<String>,
}

impl Default for Config {
Expand All @@ -177,6 +186,7 @@ impl Default for Config {
anchor_channels_config: Some(AnchorChannelsConfig::default()),
sending_parameters: None,
node_alias: None,
external_scores_url: None,
}
}
}
Expand Down
87 changes: 86 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ mod wallet;
pub use bip39;
pub use bitcoin;
pub use lightning;
use lightning::routing::scoring::{ChannelLiquidities, CombinedScorer};
use lightning::util::ser::Readable;
pub use lightning_invoice;
pub use lightning_types;
pub use vss_client;
Expand All @@ -122,7 +124,8 @@ pub use builder::NodeBuilder as Builder;

use chain::ChainSource;
use config::{
default_user_config, may_announce_channel, ChannelConfig, Config, NODE_ANN_BCAST_INTERVAL,
default_user_config, may_announce_channel, ChannelConfig, Config,
EXTERNAL_SCORES_SYNC_INTERVAL, EXTERNAL_SCORES_SYNC_TIMEOUT_SECS, NODE_ANN_BCAST_INTERVAL,
PEER_RECONNECTION_INTERVAL, RGS_SYNC_INTERVAL,
};
use connection::ConnectionManager;
Expand Down Expand Up @@ -162,6 +165,7 @@ use bitcoin::secp256k1::PublicKey;
use rand::Rng;

use std::default::Default;
use std::io::Cursor;
use std::net::ToSocketAddrs;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, RwLock};
Expand Down Expand Up @@ -306,6 +310,38 @@ impl Node {
});
}

if let Some(url) = &self.config.external_scores_url {
log_trace!(self.logger, "External scores background syncing from {} enabled", url);

let external_scores_sync_logger = Arc::clone(&self.logger);
let external_scores_scorer = Arc::clone(&self.scorer);
let mut stop_sync = self.stop_sender.subscribe();
let url = url.clone();

runtime.spawn(async move {
let mut interval = tokio::time::interval(EXTERNAL_SCORES_SYNC_INTERVAL);
loop {
tokio::select! {
_ = stop_sync.changed() => {
log_trace!(
external_scores_sync_logger,
"Stopping background syncing external scores.",
);
return;
}
_ = interval.tick() => {
log_trace!(
external_scores_sync_logger,
"Background sync of external scores started.",
);

sync_external_scores(&external_scores_sync_logger, &external_scores_scorer, &url).await;
}
}
}
});
}

if let Some(listening_addresses) = &self.config.listening_addresses {
// Setup networking
let peer_manager_connection_handler = Arc::clone(&self.peer_manager);
Expand Down Expand Up @@ -1612,3 +1648,52 @@ pub(crate) fn total_anchor_channels_reserve_sats(
* anchor_channels_config.per_channel_reserve_sats
})
}

async fn sync_external_scores(
logger: &FilesystemLogger, scorer: &Mutex<CombinedScorer<Arc<Graph>, Arc<FilesystemLogger>>>,
url: &String,
) -> () {
let response = tokio::time::timeout(
Duration::from_secs(EXTERNAL_SCORES_SYNC_TIMEOUT_SECS),
reqwest::get(url),
)
.await;

match response {
Ok(Ok(response)) => {
let body = response.bytes().await;
match body {
Err(e) => {
log_trace!(
logger,
"Failed to read external scores update from http source: {}",
e
);
return;
},
Ok(body) => {
let mut reader = Cursor::new(body);
match ChannelLiquidities::read(&mut reader) {
Ok(liquidities) => {
let duration_since_epoch = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap();
scorer.lock().unwrap().merge(liquidities, duration_since_epoch);
log_trace!(logger, "External scores merged successfully",);
},
Err(e) => {
log_trace!(logger, "Failed to parse external scores update: {}", e);
return;
},
}
},
}
},
Err(e) => {
log_trace!(logger, "Retrieving external scores timed out: {}", e);
return;
},
Ok(Err(e)) => {
log_trace!(logger, "Failed to retrieve external scores update: {}", e);
return;
},
}
}
5 changes: 3 additions & 2 deletions src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ use lightning::ln::peer_handler::IgnoringMessageHandler;
use lightning::ln::types::ChannelId;
use lightning::routing::gossip;
use lightning::routing::router::DefaultRouter;
use lightning::routing::scoring::{ProbabilisticScorer, ProbabilisticScoringFeeParameters};
use lightning::routing::scoring::CombinedScorer;
use lightning::routing::scoring::ProbabilisticScoringFeeParameters;
use lightning::sign::InMemorySigner;
use lightning::util::persist::KVStore;
use lightning::util::ser::{Readable, Writeable, Writer};
Expand Down Expand Up @@ -91,7 +92,7 @@ pub(crate) type Router = DefaultRouter<
ProbabilisticScoringFeeParameters,
Scorer,
>;
pub(crate) type Scorer = ProbabilisticScorer<Arc<Graph>, Arc<FilesystemLogger>>;
pub(crate) type Scorer = CombinedScorer<Arc<Graph>, Arc<FilesystemLogger>>;

pub(crate) type Graph = gossip::NetworkGraph<Arc<FilesystemLogger>>;

Expand Down

0 comments on commit 58acc12

Please sign in to comment.