diff --git a/lightning-payjoin/Cargo.toml b/lightning-payjoin/Cargo.toml index 4682443ec..7420d43f9 100644 --- a/lightning-payjoin/Cargo.toml +++ b/lightning-payjoin/Cargo.toml @@ -6,11 +6,12 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -tokio = { version = "1", default-features = false, features = [ "rt-multi-thread", "time", "sync" ] } +tokio = { version = "1", features = ["full"] } http-body-util = "0.1.0" hyper = {version = "1.2.0", features = ["http1", "server"]} bytes = "1.5.0" hyper-util = {version = "0.1.3", features = ["tokio"] } payjoin = { version = "0.13.0", features = ["receive", "send"] } bitcoin = "0.30.2" -async-trait = "0.1.79" +lightning = { git = "https://github.com/jbesraa/rust-lightning.git", rev = "d3e2d5a", features = ["std"] } +rand = "0.8.5" diff --git a/lightning-payjoin/src/lib.rs b/lightning-payjoin/src/lib.rs index 071ff06a8..214e2ed1c 100644 --- a/lightning-payjoin/src/lib.rs +++ b/lightning-payjoin/src/lib.rs @@ -1,5 +1,11 @@ -use async_trait::async_trait; -use http_body_util::Full; +pub mod scheduler; +pub use scheduler::FundingTxParams; + +use bitcoin::absolute::LockTime; +use bitcoin::psbt::Psbt; +use bitcoin::secp256k1::PublicKey; +use bitcoin::{base64, ScriptBuf}; +use http_body_util::{BodyExt, Full}; use hyper::body::Incoming; use hyper::header::HeaderValue; use hyper::server::conn::http1; @@ -12,31 +18,35 @@ use tokio::net::TcpStream; use tokio::sync::Mutex; use tokio::task::JoinError; -#[async_trait] -pub trait PayjoinLNReceiver { - fn convert_payjoin_request_to_funding_tx( - &self, request: Request, - ) -> impl std::future::Future>> + std::marker::Send; +pub trait PayjoinLNReceiver: Send + Sync + 'static + Clone { + fn is_mine(&self, script: &ScriptBuf) -> Result>; + fn notify_funding_generated( + &self, temporary_channel_id: [u8; 32], counterparty_node_id: PublicKey, + funding_tx: bitcoin::Transaction, + ) -> Result<(), Box>; } #[derive(Clone)] pub struct PayjoinService { receiver_handler: P, + scheduler: Arc>, } impl

PayjoinService

