diff --git a/src/builder.rs b/src/builder.rs index 24065b98f..af6e4522a 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -26,7 +26,7 @@ use crate::peer_store::PeerStore; use crate::tx_broadcaster::TransactionBroadcaster; use crate::types::{ ChainMonitor, ChannelManager, DynStore, GossipSync, Graph, KeysManager, MessageRouter, - OnionMessenger, PeerManager, + OnionMessenger, PeerManager, Persister, }; use crate::wallet::persist::KVStoreWalletPersister; use crate::wallet::Wallet; @@ -46,7 +46,7 @@ use lightning::routing::scoring::{ use lightning::sign::EntropySource; use lightning::util::persist::{ - read_channel_monitors, CHANNEL_MANAGER_PERSISTENCE_KEY, + CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, }; use lightning::util::ser::ReadableArgs; @@ -908,15 +908,6 @@ fn build_with_store_internal( let runtime = Arc::new(RwLock::new(None)); - // Initialize the ChainMonitor - let chain_monitor: Arc = Arc::new(chainmonitor::ChainMonitor::new( - Some(Arc::clone(&chain_source)), - Arc::clone(&tx_broadcaster), - Arc::clone(&logger), - Arc::clone(&fee_estimator), - Arc::clone(&kv_store), - )); - // Initialize the KeysManager let cur_time = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).map_err(|e| { log_error!(logger, "Failed to get current time: {}", e); @@ -932,6 +923,38 @@ fn build_with_store_internal( Arc::clone(&logger), )); + let persister = Arc::new(Persister::new( + Arc::clone(&kv_store), + Arc::clone(&logger), + 10, // (?) + Arc::clone(&keys_manager), + Arc::clone(&keys_manager), + Arc::clone(&tx_broadcaster), + Arc::clone(&fee_estimator), + )); + + // Read ChannelMonitor state from store + let channel_monitors = match persister.read_all_channel_monitors_with_updates() { + Ok(monitors) => monitors, + Err(e) => { + if e.kind() == lightning::io::ErrorKind::NotFound { + Vec::new() + } else { + log_error!(logger, "Failed to read channel monitors: {}", e.to_string()); + return Err(BuildError::ReadFailed); + } + }, + }; + + // Initialize the ChainMonitor + let chain_monitor: Arc = Arc::new(chainmonitor::ChainMonitor::new( + Some(Arc::clone(&chain_source)), + Arc::clone(&tx_broadcaster), + Arc::clone(&logger), + Arc::clone(&fee_estimator), + Arc::clone(&persister), + )); + // Initialize the network graph, scorer, and router let network_graph = match io::utils::read_network_graph(Arc::clone(&kv_store), Arc::clone(&logger)) { @@ -974,23 +997,6 @@ fn build_with_store_internal( scoring_fee_params, )); - // Read ChannelMonitor state from store - let channel_monitors = match read_channel_monitors( - Arc::clone(&kv_store), - Arc::clone(&keys_manager), - Arc::clone(&keys_manager), - ) { - Ok(monitors) => monitors, - Err(e) => { - if e.kind() == lightning::io::ErrorKind::NotFound { - Vec::new() - } else { - log_error!(logger, "Failed to read channel monitors: {}", e.to_string()); - return Err(BuildError::ReadFailed); - } - }, - }; - let mut user_config = default_user_config(&config); if liquidity_source_config.and_then(|lsc| lsc.lsps2_service.as_ref()).is_some() { // Generally allow claiming underpaying HTLCs as the LSP will skim off some fee. We'll diff --git a/src/io/test_utils.rs b/src/io/test_utils.rs index aed03b6fc..1abb92436 100644 --- a/src/io/test_utils.rs +++ b/src/io/test_utils.rs @@ -7,13 +7,13 @@ use lightning::ln::functional_test_utils::{ connect_block, create_announced_chan_between_nodes, create_chanmon_cfgs, create_dummy_block, - create_network, create_node_cfgs, create_node_chanmgrs, send_payment, + create_network, create_node_cfgs, create_node_chanmgrs, send_payment, check_closed_event, }; -use lightning::util::persist::{read_channel_monitors, KVStore, KVSTORE_NAMESPACE_KEY_MAX_LEN}; +use lightning::util::persist::{MonitorUpdatingPersister, MonitorName, KVStore, CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, KVSTORE_NAMESPACE_KEY_MAX_LEN}; use lightning::events::ClosureReason; use lightning::util::test_utils; -use lightning::{check_added_monitors, check_closed_broadcast, check_closed_event}; +use lightning::{check_added_monitors, check_closed_broadcast}; use rand::distributions::Alphanumeric; use rand::{thread_rng, Rng}; @@ -21,6 +21,8 @@ use rand::{thread_rng, Rng}; use std::panic::RefUnwindSafe; use std::path::PathBuf; +const EXPECTED_UPDATES_PER_PAYMENT: u64 = 5; + pub(crate) fn random_storage_path() -> PathBuf { let mut temp_path = std::env::temp_dir(); let mut rng = thread_rng(); @@ -81,24 +83,53 @@ pub(crate) fn do_read_write_remove_list_persist(kv_s // Integration-test the given KVStore implementation. Test relaying a few payments and check that // the persisted data is updated the appropriate number of times. pub(crate) fn do_test_store(store_0: &K, store_1: &K) { + // This value is used later to limit how many iterations we perform. + let persister_0_max_pending_updates = 7; + // Intentionally set this to a smaller value to test a different alignment. + let persister_1_max_pending_updates = 3; + let chanmon_cfgs = create_chanmon_cfgs(2); + + let persister_0 = MonitorUpdatingPersister::new( + store_0, + &chanmon_cfgs[0].logger, + persister_0_max_pending_updates, + &chanmon_cfgs[0].keys_manager, + &chanmon_cfgs[0].keys_manager, + &chanmon_cfgs[0].tx_broadcaster, + &chanmon_cfgs[0].fee_estimator, + ); + + let persister_1 = MonitorUpdatingPersister::new( + store_1, + &chanmon_cfgs[1].logger, + persister_1_max_pending_updates, + &chanmon_cfgs[1].keys_manager, + &chanmon_cfgs[1].keys_manager, + &chanmon_cfgs[1].tx_broadcaster, + &chanmon_cfgs[1].fee_estimator, + ); + let mut node_cfgs = create_node_cfgs(2, &chanmon_cfgs); + let chain_mon_0 = test_utils::TestChainMonitor::new( Some(&chanmon_cfgs[0].chain_source), &chanmon_cfgs[0].tx_broadcaster, &chanmon_cfgs[0].logger, &chanmon_cfgs[0].fee_estimator, - store_0, - node_cfgs[0].keys_manager, + &persister_0, + &chanmon_cfgs[0].keys_manager, ); + let chain_mon_1 = test_utils::TestChainMonitor::new( Some(&chanmon_cfgs[1].chain_source), &chanmon_cfgs[1].tx_broadcaster, &chanmon_cfgs[1].logger, &chanmon_cfgs[1].fee_estimator, - store_1, - node_cfgs[1].keys_manager, + &persister_1, + &chanmon_cfgs[1].keys_manager, ); + node_cfgs[0].chain_monitor = chain_mon_0; node_cfgs[1].chain_monitor = chain_mon_1; let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); @@ -106,29 +137,50 @@ pub(crate) fn do_test_store(store_0: &K, store_1: &K) { // Check that the persisted channel data is empty before any channels are // open. - let mut persisted_chan_data_0 = - read_channel_monitors(store_0, nodes[0].keys_manager, nodes[0].keys_manager).unwrap(); + let mut persisted_chan_data_0 = persister_0.read_all_channel_monitors_with_updates().unwrap(); assert_eq!(persisted_chan_data_0.len(), 0); - let mut persisted_chan_data_1 = - read_channel_monitors(store_1, nodes[1].keys_manager, nodes[1].keys_manager).unwrap(); + let mut persisted_chan_data_1 = persister_1.read_all_channel_monitors_with_updates().unwrap(); assert_eq!(persisted_chan_data_1.len(), 0); // Helper to make sure the channel is on the expected update ID. macro_rules! check_persisted_data { ($expected_update_id: expr) => { - persisted_chan_data_0 = - read_channel_monitors(store_0, nodes[0].keys_manager, nodes[0].keys_manager) - .unwrap(); + persisted_chan_data_0 = persister_0.read_all_channel_monitors_with_updates().unwrap(); + // check that we stored only one monitor assert_eq!(persisted_chan_data_0.len(), 1); for (_, mon) in persisted_chan_data_0.iter() { assert_eq!(mon.get_latest_update_id(), $expected_update_id); + + let monitor_name = MonitorName::from(mon.get_funding_txo().0); + assert_eq!( + store_0 + .list( + CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, + monitor_name.as_str() + ) + .unwrap() + .len() as u64, + mon.get_latest_update_id() % persister_0_max_pending_updates, + "Wrong number of updates stored in persister 0", + ); } - persisted_chan_data_1 = - read_channel_monitors(store_1, nodes[1].keys_manager, nodes[1].keys_manager) - .unwrap(); + persisted_chan_data_1 = persister_1.read_all_channel_monitors_with_updates().unwrap(); assert_eq!(persisted_chan_data_1.len(), 1); for (_, mon) in persisted_chan_data_1.iter() { assert_eq!(mon.get_latest_update_id(), $expected_update_id); + + let monitor_name = MonitorName::from(mon.get_funding_txo().0); + assert_eq!( + store_1 + .list( + CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, + monitor_name.as_str() + ) + .unwrap() + .len() as u64, + mon.get_latest_update_id() % persister_1_max_pending_updates, + "Wrong number of updates stored in persister 1", + ); } }; } @@ -138,52 +190,52 @@ pub(crate) fn do_test_store(store_0: &K, store_1: &K) { check_persisted_data!(0); // Send a few payments and make sure the monitors are updated to the latest. - send_payment(&nodes[0], &vec![&nodes[1]][..], 8000000); - check_persisted_data!(5); - send_payment(&nodes[1], &vec![&nodes[0]][..], 4000000); - check_persisted_data!(10); + send_payment(&nodes[0], &vec![&nodes[1]][..], 8_000_000); + check_persisted_data!(EXPECTED_UPDATES_PER_PAYMENT); + send_payment(&nodes[1], &vec![&nodes[0]][..], 4_000_000); + check_persisted_data!(2 * EXPECTED_UPDATES_PER_PAYMENT); + + // Send a few more payments to try all the alignments of max pending updates with + // updates for a payment sent and received. + let mut sender = 0; + for i in 3..=persister_0_max_pending_updates * 2 { + let receiver; + if sender == 0 { + sender = 1; + receiver = 0; + } else { + sender = 0; + receiver = 1; + } + send_payment(&nodes[sender], &vec![&nodes[receiver]][..], 21_000); + check_persisted_data!(i * EXPECTED_UPDATES_PER_PAYMENT); + } // Force close because cooperative close doesn't result in any persisted // updates. - nodes[0] - .node - .force_close_broadcasting_latest_txn( - &nodes[0].node.list_channels()[0].channel_id, - &nodes[1].node.get_our_node_id(), - "whoops".to_string(), - ) - .unwrap(); - check_closed_event!( - nodes[0], - 1, - ClosureReason::HolderForceClosed { broadcasted_latest_txn: Some(true) }, - [nodes[1].node.get_our_node_id()], - 100000 - ); + + let node_id_1 = nodes[1].node.get_our_node_id(); + let chan_id = nodes[0].node.list_channels()[0].channel_id; + let err_msg = "Channel force-closed".to_string(); + nodes[0].node.force_close_broadcasting_latest_txn(&chan_id, &node_id_1, err_msg).unwrap(); + + let reason = ClosureReason::HolderForceClosed { broadcasted_latest_txn: Some(true) }; + check_closed_event(&nodes[0], 1, reason, false, &[node_id_1], 100000); check_closed_broadcast!(nodes[0], true); check_added_monitors!(nodes[0], 1); - let node_txn = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap(); + let node_txn = nodes[0].tx_broadcaster.txn_broadcast(); assert_eq!(node_txn.len(), 1); + let txn = vec![node_txn[0].clone(), node_txn[0].clone()]; + let dummy_block = create_dummy_block(nodes[0].best_block_hash(), 42, txn); + connect_block(&nodes[1], &dummy_block); - connect_block( - &nodes[1], - &create_dummy_block( - nodes[0].best_block_hash(), - 42, - vec![node_txn[0].clone(), node_txn[0].clone()], - ), - ); check_closed_broadcast!(nodes[1], true); - check_closed_event!( - nodes[1], - 1, - ClosureReason::CommitmentTxConfirmed, - [nodes[0].node.get_our_node_id()], - 100000 - ); + let reason = ClosureReason::CommitmentTxConfirmed; + let node_id_0 = nodes[0].node.get_our_node_id(); + check_closed_event(&nodes[1], 1, reason, false, &[node_id_0], 100000); check_added_monitors!(nodes[1], 1); // Make sure everything is persisted as expected after close. - check_persisted_data!(11); + check_persisted_data!(persister_0_max_pending_updates * 2 * EXPECTED_UPDATES_PER_PAYMENT + 1); } diff --git a/src/types.rs b/src/types.rs index 1c9ab64b9..749d638ed 100644 --- a/src/types.rs +++ b/src/types.rs @@ -23,7 +23,7 @@ use lightning::routing::gossip; use lightning::routing::router::DefaultRouter; use lightning::routing::scoring::{ProbabilisticScorer, ProbabilisticScoringFeeParameters}; use lightning::sign::InMemorySigner; -use lightning::util::persist::KVStore; +use lightning::util::persist::{MonitorUpdatingPersister, KVStore}; use lightning::util::ser::{Readable, Writeable, Writer}; use lightning::util::sweep::OutputSweeper; @@ -38,13 +38,22 @@ use std::sync::{Arc, Mutex}; pub(crate) type DynStore = dyn KVStore + Sync + Send; +pub type Persister = MonitorUpdatingPersister< + Arc, + Arc, + Arc, + Arc, + Arc, + Arc, +>; + pub(crate) type ChainMonitor = chainmonitor::ChainMonitor< InMemorySigner, Arc, Arc, Arc, Arc, - Arc, + Arc, >; pub(crate) type PeerManager = lightning::ln::peer_handler::PeerManager<