Use MonitorUpdatingPersister
arturgontijo committed Feb 1, 2025
1 parent 3b36020 commit 14494e1
Showing 3 changed files with 150 additions and 83 deletions.
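In short, this commit swaps direct `KVStore` monitor persistence (reading full `ChannelMonitor`s via `read_channel_monitors`) for LDK's `MonitorUpdatingPersister`, which appends compact `ChannelMonitorUpdate` records and only rewrites the full monitor every `maximum_pending_updates` updates. A toy model of that write path follows (an illustrative sketch only; `ToyMonitorStore` and its methods are hypothetical, not the LDK API):

```rust
use std::collections::BTreeMap;

// Toy model of delta-based monitor persistence. Real LDK behavior differs in
// details (e.g. cleanup of stale updates), but the shape is the same.
struct ToyMonitorStore {
    max_pending_updates: u64,
    full_snapshot_update_id: u64, // update id covered by the last full write
    pending_updates: BTreeMap<u64, Vec<u8>>, // update id -> serialized update
}

impl ToyMonitorStore {
    fn new(max_pending_updates: u64) -> Self {
        Self { max_pending_updates, full_snapshot_update_id: 0, pending_updates: BTreeMap::new() }
    }

    fn persist_update(&mut self, update_id: u64, update_bytes: Vec<u8>) {
        self.pending_updates.insert(update_id, update_bytes);
        if update_id % self.max_pending_updates == 0 {
            // Compact: write a full monitor snapshot and drop the deltas.
            self.full_snapshot_update_id = update_id;
            self.pending_updates.clear();
        }
    }
}

fn main() {
    let mut store = ToyMonitorStore::new(7);
    for id in 1..=10u64 {
        store.persist_update(id, vec![]);
    }
    // Mirrors the invariant the updated test asserts: 10 % 7 = 3 deltas remain.
    assert_eq!(store.pending_updates.len() as u64, 10 % 7);
    assert_eq!(store.full_snapshot_update_id, 7);
}
```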
62 changes: 34 additions & 28 deletions src/builder.rs
@@ -26,7 +26,7 @@ use crate::peer_store::PeerStore;
use crate::tx_broadcaster::TransactionBroadcaster;
use crate::types::{
ChainMonitor, ChannelManager, DynStore, GossipSync, Graph, KeysManager, MessageRouter,
- OnionMessenger, PeerManager,
+ OnionMessenger, PeerManager, Persister,
};
use crate::wallet::persist::KVStoreWalletPersister;
use crate::wallet::Wallet;
Expand All @@ -46,7 +46,7 @@ use lightning::routing::scoring::{
use lightning::sign::EntropySource;

use lightning::util::persist::{
- read_channel_monitors, CHANNEL_MANAGER_PERSISTENCE_KEY,
+ CHANNEL_MANAGER_PERSISTENCE_KEY,
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
};
use lightning::util::ser::ReadableArgs;
@@ -908,15 +908,6 @@ fn build_with_store_internal(

let runtime = Arc::new(RwLock::new(None));

- // Initialize the ChainMonitor
- let chain_monitor: Arc<ChainMonitor> = Arc::new(chainmonitor::ChainMonitor::new(
-     Some(Arc::clone(&chain_source)),
-     Arc::clone(&tx_broadcaster),
-     Arc::clone(&logger),
-     Arc::clone(&fee_estimator),
-     Arc::clone(&kv_store),
- ));

// Initialize the KeysManager
let cur_time = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).map_err(|e| {
log_error!(logger, "Failed to get current time: {}", e);
@@ -932,6 +923,38 @@ fn build_with_store_internal(
Arc::clone(&logger),
));

+ let persister = Arc::new(Persister::new(
+     Arc::clone(&kv_store),
+     Arc::clone(&logger),
+     10, // maximum_pending_updates (TODO: revisit this value)
+     Arc::clone(&keys_manager),
+     Arc::clone(&keys_manager),
+     Arc::clone(&tx_broadcaster),
+     Arc::clone(&fee_estimator),
+ ));

+ // Read ChannelMonitor state from store
+ let channel_monitors = match persister.read_all_channel_monitors_with_updates() {
+     Ok(monitors) => monitors,
+     Err(e) => {
+         if e.kind() == lightning::io::ErrorKind::NotFound {
+             Vec::new()
+         } else {
+             log_error!(logger, "Failed to read channel monitors: {}", e.to_string());
+             return Err(BuildError::ReadFailed);
+         }
+     },
+ };

+ // Initialize the ChainMonitor
+ let chain_monitor: Arc<ChainMonitor> = Arc::new(chainmonitor::ChainMonitor::new(
+     Some(Arc::clone(&chain_source)),
+     Arc::clone(&tx_broadcaster),
+     Arc::clone(&logger),
+     Arc::clone(&fee_estimator),
+     Arc::clone(&persister),
+ ));

// Initialize the network graph, scorer, and router
let network_graph =
match io::utils::read_network_graph(Arc::clone(&kv_store), Arc::clone(&logger)) {
@@ -974,23 +997,6 @@
scoring_fee_params,
));

- // Read ChannelMonitor state from store
- let channel_monitors = match read_channel_monitors(
-     Arc::clone(&kv_store),
-     Arc::clone(&keys_manager),
-     Arc::clone(&keys_manager),
- ) {
-     Ok(monitors) => monitors,
-     Err(e) => {
-         if e.kind() == lightning::io::ErrorKind::NotFound {
-             Vec::new()
-         } else {
-             log_error!(logger, "Failed to read channel monitors: {}", e.to_string());
-             return Err(BuildError::ReadFailed);
-         }
-     },
- };

let mut user_config = default_user_config(&config);
if liquidity_source_config.and_then(|lsc| lsc.lsps2_service.as_ref()).is_some() {
// Generally allow claiming underpaying HTLCs as the LSP will skim off some fee. We'll
158 changes: 105 additions & 53 deletions src/io/test_utils.rs
@@ -7,20 +7,22 @@

use lightning::ln::functional_test_utils::{
connect_block, create_announced_chan_between_nodes, create_chanmon_cfgs, create_dummy_block,
- create_network, create_node_cfgs, create_node_chanmgrs, send_payment,
+ create_network, create_node_cfgs, create_node_chanmgrs, send_payment, check_closed_event,
};
- use lightning::util::persist::{read_channel_monitors, KVStore, KVSTORE_NAMESPACE_KEY_MAX_LEN};
+ use lightning::util::persist::{MonitorUpdatingPersister, MonitorName, KVStore, CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, KVSTORE_NAMESPACE_KEY_MAX_LEN};

use lightning::events::ClosureReason;
use lightning::util::test_utils;
- use lightning::{check_added_monitors, check_closed_broadcast, check_closed_event};
+ use lightning::{check_added_monitors, check_closed_broadcast};

use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng};

use std::panic::RefUnwindSafe;
use std::path::PathBuf;

+ const EXPECTED_UPDATES_PER_PAYMENT: u64 = 5;

pub(crate) fn random_storage_path() -> PathBuf {
let mut temp_path = std::env::temp_dir();
let mut rng = thread_rng();
@@ -81,54 +83,104 @@ pub(crate) fn do_read_write_remove_list_persist<K: KVStore + RefUnwindSafe>(kv_s
// Integration-test the given KVStore implementation. Test relaying a few payments and check that
// the persisted data is updated the appropriate number of times.
pub(crate) fn do_test_store<K: KVStore>(store_0: &K, store_1: &K) {
+ // This value is used later to limit how many iterations we perform.
+ let persister_0_max_pending_updates = 7;
+ // Intentionally set this to a smaller value to test a different alignment.
+ let persister_1_max_pending_updates = 3;

let chanmon_cfgs = create_chanmon_cfgs(2);

+ let persister_0 = MonitorUpdatingPersister::new(
+     store_0,
+     &chanmon_cfgs[0].logger,
+     persister_0_max_pending_updates,
+     &chanmon_cfgs[0].keys_manager,
+     &chanmon_cfgs[0].keys_manager,
+     &chanmon_cfgs[0].tx_broadcaster,
+     &chanmon_cfgs[0].fee_estimator,
+ );

+ let persister_1 = MonitorUpdatingPersister::new(
+     store_1,
+     &chanmon_cfgs[1].logger,
+     persister_1_max_pending_updates,
+     &chanmon_cfgs[1].keys_manager,
+     &chanmon_cfgs[1].keys_manager,
+     &chanmon_cfgs[1].tx_broadcaster,
+     &chanmon_cfgs[1].fee_estimator,
+ );

let mut node_cfgs = create_node_cfgs(2, &chanmon_cfgs);

let chain_mon_0 = test_utils::TestChainMonitor::new(
Some(&chanmon_cfgs[0].chain_source),
&chanmon_cfgs[0].tx_broadcaster,
&chanmon_cfgs[0].logger,
&chanmon_cfgs[0].fee_estimator,
-     store_0,
-     node_cfgs[0].keys_manager,
+     &persister_0,
+     &chanmon_cfgs[0].keys_manager,
);

let chain_mon_1 = test_utils::TestChainMonitor::new(
Some(&chanmon_cfgs[1].chain_source),
&chanmon_cfgs[1].tx_broadcaster,
&chanmon_cfgs[1].logger,
&chanmon_cfgs[1].fee_estimator,
-     store_1,
-     node_cfgs[1].keys_manager,
+     &persister_1,
+     &chanmon_cfgs[1].keys_manager,
);

node_cfgs[0].chain_monitor = chain_mon_0;
node_cfgs[1].chain_monitor = chain_mon_1;
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
let nodes = create_network(2, &node_cfgs, &node_chanmgrs);

// Check that the persisted channel data is empty before any channels are
// open.
- let mut persisted_chan_data_0 =
-     read_channel_monitors(store_0, nodes[0].keys_manager, nodes[0].keys_manager).unwrap();
+ let mut persisted_chan_data_0 = persister_0.read_all_channel_monitors_with_updates().unwrap();
assert_eq!(persisted_chan_data_0.len(), 0);
- let mut persisted_chan_data_1 =
-     read_channel_monitors(store_1, nodes[1].keys_manager, nodes[1].keys_manager).unwrap();
+ let mut persisted_chan_data_1 = persister_1.read_all_channel_monitors_with_updates().unwrap();
assert_eq!(persisted_chan_data_1.len(), 0);

// Helper to make sure the channel is on the expected update ID.
macro_rules! check_persisted_data {
($expected_update_id: expr) => {
-         persisted_chan_data_0 =
-             read_channel_monitors(store_0, nodes[0].keys_manager, nodes[0].keys_manager)
-                 .unwrap();
+         persisted_chan_data_0 = persister_0.read_all_channel_monitors_with_updates().unwrap();
// check that we stored only one monitor
assert_eq!(persisted_chan_data_0.len(), 1);
for (_, mon) in persisted_chan_data_0.iter() {
assert_eq!(mon.get_latest_update_id(), $expected_update_id);

+         let monitor_name = MonitorName::from(mon.get_funding_txo().0);
+         assert_eq!(
+             store_0
+                 .list(
+                     CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
+                     monitor_name.as_str()
+                 )
+                 .unwrap()
+                 .len() as u64,
+             mon.get_latest_update_id() % persister_0_max_pending_updates,
+             "Wrong number of updates stored in persister 0",
+         );
}
-         persisted_chan_data_1 =
-             read_channel_monitors(store_1, nodes[1].keys_manager, nodes[1].keys_manager)
-                 .unwrap();
+         persisted_chan_data_1 = persister_1.read_all_channel_monitors_with_updates().unwrap();
assert_eq!(persisted_chan_data_1.len(), 1);
for (_, mon) in persisted_chan_data_1.iter() {
assert_eq!(mon.get_latest_update_id(), $expected_update_id);

+         let monitor_name = MonitorName::from(mon.get_funding_txo().0);
+         assert_eq!(
+             store_1
+                 .list(
+                     CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
+                     monitor_name.as_str()
+                 )
+                 .unwrap()
+                 .len() as u64,
+             mon.get_latest_update_id() % persister_1_max_pending_updates,
+             "Wrong number of updates stored in persister 1",
+         );
}
};
}
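As a quick worked check of the invariant asserted above (a hypothetical standalone snippet, not part of the commit): the persister rewrites the full monitor every `maximum_pending_updates` updates, so only the remainder survives as individual update entries in the store.

```rust
fn main() {
    // After the first two payments, each side's monitor is at update id 10
    // (2 payments * EXPECTED_UPDATES_PER_PAYMENT = 2 * 5).
    let latest_update_id: u64 = 10;

    // persister_0 (maximum_pending_updates = 7): 10 % 7 = 3 entries listed.
    assert_eq!(latest_update_id % 7, 3);
    // persister_1 (maximum_pending_updates = 3): 10 % 3 = 1 entry listed.
    assert_eq!(latest_update_id % 3, 1);
}
```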
@@ -138,52 +190,52 @@ pub(crate) fn do_test_store<K: KVStore>(store_0: &K, store_1: &K) {
check_persisted_data!(0);

// Send a few payments and make sure the monitors are updated to the latest.
- send_payment(&nodes[0], &vec![&nodes[1]][..], 8000000);
- check_persisted_data!(5);
- send_payment(&nodes[1], &vec![&nodes[0]][..], 4000000);
- check_persisted_data!(10);
+ send_payment(&nodes[0], &vec![&nodes[1]][..], 8_000_000);
+ check_persisted_data!(EXPECTED_UPDATES_PER_PAYMENT);
+ send_payment(&nodes[1], &vec![&nodes[0]][..], 4_000_000);
+ check_persisted_data!(2 * EXPECTED_UPDATES_PER_PAYMENT);

+ // Send a few more payments to try all the alignments of max pending updates with
+ // updates for a payment sent and received.
+ let mut sender = 0;
+ for i in 3..=persister_0_max_pending_updates * 2 {
+     let receiver;
+     if sender == 0 {
+         sender = 1;
+         receiver = 0;
+     } else {
+         sender = 0;
+         receiver = 1;
+     }
+     send_payment(&nodes[sender], &vec![&nodes[receiver]][..], 21_000);
+     check_persisted_data!(i * EXPECTED_UPDATES_PER_PAYMENT);
+ }

// Force close because cooperative close doesn't result in any persisted
// updates.
- nodes[0]
-     .node
-     .force_close_broadcasting_latest_txn(
-         &nodes[0].node.list_channels()[0].channel_id,
-         &nodes[1].node.get_our_node_id(),
-         "whoops".to_string(),
-     )
-     .unwrap();
- check_closed_event!(
-     nodes[0],
-     1,
-     ClosureReason::HolderForceClosed { broadcasted_latest_txn: Some(true) },
-     [nodes[1].node.get_our_node_id()],
-     100000
- );
+ let node_id_1 = nodes[1].node.get_our_node_id();
+ let chan_id = nodes[0].node.list_channels()[0].channel_id;
+ let err_msg = "Channel force-closed".to_string();
+ nodes[0].node.force_close_broadcasting_latest_txn(&chan_id, &node_id_1, err_msg).unwrap();
+ let reason = ClosureReason::HolderForceClosed { broadcasted_latest_txn: Some(true) };
+ check_closed_event(&nodes[0], 1, reason, false, &[node_id_1], 100000);
check_closed_broadcast!(nodes[0], true);
check_added_monitors!(nodes[0], 1);

- let node_txn = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap();
+ let node_txn = nodes[0].tx_broadcaster.txn_broadcast();
assert_eq!(node_txn.len(), 1);
- let txn = vec![node_txn[0].clone(), node_txn[0].clone()];
- let dummy_block = create_dummy_block(nodes[0].best_block_hash(), 42, txn);
- connect_block(&nodes[1], &dummy_block);
+ connect_block(
+     &nodes[1],
+     &create_dummy_block(
+         nodes[0].best_block_hash(),
+         42,
+         vec![node_txn[0].clone(), node_txn[0].clone()],
+     ),
+ );
check_closed_broadcast!(nodes[1], true);
- check_closed_event!(
-     nodes[1],
-     1,
-     ClosureReason::CommitmentTxConfirmed,
-     [nodes[0].node.get_our_node_id()],
-     100000
- );
+ let reason = ClosureReason::CommitmentTxConfirmed;
+ let node_id_0 = nodes[0].node.get_our_node_id();
+ check_closed_event(&nodes[1], 1, reason, false, &[node_id_0], 100000);
check_added_monitors!(nodes[1], 1);

// Make sure everything is persisted as expected after close.
- check_persisted_data!(11);
+ check_persisted_data!(persister_0_max_pending_updates * 2 * EXPECTED_UPDATES_PER_PAYMENT + 1);
}
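For reference, the value passed to the final `check_persisted_data!` works out as follows (a hypothetical standalone check, not part of the commit): two initial payments plus the twelve loop payments (`i` in `3..=14`) make fourteen payments at five updates each, and the force-close contributes one more update.

```rust
fn main() {
    let persister_0_max_pending_updates: u64 = 7;
    const EXPECTED_UPDATES_PER_PAYMENT: u64 = 5;

    // 2 initial payments + 12 loop payments (i = 3..=14) = 14 payments total.
    let payments = persister_0_max_pending_updates * 2;
    // Each payment adds 5 monitor updates; the force-close adds 1 more.
    let final_update_id = payments * EXPECTED_UPDATES_PER_PAYMENT + 1;
    assert_eq!(final_update_id, 71);
}
```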
13 changes: 11 additions & 2 deletions src/types.rs
@@ -23,7 +23,7 @@ use lightning::routing::gossip;
use lightning::routing::router::DefaultRouter;
use lightning::routing::scoring::{ProbabilisticScorer, ProbabilisticScoringFeeParameters};
use lightning::sign::InMemorySigner;
- use lightning::util::persist::KVStore;
+ use lightning::util::persist::{MonitorUpdatingPersister, KVStore};
use lightning::util::ser::{Readable, Writeable, Writer};
use lightning::util::sweep::OutputSweeper;

@@ -38,13 +38,22 @@ use std::sync::{Arc, Mutex};

pub(crate) type DynStore = dyn KVStore + Sync + Send;

+ pub type Persister = MonitorUpdatingPersister<
+     Arc<DynStore>,
+     Arc<Logger>,
+     Arc<KeysManager>,
+     Arc<KeysManager>,
+     Arc<Broadcaster>,
+     Arc<OnchainFeeEstimator>,
+ >;

pub(crate) type ChainMonitor = chainmonitor::ChainMonitor<
InMemorySigner,
Arc<ChainSource>,
Arc<Broadcaster>,
Arc<OnchainFeeEstimator>,
Arc<Logger>,
-     Arc<DynStore>,
+     Arc<Persister>,
>;

pub(crate) type PeerManager = lightning::ln::peer_handler::PeerManager<
