From 0c214d945fa51f66de2f786925dc5a119ac211e0 Mon Sep 17 00:00:00 2001 From: jbesraa Date: Wed, 21 Feb 2024 19:28:13 +0200 Subject: [PATCH] f Add `PayjoinScheduler` and `PayjoinServer` --- Cargo.toml | 10 ++- src/builder.rs | 11 +++ src/config.rs | 6 ++ src/lib.rs | 62 +++++++++++++- src/pj.rs | 126 +++++++++++++++++++++++++++++ src/wallet.rs | 12 +++ tests/common.rs | 1 + tests/integration_tests_payjoin.rs | 124 ++++++++++++++++++++++++++++ 8 files changed, 348 insertions(+), 4 deletions(-) create mode 100644 src/pj.rs create mode 100644 tests/integration_tests_payjoin.rs diff --git a/Cargo.toml b/Cargo.toml index 7e4a3b348..8b6251a11 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -57,7 +57,7 @@ lightning-liquidity = { version = "0.1.0-alpha.1", features = ["std"] } bdk = { version = "0.29.0", default-features = false, features = ["std", "async-interface", "use-esplora-async", "sqlite-bundled", "keys-bip39"]} -reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"] } +reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls", "blocking"] } rusqlite = { version = "0.28.0", features = ["bundled"] } bitcoin = "0.30.2" bip39 = "2.0.0" @@ -69,6 +69,14 @@ tokio = { version = "1", default-features = false, features = [ "rt-multi-thread esplora-client = { version = "0.6", default-features = false } libc = "0.2" uniffi = { version = "0.26.0", features = ["build"], optional = true } +payjoin = { version = "0.13.0", features = ["receive", "send"] } +http-body-util = "0.1.0" +bitcoincore-rpc = "0.17.0" +ureq = "2.9.1" +hyper = {version = "1.2.0", features = ["http1", "server"]} +rustls = "0.21.9" +bytes = "1.5.0" +hyper-util = {version = "0.1.3", features = ["tokio"] } [target.'cfg(vss)'.dependencies] vss-client = "0.2" diff --git a/src/builder.rs b/src/builder.rs index cc0c9adeb..d958e2baa 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -12,6 +12,7 @@ use crate::logger::{log_error, FilesystemLogger, Logger}; use crate::message_handler::NodeCustomMessageHandler; use crate::payment_store::PaymentStore; use crate::peer_store::PeerStore; +use crate::pj::{PayjoinExecuter, PayjoinScheduler}; use crate::sweep::OutputSweeper; use crate::tx_broadcaster::TransactionBroadcaster; use crate::types::{ @@ -943,6 +944,15 @@ fn build_with_store_internal( }, }; + let payjoin_executer = Arc::new(PayjoinExecuter::new( + Arc::clone(&wallet), + Arc::clone(&logger), + Arc::clone(&peer_manager), + Arc::clone(&channel_manager), + )); + + let payjoin_scheduler = Arc::new(PayjoinScheduler::new(Arc::clone(&payjoin_executer))); + let (stop_sender, stop_receiver) = tokio::sync::watch::channel(()); Ok(Node { @@ -958,6 +968,7 @@ fn build_with_store_internal( channel_manager, chain_monitor, output_sweeper, + payjoin_scheduler, peer_manager, keys_manager, network_graph, diff --git a/src/config.rs b/src/config.rs index 945d712c9..1e33a3615 100644 --- a/src/config.rs +++ b/src/config.rs @@ -44,6 +44,9 @@ pub(crate) const WALLET_SYNC_INTERVAL_MINIMUM_SECS: u64 = 10; // The length in bytes of our wallets' keys seed. pub(crate) const WALLET_KEYS_SEED_LEN: usize = 64; +// Port used by the Payjoin HTTP server. +pub(crate) const DEFAULT_PAYJOIN_HTTP_SERVER_PORT: u16 = 3227; + #[derive(Debug, Clone)] /// Represents the configuration of an [`Node`] instance. /// @@ -104,6 +107,8 @@ pub struct Config { /// /// Any messages below this level will be excluded from the logs. pub log_level: LogLevel, + /// Payjoin server port + pub payjoin_server_port: u16, } impl Default for Config { @@ -120,6 +125,7 @@ impl Default for Config { trusted_peers_0conf: Vec::new(), probing_liquidity_limit_multiplier: DEFAULT_PROBING_LIQUIDITY_LIMIT_MULTIPLIER, log_level: DEFAULT_LOG_LEVEL, + payjoin_server_port: DEFAULT_PAYJOIN_HTTP_SERVER_PORT, } } } diff --git a/src/lib.rs b/src/lib.rs index d0b6e9993..0bcfb0e1b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -89,6 +89,7 @@ mod logger; mod message_handler; mod payment_store; mod peer_store; +mod pj; mod sweep; mod tx_broadcaster; mod types; @@ -107,6 +108,8 @@ pub use error::Error as NodeError; use error::Error; pub use event::Event; +use payjoin::Uri; +use pj::{PayjoinScheduler, PayjoinServer}; pub use types::ChannelConfig; pub use io::utils::generate_entropy_mnemonic; @@ -157,18 +160,20 @@ use lightning_transaction_sync::EsploraSyncClient; use lightning::routing::router::{PaymentParameters, RouteParameters}; use lightning_invoice::{payment, Bolt11Invoice, Currency}; -use bitcoin::hashes::sha256::Hash as Sha256; use bitcoin::hashes::Hash; use bitcoin::secp256k1::PublicKey; +use bitcoin::{hashes::sha256::Hash as Sha256, Amount}; use bitcoin::{Address, Txid}; use rand::Rng; -use std::default::Default; -use std::net::ToSocketAddrs; +use std::net::{SocketAddr, ToSocketAddrs}; use std::sync::{Arc, Mutex, RwLock}; use std::time::{Duration, Instant, SystemTime}; +use std::{default::Default, str::FromStr}; + +use crate::pj::ScheduledChannel; #[cfg(feature = "uniffi")] uniffi::include_scaffolding!("ldk_node"); @@ -189,6 +194,7 @@ pub struct Node { channel_manager: Arc>, chain_monitor: Arc>, output_sweeper: Arc>, + payjoin_scheduler: Arc>, peer_manager: Arc>, keys_manager: Arc, network_graph: Arc, @@ -462,6 +468,30 @@ impl Node { }); } + let pj_scheduler = Arc::clone(&self.payjoin_scheduler); + let pj_server = PayjoinServer::new(pj_scheduler); + let mut stop_payjoin_server = self.stop_receiver.clone(); + let pj_port = self.config.payjoin_server_port; + runtime.spawn(async move { + let addr = SocketAddr::from(([0, 0, 0, 0], pj_port)); + let listener = tokio::net::TcpListener::bind(addr).await.unwrap(); + loop { + let (stream, _) = match listener.accept().await { + Ok(res) => res, + Err(e) => { + println!("Failed to accept incoming payjoin connection: {}", e); + continue; + }, + }; + tokio::select! { + _ = stop_payjoin_server.changed() => { + return; + } + _ = pj_server.serve(stream) => {} + } + } + }); + // Regularly reconnect to channel peers. let connect_cm = Arc::clone(&self.channel_manager); let connect_pm = Arc::clone(&self.peer_manager); @@ -675,6 +705,32 @@ impl Node { self.runtime.read().unwrap().is_some() } + /// Request a new channel to be opened with a remote peer. + pub fn schedule_payjoin_channel( + self, channel_amount_sats: u64, push_msat: Option, announce_channel: bool, + node_id: PublicKey, + ) -> Result { + let address = self.wallet.get_new_address()?; + let user_channel_id: u128 = rand::thread_rng().gen::(); + self.payjoin_scheduler.schedule(ScheduledChannel::new( + channel_amount_sats, + push_msat, + user_channel_id, + announce_channel, + node_id, + )); + let amount = Amount::from_sat(channel_amount_sats); + let pj = format!("https://0.0.0.0:{}/payjoin", self.config.payjoin_server_port); + let pj_uri_string = format!("{}?amount={}&pj={}", address.to_qr_uri(), amount.to_btc(), pj); + assert!(Uri::from_str(&pj_uri_string).is_ok()); + Ok(pj_uri_string) + } + + /// List all scheduled payjoin channels. + pub fn list_scheduled_channels(&self) -> Result, Error> { + Ok(self.payjoin_scheduler.list_scheduled_channels()) + } + /// Disconnects all peers, stops all running background tasks, and shuts down [`Node`]. /// /// After this returns most API methods will return [`Error::NotRunning`]. diff --git a/src/pj.rs b/src/pj.rs new file mode 100644 index 000000000..772c8189e --- /dev/null +++ b/src/pj.rs @@ -0,0 +1,126 @@ +#![allow(dead_code)] +use crate::logger::FilesystemLogger; +use crate::types::{ChannelManager, PeerManager, Wallet}; +use bitcoin::secp256k1::PublicKey; +use http_body_util::{BodyExt, Full}; +use hyper::server::conn::http1; +use hyper::service::service_fn; +use hyper::Request; +use hyper_util::rt::TokioIo; +use lightning::util::persist::KVStore; +use std::sync::{Arc, Mutex}; +use tokio::net::TcpStream; +use tokio::task::JoinError; + +#[derive(Clone)] +pub struct ScheduledChannel { + channel_amount_sats: u64, + push_msat: Option, + user_channel_id: u128, + announce_channel: bool, + node: PublicKey, +} + +impl ScheduledChannel { + pub fn new( + channel_amount_sats: u64, push_msat: Option, user_channel_id: u128, + announce_channel: bool, node: PublicKey, + ) -> Self { + Self { channel_amount_sats, push_msat, user_channel_id, announce_channel, node } + } +} + +#[derive(Clone)] +pub struct PayjoinExecuter { + wallet: Arc, + logger: Arc, + peer_manager: Arc>, + channel_manager: Arc>, +} + +impl PayjoinExecuter { + pub fn new( + wallet: Arc, logger: Arc, peer_manager: Arc>, + channel_manager: Arc>, + ) -> Self { + Self { wallet, logger, peer_manager, channel_manager } + } + + pub fn test(&self) -> String { + let connected_peers = self.peer_manager.get_peer_node_ids(); + for (node_id, con_addr_opt) in connected_peers { + println!("Connected to peer: {:?}", node_id); + println!("Connected to peer: {:?}", con_addr_opt); + } + "test".to_string() + } +} + +#[derive(Clone)] +pub struct PayjoinScheduler { + channels: Arc>>, + executer: Arc>, +} + +impl PayjoinScheduler { + pub fn new(executer: Arc>) -> Self { + Self { executer, channels: Arc::new(Mutex::new(Vec::new())) } + } + + pub fn schedule(&self, channel: ScheduledChannel) { + self.channels.lock().unwrap().push(channel); + } + + pub fn list_scheduled_channels(&self) -> Vec { + self.channels.lock().unwrap().clone() + } +} + +fn make_http_response(s: String) -> Result>, hyper::Error> { + Ok(hyper::Response::builder().body(Full::new(bytes::Bytes::from(s))).unwrap()) +} + +async fn payjoin_handler( + http_request: Request, pj_scheduler: Arc>, +) -> Result>, hyper::Error> { + match (http_request.method(), http_request.uri().path()) { + (&hyper::Method::POST, "/payjoin") => { + let _headers = http_request.headers().clone(); + let body = http_request.into_body().collect().await?; + let body = String::from_utf8(body.to_bytes().to_vec()).unwrap(); + let res = pj_scheduler.executer.test(); + assert_eq!(&res, "test"); + return make_http_response(body); + }, + _ => make_http_response("404".into()), + } +} + +pub struct PayjoinServer { + scheduler: Arc>, +} + +impl PayjoinServer { + pub fn new(scheduler: Arc>) -> Self { + Self { scheduler } + } + + pub async fn serve(&self, stream: TcpStream) -> Result<(), JoinError> { + let io = TokioIo::new(stream); + let state = self.scheduler.clone(); + tokio::task::spawn(async move { + if let Err(err) = http1::Builder::new() + .serve_connection( + io, + service_fn(move |http_request| payjoin_handler(http_request, state.clone())), + ) + .await + { + println!("Error serving connection: {:?}", err); + } else { + println!("Connection here"); + } + }) + .await + } +} diff --git a/src/wallet.rs b/src/wallet.rs index 51f6345e2..c068e27b3 100644 --- a/src/wallet.rs +++ b/src/wallet.rs @@ -2,6 +2,7 @@ use crate::logger::{log_error, log_info, log_trace, Logger}; use crate::Error; +use bitcoin::psbt::Psbt; use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator}; use lightning::ln::msgs::{DecodeError, UnsignedGossipMessage}; @@ -171,6 +172,17 @@ where Ok(self.inner.lock().unwrap().get_balance()?) } + pub(crate) fn _list_unspent(&self) -> Result, Error> { + Ok(self.inner.lock().unwrap().list_unspent()?) + } + + pub(crate) fn _wallet_process_psbt(&self, psbt: &Psbt) -> Result { + let wallet = self.inner.lock().unwrap(); + let mut psbt = psbt.clone(); + wallet.sign(&mut psbt, SignOptions::default())?; + Ok(psbt) + } + /// Send funds to the given address. /// /// If `amount_msat_or_drain` is `None` the wallet will be drained, i.e., all available funds will be diff --git a/tests/common.rs b/tests/common.rs index 815056b82..016a967e9 100644 --- a/tests/common.rs +++ b/tests/common.rs @@ -141,6 +141,7 @@ pub(crate) fn random_config() -> Config { config.listening_addresses = Some(rand_listening_addresses); config.log_level = LogLevel::Gossip; + config.payjoin_server_port = random_port(); config } diff --git a/tests/integration_tests_payjoin.rs b/tests/integration_tests_payjoin.rs new file mode 100644 index 000000000..99887d2a9 --- /dev/null +++ b/tests/integration_tests_payjoin.rs @@ -0,0 +1,124 @@ +mod common; + +use bitcoin::Amount; +use common::setup_bitcoind_and_electrsd; + +use bitcoincore_rpc::{Client as BitcoindClient, RpcApi}; + +use crate::common::{premine_and_distribute_funds, random_config, setup_node, setup_two_nodes}; + +mod payjoin_sender { + use bitcoincore_rpc::Client as BitcoindClient; + use bitcoincore_rpc::RpcApi; + use payjoin::bitcoin::address::NetworkChecked; + use std::collections::HashMap; + use std::str::FromStr; + + use bitcoincore_rpc::bitcoin::psbt::Psbt; + use payjoin::send::RequestBuilder; + + pub fn try_payjoin( + sender_wallet: &BitcoindClient, pj_uri: payjoin::Uri<'static, NetworkChecked>, + ) -> (String, String) { + // Step 1. Extract the parameters from the payjoin URI + let amount_to_send = pj_uri.amount.unwrap(); + let receiver_address = pj_uri.address.clone(); + // Step 2. Construct URI request parameters, a finalized "Original PSBT" paying `.amount` to `.address` + let mut outputs = HashMap::with_capacity(1); + outputs.insert(receiver_address.to_string(), amount_to_send); + let options = bitcoincore_rpc::json::WalletCreateFundedPsbtOptions { + lock_unspent: Some(false), + fee_rate: Some(bitcoincore_rpc::bitcoin::Amount::from_sat(10000)), + ..Default::default() + }; + let sender_psbt = sender_wallet + .wallet_create_funded_psbt( + &[], // inputs + &outputs, + None, // locktime + Some(options), + None, + ) + .unwrap(); + let psbt = + sender_wallet.wallet_process_psbt(&sender_psbt.psbt, None, None, None).unwrap().psbt; + let psbt = Psbt::from_str(&psbt).unwrap(); + // Step 4. Construct the request with the PSBT and parameters + let (req, _ctx) = RequestBuilder::from_psbt_and_uri(psbt.clone(), pj_uri) + .unwrap() + .build_with_additional_fee( + bitcoincore_rpc::bitcoin::Amount::from_sat(1), + None, + bitcoincore_rpc::bitcoin::FeeRate::MIN, + true, + ) + .unwrap() + .extract_v1() + .unwrap(); + // Step 5. Send the request and receive response + // let payjoin_url = pj_uri.extras.e + // BITCOIN:BCRT1Q0S724W239Z2XQGSZV6TE96HMYLEDCTX3GDFZEP?amount=0.01&pj=https://localhost:3000 + let url_http = req.url.as_str().replace("https", "http"); + dbg!(&url_http); + let res = ureq::post(&url_http) + .set("content-type", "text/plain") + .send_string(&String::from_utf8(req.body.clone()).unwrap()) + .unwrap(); + let res = res.into_string().unwrap(); + (res, String::from_utf8(req.body).unwrap()) + // Step 6. Process the response + // + // An `Ok` response should include a Payjoin Proposal PSBT. + // Check that it's signed, following protocol, not trying to steal or otherwise error. + //let psbt = ctx.process_response(&mut res.as_bytes()).unwrap(); + //// Step 7. Sign and finalize the Payjoin Proposal PSBT + //// + //// Most software can handle adding the last signatures to a PSBT without issue. + //let psbt = sender_wallet + // .wallet_process_psbt(&base64::encode(psbt.serialize()), None, None, None) + // .unwrap() + // .psbt; + //let tx = sender_wallet.finalize_psbt(&psbt, Some(true)).unwrap().hex.unwrap(); + //// Step 8. Broadcast the Payjoin Transaction + //let txid = sender_wallet.send_raw_transaction(&tx).unwrap(); + //txid + } +} + +#[test] +fn payjoin() { + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let payjoin_sender_wallet: BitcoindClient = bitcoind.create_wallet("payjoin_sender").unwrap(); + let config_a = random_config(); + let node_a = setup_node(&electrsd, config_a); + // sleep for 1 minutes + // std::thread::sleep(std::time::Duration::from_secs(60)); + // let (node_a, node_b) = setup_two_nodes(&electrsd, false); + let addr_a = node_a.new_onchain_address().unwrap(); + let addr_sender = payjoin_sender_wallet.get_new_address(None, None).unwrap().assume_checked(); + let premine_amount_sat = 100_000; + premine_and_distribute_funds( + &bitcoind.client, + &electrsd.client, + vec![addr_a, addr_sender], + Amount::from_sat(premine_amount_sat), + ); + node_a.sync_wallets().unwrap(); + assert_eq!(node_a.list_balances().spendable_onchain_balance_sats, premine_amount_sat); + assert_eq!( + payjoin_sender_wallet.get_balances().unwrap().mine.trusted.to_sat(), + premine_amount_sat + ); + assert_eq!(node_a.next_event(), None); + let funding_amount_sat = 80_000; + let node_id = node_a.node_id(); + let pj_uri = + node_a.schedule_payjoin_channel(funding_amount_sat, Some(10_000), true, node_id).unwrap(); + println!("Payjoin URI: {:?}", pj_uri); + dbg!(&pj_uri); + let (receiver_response, sender_original_psbt) = payjoin_sender::try_payjoin( + &payjoin_sender_wallet, + payjoin::Uri::try_from(pj_uri).unwrap().assume_checked(), + ); + assert_eq!(receiver_response, sender_original_psbt); +}