Drop KVStore generic from Node
.. switching to `dyn KVStore + Send + Sync`
tnull committed Mar 28, 2024
1 parent b4ef265 commit fc8f501
Showing 12 changed files with 140 additions and 152 deletions.
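The hunks below import a new `DynStore` alias from `crate::types`; the src/types.rs hunk that introduces it is not among the files shown here. Based on the commit message, the alias is presumably a type-erased stand-in along the following lines (an assumption, not part of the visible diff):

```rust
// Assumed shape of the (not shown) src/types.rs addition: a crate-internal
// alias for the erased store type, so structs hold an Arc<DynStore> instead
// of carrying a `K: KVStore + Sync + Send` generic parameter.
use lightning::util::persist::KVStore;

pub(crate) type DynStore = dyn KVStore + Sync + Send;
```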
42 changes: 21 additions & 21 deletions src/builder.rs
@@ -15,8 +15,8 @@ use crate::peer_store::PeerStore;
use crate::sweep::OutputSweeper;
use crate::tx_broadcaster::TransactionBroadcaster;
use crate::types::{
ChainMonitor, ChannelManager, FakeMessageRouter, GossipSync, KeysManager, NetworkGraph,
OnionMessenger, PeerManager,
ChainMonitor, ChannelManager, DynStore, FakeMessageRouter, GossipSync, KeysManager,
NetworkGraph, OnionMessenger, PeerManager,
};
use crate::wallet::Wallet;
use crate::{LogLevel, Node};
@@ -33,7 +33,7 @@ use lightning::sign::EntropySource;

use lightning::util::config::UserConfig;
use lightning::util::persist::{
read_channel_monitors, KVStore, CHANNEL_MANAGER_PERSISTENCE_KEY,
read_channel_monitors, CHANNEL_MANAGER_PERSISTENCE_KEY,
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
};
use lightning::util::ser::ReadableArgs;
@@ -115,12 +115,18 @@ pub enum BuildError {
/// The given listening addresses are invalid, e.g. too many were passed.
InvalidListeningAddresses,
/// We failed to read data from the [`KVStore`].
///
/// [`KVStore`]: lightning::util::persist::KVStore
ReadFailed,
/// We failed to write data to the [`KVStore`].
///
/// [`KVStore`]: lightning::util::persist::KVStore
WriteFailed,
/// We failed to access the given `storage_dir_path`.
StoragePathAccessFailed,
/// We failed to setup our [`KVStore`].
///
/// [`KVStore`]: lightning::util::persist::KVStore
KVStoreSetupFailed,
/// We failed to setup the onchain wallet.
WalletSetupFailed,
@@ -299,7 +305,7 @@ impl NodeBuilder {

/// Builds a [`Node`] instance with a [`SqliteStore`] backend and according to the options
/// previously configured.
pub fn build(&self) -> Result<Node<SqliteStore>, BuildError> {
pub fn build(&self) -> Result<Node, BuildError> {
let storage_dir_path = self.config.storage_dir_path.clone();
fs::create_dir_all(storage_dir_path.clone())
.map_err(|_| BuildError::StoragePathAccessFailed)?;
@@ -316,7 +322,7 @@ impl NodeBuilder {

/// Builds a [`Node`] instance with a [`FilesystemStore`] backend and according to the options
/// previously configured.
pub fn build_with_fs_store(&self) -> Result<Node<FilesystemStore>, BuildError> {
pub fn build_with_fs_store(&self) -> Result<Node, BuildError> {
let mut storage_dir_path: PathBuf = self.config.storage_dir_path.clone().into();
storage_dir_path.push("fs_store");

@@ -329,9 +335,7 @@ impl NodeBuilder {
/// Builds a [`Node`] instance with a [`VssStore`] backend and according to the options
/// previously configured.
#[cfg(any(vss, vss_test))]
pub fn build_with_vss_store(
&self, url: String, store_id: String,
) -> Result<Node<VssStore>, BuildError> {
pub fn build_with_vss_store(&self, url: String, store_id: String) -> Result<Node, BuildError> {
let logger = setup_logger(&self.config)?;

let seed_bytes = seed_bytes_from_config(
@@ -369,9 +373,7 @@ impl NodeBuilder {
}

/// Builds a [`Node`] instance according to the options previously configured.
pub fn build_with_store<K: KVStore + Sync + Send + 'static>(
&self, kv_store: Arc<K>,
) -> Result<Node<K>, BuildError> {
pub fn build_with_store(&self, kv_store: Arc<DynStore>) -> Result<Node, BuildError> {
let logger = setup_logger(&self.config)?;
let seed_bytes = seed_bytes_from_config(
&self.config,
@@ -500,31 +502,29 @@ impl ArcedNodeBuilder {

/// Builds a [`Node`] instance with a [`SqliteStore`] backend and according to the options
/// previously configured.
pub fn build(&self) -> Result<Arc<Node<SqliteStore>>, BuildError> {
pub fn build(&self) -> Result<Arc<Node>, BuildError> {
self.inner.read().unwrap().build().map(Arc::new)
}

/// Builds a [`Node`] instance with a [`FilesystemStore`] backend and according to the options
/// previously configured.
pub fn build_with_fs_store(&self) -> Result<Arc<Node<FilesystemStore>>, BuildError> {
pub fn build_with_fs_store(&self) -> Result<Arc<Node>, BuildError> {
self.inner.read().unwrap().build_with_fs_store().map(Arc::new)
}

/// Builds a [`Node`] instance according to the options previously configured.
pub fn build_with_store<K: KVStore + Sync + Send + 'static>(
&self, kv_store: Arc<K>,
) -> Result<Arc<Node<K>>, BuildError> {
pub fn build_with_store(&self, kv_store: Arc<DynStore>) -> Result<Arc<Node>, BuildError> {
self.inner.read().unwrap().build_with_store(kv_store).map(Arc::new)
}
}

/// Builds a [`Node`] instance according to the options previously configured.
fn build_with_store_internal<K: KVStore + Sync + Send + 'static>(
fn build_with_store_internal(
config: Arc<Config>, chain_data_source_config: Option<&ChainDataSourceConfig>,
gossip_source_config: Option<&GossipSourceConfig>,
liquidity_source_config: Option<&LiquiditySourceConfig>, seed_bytes: [u8; 64],
logger: Arc<FilesystemLogger>, kv_store: Arc<K>,
) -> Result<Node<K>, BuildError> {
logger: Arc<FilesystemLogger>, kv_store: Arc<DynStore>,
) -> Result<Node, BuildError> {
// Initialize the on-chain wallet and chain access
let xprv = bitcoin::bip32::ExtendedPrivKey::new_master(config.network.into(), &seed_bytes)
.map_err(|e| {
@@ -604,7 +604,7 @@ fn build_with_store_internal<K: KVStore + Sync + Send + 'static>(
));

// Initialize the ChainMonitor
let chain_monitor: Arc<ChainMonitor<K>> = Arc::new(chainmonitor::ChainMonitor::new(
let chain_monitor: Arc<ChainMonitor> = Arc::new(chainmonitor::ChainMonitor::new(
Some(Arc::clone(&tx_sync)),
Arc::clone(&tx_broadcaster),
Arc::clone(&logger),
@@ -735,7 +735,7 @@ fn build_with_store_internal<K: KVStore + Sync + Send + 'static>(
channel_monitor_references,
);
let (_hash, channel_manager) =
<(BlockHash, ChannelManager<K>)>::read(&mut reader, read_args).map_err(|e| {
<(BlockHash, ChannelManager)>::read(&mut reader, read_args).map_err(|e| {
log_error!(logger, "Failed to read channel manager from KVStore: {}", e);
BuildError::ReadFailed
})?;
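The builder still constructs concrete stores (`SqliteStore`, `FilesystemStore`, `VssStore`) but now hands them to `build_with_store_internal` as an `Arc<DynStore>`; this works because Rust performs an unsized coercion from `Arc<ConcreteStore>` to the trait-object `Arc` at the call site. A minimal standalone sketch of the pattern with toy types (not code from this repository):

```rust
use std::sync::Arc;

// Toy stand-ins for `KVStore` / `DynStore` / a concrete store.
trait Store {
    fn read(&self, key: &str) -> Option<Vec<u8>>;
}
type DynStore = dyn Store + Send + Sync;

struct MemStore;
impl Store for MemStore {
    fn read(&self, _key: &str) -> Option<Vec<u8>> {
        None
    }
}

// Before this commit: `struct Node<K: Store> { kv_store: Arc<K> }`.
// After: no generic parameter; the store sits behind a trait object.
struct Node {
    kv_store: Arc<DynStore>,
}

fn build_with_store(kv_store: Arc<DynStore>) -> Node {
    Node { kv_store }
}

fn main() {
    let concrete = Arc::new(MemStore);
    // Unsized coercion: Arc<MemStore> -> Arc<dyn Store + Send + Sync>.
    let node = build_with_store(concrete);
    assert!(node.kv_store.read("some_key").is_none());
}
```

The same coercion shows up in the src/event.rs tests below, where the store binding is annotated as `Arc<DynStore>` up front.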
42 changes: 20 additions & 22 deletions src/event.rs
@@ -1,4 +1,4 @@
use crate::types::{Sweeper, Wallet};
use crate::types::{DynStore, Sweeper, Wallet};
use crate::{
hex_utils, ChannelManager, Config, Error, NetworkGraph, PeerInfo, PeerStore, UserChannelId,
};
@@ -20,7 +20,6 @@ use lightning::impl_writeable_tlv_based_enum;
use lightning::ln::{ChannelId, PaymentHash};
use lightning::routing::gossip::NodeId;
use lightning::util::errors::APIError;
use lightning::util::persist::KVStore;
use lightning::util::ser::{Readable, ReadableArgs, Writeable, Writer};

use lightning_liquidity::lsps2::utils::compute_opening_fee;
@@ -138,22 +137,22 @@ impl_writeable_tlv_based_enum!(Event,
};
);

pub struct EventQueue<K: KVStore + Sync + Send, L: Deref>
pub struct EventQueue<L: Deref>
where
L::Target: Logger,
{
queue: Arc<Mutex<VecDeque<Event>>>,
waker: Arc<Mutex<Option<Waker>>>,
notifier: Condvar,
kv_store: Arc<K>,
kv_store: Arc<DynStore>,
logger: L,
}

impl<K: KVStore + Sync + Send, L: Deref> EventQueue<K, L>
impl<L: Deref> EventQueue<L>
where
L::Target: Logger,
{
pub(crate) fn new(kv_store: Arc<K>, logger: L) -> Self {
pub(crate) fn new(kv_store: Arc<DynStore>, logger: L) -> Self {
let queue = Arc::new(Mutex::new(VecDeque::new()));
let waker = Arc::new(Mutex::new(None));
let notifier = Condvar::new();
@@ -228,13 +227,13 @@ where
}
}

impl<K: KVStore + Sync + Send, L: Deref> ReadableArgs<(Arc<K>, L)> for EventQueue<K, L>
impl<L: Deref> ReadableArgs<(Arc<DynStore>, L)> for EventQueue<L>
where
L::Target: Logger,
{
#[inline]
fn read<R: lightning::io::Read>(
reader: &mut R, args: (Arc<K>, L),
reader: &mut R, args: (Arc<DynStore>, L),
) -> Result<Self, lightning::ln::msgs::DecodeError> {
let (kv_store, logger) = args;
let read_queue: EventQueueDeserWrapper = Readable::read(reader)?;
@@ -292,32 +291,31 @@ impl Future for EventFuture {
}
}

pub(crate) struct EventHandler<K: KVStore + Sync + Send, L: Deref>
pub(crate) struct EventHandler<L: Deref>
where
L::Target: Logger,
{
event_queue: Arc<EventQueue<K, L>>,
event_queue: Arc<EventQueue<L>>,
wallet: Arc<Wallet>,
channel_manager: Arc<ChannelManager<K>>,
output_sweeper: Arc<Sweeper<K>>,
channel_manager: Arc<ChannelManager>,
output_sweeper: Arc<Sweeper>,
network_graph: Arc<NetworkGraph>,
payment_store: Arc<PaymentStore<K, L>>,
peer_store: Arc<PeerStore<K, L>>,
payment_store: Arc<PaymentStore<L>>,
peer_store: Arc<PeerStore<L>>,
runtime: Arc<RwLock<Option<tokio::runtime::Runtime>>>,
logger: L,
config: Arc<Config>,
}

impl<K: KVStore + Sync + Send + 'static, L: Deref> EventHandler<K, L>
impl<L: Deref> EventHandler<L>
where
L::Target: Logger,
{
pub fn new(
event_queue: Arc<EventQueue<K, L>>, wallet: Arc<Wallet>,
channel_manager: Arc<ChannelManager<K>>, output_sweeper: Arc<Sweeper<K>>,
network_graph: Arc<NetworkGraph>, payment_store: Arc<PaymentStore<K, L>>,
peer_store: Arc<PeerStore<K, L>>, runtime: Arc<RwLock<Option<tokio::runtime::Runtime>>>,
logger: L, config: Arc<Config>,
event_queue: Arc<EventQueue<L>>, wallet: Arc<Wallet>, channel_manager: Arc<ChannelManager>,
output_sweeper: Arc<Sweeper>, network_graph: Arc<NetworkGraph>,
payment_store: Arc<PaymentStore<L>>, peer_store: Arc<PeerStore<L>>,
runtime: Arc<RwLock<Option<tokio::runtime::Runtime>>>, logger: L, config: Arc<Config>,
) -> Self {
Self {
event_queue,
@@ -891,7 +889,7 @@ mod tests {

#[tokio::test]
async fn event_queue_persistence() {
let store = Arc::new(TestStore::new(false));
let store: Arc<DynStore> = Arc::new(TestStore::new(false));
let logger = Arc::new(TestLogger::new());
let event_queue = Arc::new(EventQueue::new(Arc::clone(&store), Arc::clone(&logger)));
assert_eq!(event_queue.next_event(), None);
Expand Down Expand Up @@ -928,7 +926,7 @@ mod tests {

#[tokio::test]
async fn event_queue_concurrency() {
let store = Arc::new(TestStore::new(false));
let store: Arc<DynStore> = Arc::new(TestStore::new(false));
let logger = Arc::new(TestLogger::new());
let event_queue = Arc::new(EventQueue::new(Arc::clone(&store), Arc::clone(&logger)));
assert_eq!(event_queue.next_event(), None);
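The `+ Send + Sync` bounds are part of the trait-object type itself because the store handle is cloned into background tasks and must cross thread boundaries; a plain `Arc<dyn KVStore>` would not satisfy that. A toy illustration, again with stand-in types rather than ldk-node code:

```rust
use std::sync::Arc;
use std::thread;

trait Store {
    fn write(&self, key: &str, value: &[u8]);
}
// Dropping `+ Send + Sync` here would make the spawn below fail to compile.
type DynStore = dyn Store + Send + Sync;

struct NoopStore;
impl Store for NoopStore {
    fn write(&self, _key: &str, _value: &[u8]) {}
}

fn main() {
    let store: Arc<DynStore> = Arc::new(NoopStore);
    let background = {
        let store = Arc::clone(&store);
        // The clone moves into another thread, which requires the trait
        // object to be Send + Sync.
        thread::spawn(move || store.write("counter", b"1"))
    };
    background.join().unwrap();
    store.write("counter", b"2");
}
```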
57 changes: 27 additions & 30 deletions src/io/utils.rs
@@ -4,16 +4,17 @@ use crate::config::WALLET_KEYS_SEED_LEN;
use crate::logger::log_error;
use crate::peer_store::PeerStore;
use crate::sweep::SpendableOutputInfo;
use crate::types::DynStore;
use crate::{Error, EventQueue, PaymentDetails};

use lightning::routing::gossip::NetworkGraph;
use lightning::routing::scoring::{ProbabilisticScorer, ProbabilisticScoringDecayParameters};
use lightning::util::logger::Logger;
use lightning::util::persist::{
KVStore, KVSTORE_NAMESPACE_KEY_ALPHABET, KVSTORE_NAMESPACE_KEY_MAX_LEN,
NETWORK_GRAPH_PERSISTENCE_KEY, NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY,
SCORER_PERSISTENCE_PRIMARY_NAMESPACE, SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
KVSTORE_NAMESPACE_KEY_ALPHABET, KVSTORE_NAMESPACE_KEY_MAX_LEN, NETWORK_GRAPH_PERSISTENCE_KEY,
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
SCORER_PERSISTENCE_KEY, SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
};
use lightning::util::ser::{Readable, ReadableArgs, Writeable};
use lightning::util::string::PrintableString;
@@ -93,8 +94,8 @@
}

/// Read a previously persisted [`NetworkGraph`] from the store.
pub(crate) fn read_network_graph<K: KVStore + Sync + Send, L: Deref + Clone>(
kv_store: Arc<K>, logger: L,
pub(crate) fn read_network_graph<L: Deref + Clone>(
kv_store: Arc<DynStore>, logger: L,
) -> Result<NetworkGraph<L>, std::io::Error>
where
L::Target: Logger,
@@ -111,12 +112,8 @@
}

/// Read a previously persisted [`ProbabilisticScorer`] from the store.
pub(crate) fn read_scorer<
K: KVStore + Send + Sync,
G: Deref<Target = NetworkGraph<L>>,
L: Deref + Clone,
>(
kv_store: Arc<K>, network_graph: G, logger: L,
pub(crate) fn read_scorer<G: Deref<Target = NetworkGraph<L>>, L: Deref + Clone>(
kv_store: Arc<DynStore>, network_graph: G, logger: L,
) -> Result<ProbabilisticScorer<G, L>, std::io::Error>
where
L::Target: Logger,
@@ -135,9 +132,9 @@
}

/// Read previously persisted events from the store.
pub(crate) fn read_event_queue<K: KVStore + Sync + Send, L: Deref + Clone>(
kv_store: Arc<K>, logger: L,
) -> Result<EventQueue<K, L>, std::io::Error>
pub(crate) fn read_event_queue<L: Deref + Clone>(
kv_store: Arc<DynStore>, logger: L,
) -> Result<EventQueue<L>, std::io::Error>
where
L::Target: Logger,
{
@@ -153,9 +150,9 @@
}

/// Read previously persisted peer info from the store.
pub(crate) fn read_peer_info<K: KVStore + Sync + Send, L: Deref + Clone>(
kv_store: Arc<K>, logger: L,
) -> Result<PeerStore<K, L>, std::io::Error>
pub(crate) fn read_peer_info<L: Deref + Clone>(
kv_store: Arc<DynStore>, logger: L,
) -> Result<PeerStore<L>, std::io::Error>
where
L::Target: Logger,
{
@@ -171,8 +168,8 @@
}

/// Read previously persisted payments information from the store.
pub(crate) fn read_payments<K: KVStore + Sync + Send, L: Deref>(
kv_store: Arc<K>, logger: L,
pub(crate) fn read_payments<L: Deref>(
kv_store: Arc<DynStore>, logger: L,
) -> Result<Vec<PaymentDetails>, std::io::Error>
where
L::Target: Logger,
@@ -201,8 +198,8 @@
}

/// Read previously persisted spendable output information from the store.
pub(crate) fn read_spendable_outputs<K: KVStore + Sync + Send, L: Deref>(
kv_store: Arc<K>, logger: L,
pub(crate) fn read_spendable_outputs<L: Deref>(
kv_store: Arc<DynStore>, logger: L,
) -> Result<Vec<SpendableOutputInfo>, std::io::Error>
where
L::Target: Logger,
@@ -230,8 +227,8 @@
Ok(res)
}

pub(crate) fn read_latest_rgs_sync_timestamp<K: KVStore + Sync + Send, L: Deref>(
kv_store: Arc<K>, logger: L,
pub(crate) fn read_latest_rgs_sync_timestamp<L: Deref>(
kv_store: Arc<DynStore>, logger: L,
) -> Result<u32, std::io::Error>
where
L::Target: Logger,
@@ -250,8 +247,8 @@
})
}

pub(crate) fn write_latest_rgs_sync_timestamp<K: KVStore + Sync + Send, L: Deref>(
updated_timestamp: u32, kv_store: Arc<K>, logger: L,
pub(crate) fn write_latest_rgs_sync_timestamp<L: Deref>(
updated_timestamp: u32, kv_store: Arc<DynStore>, logger: L,
) -> Result<(), Error>
where
L::Target: Logger,
@@ -277,8 +274,8 @@
})
}

pub(crate) fn read_latest_node_ann_bcast_timestamp<K: KVStore + Sync + Send, L: Deref>(
kv_store: Arc<K>, logger: L,
pub(crate) fn read_latest_node_ann_bcast_timestamp<L: Deref>(
kv_store: Arc<DynStore>, logger: L,
) -> Result<u64, std::io::Error>
where
L::Target: Logger,
@@ -301,8 +298,8 @@
})
}

pub(crate) fn write_latest_node_ann_bcast_timestamp<K: KVStore + Sync + Send, L: Deref>(
updated_timestamp: u64, kv_store: Arc<K>, logger: L,
pub(crate) fn write_latest_node_ann_bcast_timestamp<L: Deref>(
updated_timestamp: u64, kv_store: Arc<DynStore>, logger: L,
) -> Result<(), Error>
where
L::Target: Logger,
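Across src/io/utils.rs the readers and writers lose their `K: KVStore + Sync + Send` parameter and take the shared `Arc<DynStore>` handle instead, trading monomorphized copies for dynamic dispatch on each store call. One practical upside of the erased type is that the backend can be picked at runtime, which a generic parameter does not allow; a toy sketch of that point (illustrative names only, not ldk-node code):

```rust
use std::sync::Arc;

trait Store {
    fn backend(&self) -> &'static str;
}
type DynStore = dyn Store + Send + Sync;

struct SqlBackend;
struct FsBackend;
impl Store for SqlBackend {
    fn backend(&self) -> &'static str {
        "sqlite"
    }
}
impl Store for FsBackend {
    fn backend(&self) -> &'static str {
        "filesystem"
    }
}

// With a generic `K: Store`, the two arms would have different concrete
// types and could not be returned from a single function.
fn pick_store(use_sql: bool) -> Arc<DynStore> {
    if use_sql {
        Arc::new(SqlBackend)
    } else {
        Arc::new(FsBackend)
    }
}

fn main() {
    assert_eq!(pick_store(true).backend(), "sqlite");
    assert_eq!(pick_store(false).backend(), "filesystem");
}
```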
