Skip to content

Commit

Permalink
cache external pathfinding scores
Browse files Browse the repository at this point in the history
Save external pathfinding scores in a cache so that they will be
available immediately after a node restart. Otherwise there might be a
time window where new scores need to be downloaded still and the node
operates on local data only.
  • Loading branch information
joostjager committed Feb 11, 2025
1 parent 51915a9 commit d6bb7cf
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 9 deletions.
21 changes: 19 additions & 2 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ use crate::event::EventQueue;
use crate::fee_estimator::OnchainFeeEstimator;
use crate::gossip::GossipSource;
use crate::io::sqlite_store::SqliteStore;
use crate::io::utils::{read_node_metrics, write_node_metrics};
use crate::io::utils::{
read_external_pathfinding_scores_from_cache, read_node_metrics, write_node_metrics,
};
use crate::io::vss_store::VssStore;
use crate::liquidity::LiquiditySource;
use crate::logger::{log_error, log_info, LdkLogger, LogLevel, LogWriter, Logger};
use crate::logger::{log_error, log_info, log_trace, LdkLogger, LogLevel, LogWriter, Logger};
use crate::message_handler::NodeCustomMessageHandler;
use crate::payment::store::PaymentStore;
use crate::peer_store::PeerStore;
Expand Down Expand Up @@ -1002,6 +1004,21 @@ fn build_with_store_internal(

let scorer = Arc::new(Mutex::new(CombinedScorer::new(local_scorer)));

// Restore external pathfinding scores from cache if possible.
match read_external_pathfinding_scores_from_cache(Arc::clone(&kv_store), Arc::clone(&logger))
{
Ok(external_scores) => {
scorer.lock().unwrap().merge(external_scores, cur_time);
log_trace!(logger, "External scores from cache merged successfully");
}
Err(e) => {
if e.kind() != std::io::ErrorKind::NotFound {
log_error!(logger, "Error while reading external scores from cache: {}", e);
return Err(BuildError::ReadFailed);
}
},
}

let scoring_fee_params = ProbabilisticScoringFeeParameters::default();
let router = Arc::new(DefaultRouter::new(
Arc::clone(&network_graph),
Expand Down
51 changes: 50 additions & 1 deletion src/io/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ use crate::{Error, EventQueue, NodeMetrics, PaymentDetails};
use lightning::io::Cursor;
use lightning::ln::msgs::DecodeError;
use lightning::routing::gossip::NetworkGraph;
use lightning::routing::scoring::{ProbabilisticScorer, ProbabilisticScoringDecayParameters};
use lightning::routing::scoring::{
ChannelLiquidities, ProbabilisticScorer, ProbabilisticScoringDecayParameters,
};
use lightning::util::persist::{
KVSTORE_NAMESPACE_KEY_ALPHABET, KVSTORE_NAMESPACE_KEY_MAX_LEN, NETWORK_GRAPH_PERSISTENCE_KEY,
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
Expand Down Expand Up @@ -52,6 +54,8 @@ use std::ops::Deref;
use std::path::Path;
use std::sync::Arc;

pub const EXTERNAL_PATHFINDING_SCORES_CACHE_KEY: &str = "external_pathfinding_scores_cache";

/// Generates a random [BIP 39] mnemonic.
///
/// The result may be used to initialize the [`Node`] entropy, i.e., can be given to
Expand Down Expand Up @@ -166,6 +170,51 @@ where
})
}

/// Read previously persisted external pathfinding scores from the cache.
pub(crate) fn read_external_pathfinding_scores_from_cache<L: Deref>(
kv_store: Arc<DynStore>, logger: L,
) -> Result<ChannelLiquidities, std::io::Error>
where
L::Target: LdkLogger,
{
let mut reader = Cursor::new(kv_store.read(
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
EXTERNAL_PATHFINDING_SCORES_CACHE_KEY,
)?);
ChannelLiquidities::read(&mut reader).map_err(|e| {
log_error!(logger, "Failed to deserialize scorer: {}", e);
std::io::Error::new(std::io::ErrorKind::InvalidData, "Failed to deserialize Scorer")
})
}

/// Persist external pathfinding scores to the cache.
pub(crate) fn write_external_pathfinding_scores_to_cache<L: Deref>(
kv_store: Arc<DynStore>, data: &ChannelLiquidities, logger: L,
) -> Result<(), Error>
where
L::Target: LdkLogger,
{
kv_store
.write(
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
EXTERNAL_PATHFINDING_SCORES_CACHE_KEY,
&data.encode(),
)
.map_err(|e| {
log_error!(
logger,
"Writing data to key {}/{}/{} failed due to: {}",
NODE_METRICS_PRIMARY_NAMESPACE,
NODE_METRICS_SECONDARY_NAMESPACE,
EXTERNAL_PATHFINDING_SCORES_CACHE_KEY,
e
);
Error::PersistenceFailed
})
}

/// Read previously persisted events from the store.
pub(crate) fn read_event_queue<L: Deref + Clone>(
kv_store: Arc<DynStore>, logger: L,
Expand Down
22 changes: 16 additions & 6 deletions src/scoring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ use std::{
time::{Duration, SystemTime},
};

use crate::{logger::LdkLogger, NodeMetrics, Scorer};
use crate::io::utils::write_external_pathfinding_scores_to_cache;
use crate::logger::LdkLogger;
use crate::{
write_node_metrics, DynStore, Logger, EXTERNAL_PATHFINDING_SCORES_SYNC_INTERVAL,
EXTERNAL_PATHFINDING_SCORES_SYNC_TIMEOUT_SECS,
write_node_metrics, DynStore, Logger, NodeMetrics, Scorer,
EXTERNAL_PATHFINDING_SCORES_SYNC_INTERVAL, EXTERNAL_PATHFINDING_SCORES_SYNC_TIMEOUT_SECS,
};
use lightning::{
log_error, log_info, log_trace, routing::scoring::ChannelLiquidities, util::ser::Readable,
Expand Down Expand Up @@ -78,15 +79,24 @@ async fn sync_external_scores(
let mut reader = Cursor::new(body.unwrap());
match ChannelLiquidities::read(&mut reader) {
Ok(liquidities) => {
if let Err(e) = write_external_pathfinding_scores_to_cache(
Arc::clone(&kv_store),
&liquidities,
logger,
) {
log_error!(logger, "Failed to persist external scores to cache: {}", e);
}

let duration_since_epoch =
SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap();
scorer.lock().unwrap().merge(liquidities, duration_since_epoch);
let mut locked_node_metrics = node_metrics.write().unwrap();
locked_node_metrics.latest_pathfinding_scores_sync_timestamp =
Some(duration_since_epoch.as_secs());
write_node_metrics(&*locked_node_metrics, kv_store, logger).unwrap_or_else(|e| {
log_error!(logger, "Persisting node metrics failed: {}", e);
});
write_node_metrics(&*locked_node_metrics, Arc::clone(&kv_store), logger)
.unwrap_or_else(|e| {
log_error!(logger, "Persisting node metrics failed: {}", e);
});
log_trace!(logger, "External scores merged successfully");
},
Err(e) => {
Expand Down

0 comments on commit d6bb7cf

Please sign in to comment.