where P: PayjoinLNReceiver + Send + Sync + 'static + Clone, { - pub fn new(receiver_handler: P) -> Self { - Self { receiver_handler } + pub fn new(receiver_handler: P, scheduler: Arc>) -> Self { + Self { receiver_handler, scheduler } } + pub async fn serve_incoming_payjoin_requests( &self, stream: TcpStream, ) -> Result<(), JoinError> { let io = TokioIo::new(stream); - let payjoin_lightning = - Arc::new(Mutex::new(PayjoinService::new(self.receiver_handler.clone()))); + let receiver = self.receiver_handler.clone(); + let scheduler = self.scheduler.clone(); + let payjoin_lightning = Arc::new(Mutex::new(PayjoinService::new(receiver, scheduler))); tokio::task::spawn(async move { if let Err(err) = http1::Builder::new() .serve_connection( @@ -52,15 +62,69 @@ where }) .await } + async fn convert_payjoin_request_to_funding_tx( &self, request: Request, ) -> Result> { - self.receiver_handler.convert_payjoin_request_to_funding_tx(request).await + let is_output_mine = + |script: &ScriptBuf| self.receiver_handler.is_mine(script).map_err(|e| e.into()); + let (psbt, amount_to_us) = extract_psbt_from_http_request(request, is_output_mine).await?; + let channel = match self.scheduler.lock().await.get_next_channel(amount_to_us) { + Some(channel) => channel, + None => { + panic!("No channel available for payjoin"); + }, + }; + assert!(channel.is_channel_accepted()); + let locktime = match channel.locktime() { + Some(locktime) => locktime, + None => unreachable!(), + }; + let output_script = match channel.output_script() { + Some(output_script) => output_script, + None => unreachable!(), + }; + let temporary_channel_id = match channel.temporary_channel_id() { + Some(temporary_channel_id) => temporary_channel_id, + None => unreachable!(), + }; + let psbt = from_original_psbt_to_funding_psbt( + output_script, + channel.channel_value_satoshi(), + psbt, + locktime, + is_output_mine, + ); + let funding_tx = psbt.clone().extract_tx(); + self.scheduler + .lock() + .await + .mark_as_funding_tx_created(channel.user_channel_id(), funding_tx.clone()); + let counterparty_node_id = channel.node_id(); + let _ = self.receiver_handler.notify_funding_generated( + temporary_channel_id.0, + counterparty_node_id, + funding_tx.clone(), + )?; + let res = tokio::time::timeout(tokio::time::Duration::from_secs(3), async move { + let txid = funding_tx.clone().txid(); + loop { + if self.scheduler.lock().await.is_funding_tx_signed(txid) { + break; + } + } + }) + .await; + if res.is_err() { + panic!("Funding tx not signed"); + // broadcast original tx + } + Ok(psbt.to_string()) } + async fn http_router( http_request: Request, payjoin_lightning: Arc>>, ) -> Result>, hyper::Error> { - use self::utils::http_response; match (http_request.method(), http_request.uri().path()) { (&hyper::Method::POST, "/payjoin") => { let payjoin_lightning = payjoin_lightning.lock().await; @@ -75,65 +139,58 @@ where } } -pub mod utils { - use bitcoin::{absolute::LockTime, base64, psbt::Psbt, ScriptBuf}; - use http_body_util::{BodyExt, Full}; - use hyper::{body::Incoming, header::HeaderValue, HeaderMap}; - - pub async fn extract_psbt_from_http_request( - request: hyper::Request, - is_mine: impl Fn(&ScriptBuf) -> Result>, - ) -> Result<(Psbt, u64), Box> { - let headers = request.headers().clone(); - let body = request.into_body().collect().await?; - let body = String::from_utf8(body.to_bytes().to_vec()).unwrap(); - let psbt = body_to_psbt(headers.clone(), body.as_bytes()); - let amount_to_us = amount_directed_to_us_sat(psbt.clone(), is_mine); - Ok((psbt, amount_to_us)) - } - pub fn body_to_psbt(headers: HeaderMap, mut body: impl std::io::Read) -> Psbt { - let content_length = - headers.get("content-length").unwrap().to_str().unwrap().parse::().unwrap(); - let mut buf = vec![0; content_length as usize]; // 4_000_000 * 4 / 3 fits in u32 - body.read_exact(&mut buf).unwrap(); - let base64 = base64::decode(&buf).unwrap(); - let psbt = Psbt::deserialize(&base64).unwrap(); - psbt - } +pub async fn extract_psbt_from_http_request( + request: hyper::Request, + is_mine: impl Fn(&ScriptBuf) -> Result>, +) -> Result<(Psbt, u64), Box> { + let headers = request.headers().clone(); + let body = request.into_body().collect().await?; + let body = String::from_utf8(body.to_bytes().to_vec()).unwrap(); + let psbt = body_to_psbt(headers.clone(), body.as_bytes()); + let amount_to_us = amount_directed_to_us_sat(psbt.clone(), is_mine); + Ok((psbt, amount_to_us)) +} +pub fn body_to_psbt(headers: HeaderMap, mut body: impl std::io::Read) -> Psbt { + let content_length = + headers.get("content-length").unwrap().to_str().unwrap().parse::().unwrap(); + let mut buf = vec![0; content_length as usize]; // 4_000_000 * 4 / 3 fits in u32 + body.read_exact(&mut buf).unwrap(); + let base64 = base64::decode(&buf).unwrap(); + let psbt = Psbt::deserialize(&base64).unwrap(); + psbt +} - pub fn from_original_psbt_to_funding_psbt( - output_script: ScriptBuf, channel_value_sat: u64, mut psbt: Psbt, locktime: LockTime, - is_mine: impl Fn(&ScriptBuf) -> Result>, - ) -> Psbt { - let multisig_script = output_script; - psbt.unsigned_tx.lock_time = locktime; - psbt.unsigned_tx.output.push(bitcoin::TxOut { - value: channel_value_sat, - script_pubkey: multisig_script.clone(), - }); - psbt.unsigned_tx.output.retain(|output| { - let is_mine = is_mine(&output.script_pubkey).unwrap(); - !is_mine || output.script_pubkey == multisig_script - }); - let psbt = Psbt::from_unsigned_tx(psbt.unsigned_tx).unwrap(); - psbt - } +pub fn from_original_psbt_to_funding_psbt( + output_script: ScriptBuf, channel_value_sat: u64, mut psbt: Psbt, locktime: LockTime, + is_mine: impl Fn(&ScriptBuf) -> Result>, +) -> Psbt { + let multisig_script = output_script; + psbt.unsigned_tx.lock_time = locktime; + psbt.unsigned_tx + .output + .push(bitcoin::TxOut { value: channel_value_sat, script_pubkey: multisig_script.clone() }); + psbt.unsigned_tx.output.retain(|output| { + let is_mine = is_mine(&output.script_pubkey).unwrap(); + !is_mine || output.script_pubkey == multisig_script + }); + let psbt = Psbt::from_unsigned_tx(psbt.unsigned_tx).unwrap(); + psbt +} - fn amount_directed_to_us_sat( - psbt: Psbt, is_mine: impl Fn(&ScriptBuf) -> Result>, - ) -> u64 { - let mut ret = 0; - psbt.unsigned_tx.output.iter().for_each(|output| { - let is_mine = is_mine(&output.script_pubkey).unwrap(); - if is_mine { - ret += output.value; - } - }); - ret - } - pub fn http_response(s: String) -> Result>, hyper::Error> { - Ok(hyper::Response::builder().body(Full::new(bytes::Bytes::from(s))).unwrap()) - } +fn amount_directed_to_us_sat( + psbt: Psbt, is_mine: impl Fn(&ScriptBuf) -> Result>, +) -> u64 { + let mut ret = 0; + psbt.unsigned_tx.output.iter().for_each(|output| { + let is_mine = is_mine(&output.script_pubkey).unwrap(); + if is_mine { + ret += output.value; + } + }); + ret +} +pub fn http_response(s: String) -> Result>, hyper::Error> { + Ok(hyper::Response::builder().body(Full::new(bytes::Bytes::from(s))).unwrap()) } struct RequestHeaders(HashMap); diff --git a/src/channel_scheduler.rs b/lightning-payjoin/src/scheduler.rs similarity index 91% rename from src/channel_scheduler.rs rename to lightning-payjoin/src/scheduler.rs index 89b50e8a5..e214926cd 100644 --- a/src/channel_scheduler.rs +++ b/lightning-payjoin/src/scheduler.rs @@ -23,7 +23,7 @@ impl ChannelScheduler { address, ); match channel.state { - ScheduledChannelState::Created(_) => { + ScheduledChannelState::ChannelCreated(_) => { self.channels.push(channel); }, _ => {}, @@ -35,7 +35,7 @@ impl ChannelScheduler { for channel in &mut self.channels { if channel.user_channel_id() == user_channel_id { match channel.state.clone() { - ScheduledChannelState::Created(created) => { + ScheduledChannelState::ChannelCreated(created) => { channel.state = ScheduledChannelState::ChannelAccepted(created, funding_tx_params); }, @@ -80,16 +80,9 @@ impl ChannelScheduler { } } } - pub fn is_initialized_channel(&self, user_channel_id: u128) -> bool { + pub fn is_channel_created(&self, user_channel_id: u128) -> bool { if let Some(c) = self.internal_find_channel(user_channel_id) { - return c.is_channel_initialized(); - } - false - } - #[cfg(test)] - fn is_accepted_channel(&self, user_channel_id: u128) -> bool { - if let Some(c) = self.internal_find_channel(user_channel_id) { - return c.is_channel_accepted(); + return c.is_channel_created(); } false } @@ -128,6 +121,13 @@ impl ChannelScheduler { }); channel } + #[cfg(test)] + fn is_channel_accepted(&self, user_channel_id: u128) -> bool { + if let Some(c) = self.internal_find_channel(user_channel_id) { + return c.is_channel_accepted(); + } + false + } // TODO: remove channel from list? } @@ -148,7 +148,7 @@ impl FundingTxParams { #[derive(Clone, Debug)] pub enum ScheduledChannelState { - Created(ScheduledChannelData), + ChannelCreated(ScheduledChannelData), ChannelAccepted(ScheduledChannelData, FundingTxParams), FundingTxCreated(ScheduledChannelData, FundingTxParams, Transaction), FundingTxSigned(ScheduledChannelData, FundingTxParams, Transaction, Txid), @@ -184,12 +184,12 @@ impl ScheduledChannel { address, created_at: tokio::time::Instant::now().elapsed().as_secs(), }; - ScheduledChannel { state: ScheduledChannelState::Created(c) } + ScheduledChannel { state: ScheduledChannelState::ChannelCreated(c) } } - pub fn is_channel_initialized(&self) -> bool { + pub fn is_channel_created(&self) -> bool { match self.state { - ScheduledChannelState::Created(_) => true, + ScheduledChannelState::ChannelCreated(_) => true, _ => false, } } @@ -217,7 +217,7 @@ impl ScheduledChannel { pub fn channel_value_satoshi(&self) -> u64 { match self.state.clone() { - ScheduledChannelState::Created(c, ..) => c.channel_value_satoshi, + ScheduledChannelState::ChannelCreated(c, ..) => c.channel_value_satoshi, ScheduledChannelState::ChannelAccepted(c, ..) => c.channel_value_satoshi, ScheduledChannelState::FundingTxCreated(c, ..) => c.channel_value_satoshi, ScheduledChannelState::FundingTxSigned(c, ..) => c.channel_value_satoshi, @@ -226,7 +226,7 @@ impl ScheduledChannel { pub fn user_channel_id(&self) -> u128 { match self.state.clone() { - ScheduledChannelState::Created(c, ..) => c.user_channel_id, + ScheduledChannelState::ChannelCreated(c, ..) => c.user_channel_id, ScheduledChannelState::ChannelAccepted(c, ..) => c.user_channel_id, ScheduledChannelState::FundingTxCreated(c, ..) => c.user_channel_id, ScheduledChannelState::FundingTxSigned(c, ..) => c.user_channel_id, @@ -235,7 +235,7 @@ impl ScheduledChannel { pub fn node_id(&self) -> PublicKey { match self.state.clone() { - ScheduledChannelState::Created(c, ..) => c.node_id, + ScheduledChannelState::ChannelCreated(c, ..) => c.node_id, ScheduledChannelState::ChannelAccepted(c, ..) => c.node_id, ScheduledChannelState::FundingTxCreated(c, ..) => c.node_id, ScheduledChannelState::FundingTxSigned(c, ..) => c.node_id, @@ -244,7 +244,7 @@ impl ScheduledChannel { pub fn announce_channel(&self) -> bool { match self.state.clone() { - ScheduledChannelState::Created(c, ..) => c.announce_channel, + ScheduledChannelState::ChannelCreated(c, ..) => c.announce_channel, ScheduledChannelState::ChannelAccepted(c, ..) => c.announce_channel, ScheduledChannelState::FundingTxCreated(c, ..) => c.announce_channel, ScheduledChannelState::FundingTxSigned(c, ..) => c.announce_channel, @@ -253,7 +253,7 @@ impl ScheduledChannel { pub fn address(&self) -> SocketAddress { match self.state.clone() { - ScheduledChannelState::Created(c, ..) => c.address.clone(), + ScheduledChannelState::ChannelCreated(c, ..) => c.address.clone(), ScheduledChannelState::ChannelAccepted(c, ..) => c.address.clone(), ScheduledChannelState::FundingTxCreated(c, ..) => c.address.clone(), ScheduledChannelState::FundingTxSigned(c, ..) => c.address.clone(), @@ -262,7 +262,7 @@ impl ScheduledChannel { pub fn push_msat(&self) -> Option { match self.state.clone() { - ScheduledChannelState::Created(c, ..) => c.push_msat, + ScheduledChannelState::ChannelCreated(c, ..) => c.push_msat, ScheduledChannelState::ChannelAccepted(c, ..) => c.push_msat, ScheduledChannelState::FundingTxCreated(c, ..) => c.push_msat, ScheduledChannelState::FundingTxSigned(c, ..) => c.push_msat, @@ -324,7 +324,7 @@ impl ScheduledChannel { pub fn created_at(&self) -> u64 { match self.state.clone() { - ScheduledChannelState::Created(c, ..) => c.created_at, + ScheduledChannelState::ChannelCreated(c, ..) => c.created_at, ScheduledChannelState::ChannelAccepted(c, ..) => c.created_at, ScheduledChannelState::FundingTxCreated(c, ..) => c.created_at, ScheduledChannelState::FundingTxSigned(c, ..) => c.created_at, @@ -373,7 +373,7 @@ mod tests { test_address.unwrap(), ); assert_eq!(channel_scheduler.channels.len(), 1); - assert_eq!(channel_scheduler.is_initialized_channel(user_channel_id), true); + assert_eq!(channel_scheduler.is_channel_created(user_channel_id), true); channel_scheduler.mark_as_channel_accepted( user_channel_id, FundingTxParams::new( @@ -382,7 +382,7 @@ mod tests { ChannelId::new_zero(), ), ); - assert_eq!(channel_scheduler.is_accepted_channel(user_channel_id), true); + assert_eq!(channel_scheduler.is_channel_accepted(user_channel_id), true); let str_psbt = "cHNidP8BAHMCAAAAAY8nutGgJdyYGXWiBEb45Hoe9lWGbkxh/6bNiOJdCDuDAAAAAAD+////AtyVuAUAAAAAF6kUHehJ8GnSdBUOOv6ujXLrWmsJRDCHgIQeAAAAAAAXqRR3QJbbz0hnQ8IvQ0fptGn+votneofTAAAAAAEBIKgb1wUAAAAAF6kU3k4ekGHKWRNbA1rV5tR5kEVDVNCHAQcXFgAUx4pFclNVgo1WWAdN1SYNX8tphTABCGsCRzBEAiB8Q+A6dep+Rz92vhy26lT0AjZn4PRLi8Bf9qoB/CMk0wIgP/Rj2PWZ3gEjUkTlhDRNAQ0gXwTO7t9n+V14pZ6oljUBIQMVmsAaoNWHVMS02LfTSe0e388LNitPa1UQZyOihY+FFgABABYAFEb2Giu6c4KO5YW0pfw3lGp9jMUUAAA="; let mock_transaction = Psbt::from_str(str_psbt).unwrap().unsigned_tx; channel_scheduler.mark_as_funding_tx_created(user_channel_id, mock_transaction.clone()); diff --git a/src/builder.rs b/src/builder.rs index a252f3958..2f21e0395 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -1,4 +1,3 @@ -use crate::channel_scheduler::ChannelScheduler; use crate::config::{ Config, BDK_CLIENT_CONCURRENCY, BDK_CLIENT_STOP_GAP, DEFAULT_ESPLORA_SERVER_URL, WALLET_KEYS_SEED_LEN, @@ -41,6 +40,7 @@ use lightning::util::persist::{ }; use lightning::util::ser::ReadableArgs; +use lightning_payjoin::scheduler::ChannelScheduler; use lightning_persister::fs_store::FilesystemStore; use lightning_transaction_sync::EsploraSyncClient; @@ -951,12 +951,10 @@ fn build_with_store_internal( }; let (stop_sender, _) = tokio::sync::watch::channel(()); - let payjoin_channels_handler = PayjoinChannelManager::new( - Arc::clone(&wallet), - Arc::clone(&channel_manager), - Arc::clone(&channel_scheduler), - ); - let payjoin = Arc::new(LDKPayjoin::new(payjoin_channels_handler)); + let payjoin_channels_handler = + PayjoinChannelManager::new(Arc::clone(&wallet), Arc::clone(&channel_manager)); + let payjoin = + Arc::new(LDKPayjoin::new(payjoin_channels_handler, Arc::clone(&channel_scheduler))); let is_listening = Arc::new(AtomicBool::new(false)); let latest_wallet_sync_timestamp = Arc::new(RwLock::new(None)); diff --git a/src/event.rs b/src/event.rs index 084fa8ed7..0e446ca3f 100644 --- a/src/event.rs +++ b/src/event.rs @@ -1,4 +1,3 @@ -use crate::channel_scheduler::{ChannelScheduler, FundingTxParams}; use crate::types::{Sweeper, Wallet}; use crate::{ hex_utils, ChannelManager, Config, Error, NetworkGraph, PeerInfo, PeerStore, UserChannelId, @@ -30,6 +29,8 @@ use bitcoin::blockdata::locktime::absolute::LockTime; use bitcoin::secp256k1::PublicKey; use bitcoin::OutPoint; +use lightning_payjoin::scheduler::ChannelScheduler; +use lightning_payjoin::FundingTxParams; use rand::{thread_rng, Rng}; use core::future::Future; @@ -356,7 +357,7 @@ where let locktime = LockTime::from_height(cur_height).unwrap_or(LockTime::ZERO); let mut channel_scheduler = self.channel_scheduler.lock().await; - if channel_scheduler.is_initialized_channel(user_channel_id) { + if channel_scheduler.is_channel_created(user_channel_id) { let funding_tx_params = FundingTxParams::new(output_script.clone(), locktime, temporary_channel_id); channel_scheduler.mark_as_channel_accepted(user_channel_id, funding_tx_params); diff --git a/src/lib.rs b/src/lib.rs index 76ed5858a..af2588b9b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -96,7 +96,6 @@ mod types; mod uniffi_types; mod wallet; -use crate::channel_scheduler::{ChannelScheduler, ScheduledChannel}; use crate::payjoin_handler::LDKPayjoin; pub use bitcoin; pub use lightning; @@ -108,8 +107,8 @@ pub use error::Error as NodeError; use error::Error; pub use event::Event; +use lightning_payjoin::scheduler::{ChannelScheduler, ScheduledChannel}; pub use types::{BestBlock, ChannelConfig}; -mod channel_scheduler; mod payjoin_handler; pub use io::utils::generate_entropy_mnemonic; diff --git a/src/payjoin_handler.rs b/src/payjoin_handler.rs index 9be182714..286473b46 100644 --- a/src/payjoin_handler.rs +++ b/src/payjoin_handler.rs @@ -1,16 +1,15 @@ -use crate::channel_scheduler::ChannelScheduler; use crate::types::{ChannelManager, Wallet}; +use bitcoin::secp256k1::PublicKey; use bitcoin::ScriptBuf; -use hyper::body::Incoming; -use hyper::Request; +use lightning::ln::ChannelId; use lightning::util::persist::KVStore; +use lightning_payjoin::scheduler::ChannelScheduler; use lightning_payjoin::{PayjoinLNReceiver, PayjoinService}; use std::sync::Arc; use tokio::sync::Mutex; -pub struct PayjoinChannelManager { +pub(crate) struct PayjoinChannelManager { channel_manager: Arc>, - channel_scheduler: Arc>, wallet: Arc, } @@ -19,106 +18,50 @@ where K: KVStore + Sync + Send + 'static, { fn clone(&self) -> Self { - Self { - channel_manager: self.channel_manager.clone(), - wallet: self.wallet.clone(), - channel_scheduler: self.channel_scheduler.clone(), - } + Self { channel_manager: self.channel_manager.clone(), wallet: self.wallet.clone() } } } impl PayjoinChannelManager { - pub fn new( - wallet: Arc, channel_manager: Arc>, - channel_scheduler: Arc>, - ) -> Self { - Self { wallet, channel_manager, channel_scheduler } + pub(crate) fn new(wallet: Arc, channel_manager: Arc>) -> Self { + Self { wallet, channel_manager } } } impl PayjoinLNReceiver for PayjoinChannelManager { - async fn convert_payjoin_request_to_funding_tx( - &self, request: Request, - ) -> Result> { - let is_output_mine = |script: &ScriptBuf| self.wallet.is_mine(script).map_err(|e| e.into()); - let (psbt, amount_to_us) = - lightning_payjoin::utils::extract_psbt_from_http_request(request, is_output_mine) - .await?; - let channel = match self.channel_scheduler.lock().await.get_next_channel(amount_to_us) { - Some(channel) => channel, - None => { - // conduct regular payjoin, - panic!("No channel available for payjoin"); - }, - }; - assert!(channel.is_channel_accepted()); - let locktime = match channel.locktime() { - Some(locktime) => locktime, - // get_next_channel should return only accepted channel which - // should have locktime - None => unreachable!(), - }; - let output_script = match channel.output_script() { - Some(output_script) => output_script, - // get_next_channel should return only accepted channel which - // should have output_script - None => unreachable!(), - }; - let temporary_channel_id = match channel.temporary_channel_id() { - Some(temporary_channel_id) => temporary_channel_id, - // get_next_channel should return only accepted channel which - // should have temporary_channel_id - None => unreachable!(), - }; - let psbt = lightning_payjoin::utils::from_original_psbt_to_funding_psbt( - output_script, - channel.channel_value_satoshi(), - psbt, - locktime, - is_output_mine, - ); - let funding_tx = psbt.clone().extract_tx(); - self.channel_scheduler - .lock() - .await - .mark_as_funding_tx_created(channel.user_channel_id(), funding_tx.clone()); - let counterparty_node_id = channel.node_id(); - let _ = self - .channel_manager + fn is_mine(&self, script: &ScriptBuf) -> Result> { + self.wallet.is_mine(script).map_err(|e| e.into()) + } + + fn notify_funding_generated( + &self, temporary_channel_id: [u8; 32], counterparty_node_id: PublicKey, + funding_tx: bitcoin::Transaction, + ) -> Result<(), Box> { + self.channel_manager .funding_transaction_generated( - &temporary_channel_id, + &ChannelId(temporary_channel_id), &counterparty_node_id, funding_tx.clone(), ) - .unwrap(); - let res = tokio::time::timeout(tokio::time::Duration::from_secs(3), async move { - let txid = funding_tx.clone().txid(); - loop { - if self.channel_scheduler.lock().await.is_funding_tx_signed(txid) { - break; - } - } - }) - .await; - if res.is_err() { - panic!("Funding tx not signed"); - // broadcast original tx - } - Ok(psbt.to_string()) + .map_err(|_e| "Error notifying funding generated".into()) } } -pub struct LDKPayjoin { +pub(crate) struct LDKPayjoin { inner: Arc>>>, } impl LDKPayjoin { - pub fn new(handler: PayjoinChannelManager) -> Self { - let handler = PayjoinService::new(handler); + pub(crate) fn new( + handler: PayjoinChannelManager, scheduler: Arc>, + ) -> Self { + let handler = PayjoinService::new(handler, scheduler); Self { inner: Arc::new(Mutex::new(handler)) } } - pub async fn serve(&self, stream: tokio::net::TcpStream) -> Result<(), tokio::task::JoinError> { + pub(crate) async fn serve( + &self, stream: tokio::net::TcpStream, + ) -> Result<(), tokio::task::JoinError> { self.inner.lock().await.serve_incoming_payjoin_requests(stream).await } } diff --git a/src/tx_broadcaster.rs b/src/tx_broadcaster.rs index b001cd9a0..55baae65f 100644 --- a/src/tx_broadcaster.rs +++ b/src/tx_broadcaster.rs @@ -1,4 +1,3 @@ -use crate::channel_scheduler::ChannelScheduler; use crate::logger::{log_bytes, log_debug, log_error, log_trace, Logger}; use lightning::chain::chaininterface::BroadcasterInterface; @@ -8,6 +7,7 @@ use esplora_client::AsyncClient as EsploraClient; use bitcoin::Transaction; +use lightning_payjoin::scheduler::ChannelScheduler; use tokio::sync::mpsc; use tokio::sync::Mutex; diff --git a/tests/integration_tests_payjoin.rs b/tests/integration_tests_payjoin.rs index d72bbd5a9..dcaa0e26d 100644 --- a/tests/integration_tests_payjoin.rs +++ b/tests/integration_tests_payjoin.rs @@ -20,7 +20,7 @@ mod mock_payjoin_sender { use bitcoincore_rpc::bitcoin::psbt::Psbt; use payjoin::send::RequestBuilder; - pub fn try_payjoin( + pub fn send_payjoin_transaction( sender_wallet: &BitcoindClient, pj_uri: payjoin::Uri<'static, NetworkChecked>, ) -> (String, Txid) { // Step 1. Extract the parameters from the payjoin URI @@ -127,7 +127,7 @@ fn payjoin() { // sleep for 1 seconds std::thread::sleep(std::time::Duration::from_secs(1)); - let (_, txid) = mock_payjoin_sender::try_payjoin( + let (_, txid) = mock_payjoin_sender::send_payjoin_transaction( &payjoin_sender_wallet, payjoin::Uri::try_from(pj_uri.unwrap()).unwrap().assume_checked(), );