Skip to content

Commit

Permalink
split pkgs and test payjoin channel payment
Browse files Browse the repository at this point in the history
  • Loading branch information
jbesraa committed Apr 4, 2024
1 parent 5964414 commit e487ce3
Show file tree
Hide file tree
Showing 14 changed files with 671 additions and 618 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Generated by Cargo
# will have compiled files and executables
/target/
/lightning-payjoin/target/

# Remove Cargo.lock from gitignore if creating an executable, leave it for libraries
# More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ lightning-rapid-gossip-sync = { path = "../rust-lightning/lightning-rapid-gossip
lightning-transaction-sync = { path = "../rust-lightning/lightning-transaction-sync", features = ["esplora-async"] }
lightning-liquidity = { path = "../lightning-liquidity", features = ["std"] }

lightning-payjoin = { path = "./lightning-payjoin" }
bdk = { version = "0.29.0", default-features = false, features = ["std", "async-interface", "use-esplora-async", "sqlite-bundled", "keys-bip39"]}

reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls", "blocking"] }
Expand Down
15 changes: 15 additions & 0 deletions lightning-payjoin/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[package]
name = "lightning-payjoin"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
tokio = { version = "1", default-features = false, features = [ "rt-multi-thread", "time", "sync" ] }
http-body-util = "0.1.0"
hyper = {version = "1.2.0", features = ["http1", "server"]}
bytes = "1.5.0"
hyper-util = {version = "0.1.3", features = ["tokio"] }
payjoin = { version = "0.13.0", features = ["receive", "send"] }
bitcoin = "0.30.2"
139 changes: 139 additions & 0 deletions lightning-payjoin/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
use http_body_util::Full;
use hyper::body::Incoming;
use hyper::header::HeaderValue;
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::{HeaderMap, Request};
use hyper_util::rt::TokioIo;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::net::TcpStream;
use tokio::sync::Mutex;
use tokio::task::JoinError;

use self::utils::http_response;

pub trait PayjoinLNReceiver {
fn convert_payjoin_request_to_funding_tx(
&self, request: Request<Incoming>,
) -> impl std::future::Future<Output = Result<String, Box<dyn std::error::Error>>> + std::marker::Send;
}

#[derive(Clone)]
pub struct PayjoinService<P: PayjoinLNReceiver + Send + Sync + 'static + Clone> {
receiver_handler: P,
}

impl<P> PayjoinService<P>
where
P: PayjoinLNReceiver + Send + Sync + 'static + Clone,
{
pub fn new(receiver_handler: P) -> Self {
Self { receiver_handler }
}
pub async fn serve_incoming_payjoin_requests(&self, stream: TcpStream) -> Result<(), JoinError> {
let io = TokioIo::new(stream);
let payjoin_lightning = Arc::new(Mutex::new(PayjoinService::new(self.receiver_handler.clone())));
tokio::task::spawn(async move {
if let Err(err) = http1::Builder::new()
.serve_connection(
io,
service_fn(move |http_request| {
Self::http_router(http_request, payjoin_lightning.clone())
}),
)
.await
{
println!("Error serving connection: {:?}", err);
}
})
.await
}
async fn convert_payjoin_request_to_funding_tx(
&self, request: Request<Incoming>,
) -> Result<String, Box<dyn std::error::Error>> {
self.receiver_handler.convert_payjoin_request_to_funding_tx(request).await
}
async fn http_router(
http_request: Request<Incoming>, payjoin_lightning: Arc<Mutex<PayjoinService<P>>>,
) -> Result<hyper::Response<Full<bytes::Bytes>>, hyper::Error> {
match (http_request.method(), http_request.uri().path()) {
(&hyper::Method::POST, "/payjoin") => {
let payjoin_lightning = payjoin_lightning.lock().await;
let payjoin_proposal =
payjoin_lightning.convert_payjoin_request_to_funding_tx(http_request).await.unwrap();
return http_response(payjoin_proposal);
},
_ => http_response("404".into()),
}
}
}

pub mod utils {
use bitcoin::{absolute::LockTime, base64, psbt::Psbt, ScriptBuf};
use http_body_util::Full;
use hyper::{header::HeaderValue, HeaderMap};

pub fn body_to_psbt(headers: HeaderMap<HeaderValue>, mut body: impl std::io::Read) -> Psbt {
let content_length =
headers.get("content-length").unwrap().to_str().unwrap().parse::<u64>().unwrap();
let mut buf = vec![0; content_length as usize]; // 4_000_000 * 4 / 3 fits in u32
body.read_exact(&mut buf).unwrap();
let base64 = base64::decode(&buf).unwrap();
let psbt = Psbt::deserialize(&base64).unwrap();
psbt
}

pub fn from_original_psbt_to_funding_psbt(
output_script: ScriptBuf, channel_value_sat: u64, mut psbt: Psbt, locktime: LockTime,
is_mine: impl Fn(&ScriptBuf) -> Result<bool, Box<dyn std::error::Error>>,
) -> Psbt {
let multisig_script = output_script;
psbt.unsigned_tx.lock_time = locktime;
psbt.unsigned_tx
.output
.push(bitcoin::TxOut { value: channel_value_sat, script_pubkey: multisig_script.clone() });
psbt.unsigned_tx.output.retain(|output| {
let is_mine = is_mine(&output.script_pubkey).unwrap();
!is_mine || output.script_pubkey == multisig_script
});
let psbt = Psbt::from_unsigned_tx(psbt.unsigned_tx).unwrap();
psbt
}

pub fn amount_directed_to_us_sat(
psbt: Psbt, is_mine: impl Fn(&ScriptBuf) -> Result<bool, Box<dyn std::error::Error>>,
) -> u64 {
let mut ret = 0;
psbt.unsigned_tx.output.iter().for_each(|output| {
let is_mine = is_mine(&output.script_pubkey).unwrap();
if is_mine {
ret += output.value;
}
});
ret
}
pub fn http_response(
s: String,
) -> Result<hyper::Response<Full<bytes::Bytes>>, hyper::Error> {
Ok(hyper::Response::builder().body(Full::new(bytes::Bytes::from(s))).unwrap())
}
}

struct RequestHeaders(HashMap<String, String>);

impl payjoin::receive::Headers for RequestHeaders {
fn get_header(&self, key: &str) -> Option<&str> {
self.0.get(key).map(|e| e.as_str())
}
}

impl From<HeaderMap<HeaderValue>> for RequestHeaders {
fn from(req: HeaderMap<HeaderValue>) -> Self {
let mut h = HashMap::new();
for (k, v) in req.iter() {
h.insert(k.to_string(), v.to_str().unwrap().to_string());
}
RequestHeaders(h)
}
}
16 changes: 7 additions & 9 deletions src/builder.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::channel_scheduler::ChannelScheduler;
use crate::config::{
Config, BDK_CLIENT_CONCURRENCY, BDK_CLIENT_STOP_GAP, DEFAULT_ESPLORA_SERVER_URL,
WALLET_KEYS_SEED_LEN,
Expand All @@ -10,10 +11,9 @@ use crate::io::sqlite_store::SqliteStore;
use crate::liquidity::LiquiditySource;
use crate::logger::{log_error, FilesystemLogger, Logger};
use crate::message_handler::NodeCustomMessageHandler;
use crate::payjoin_handler::PayjoinChannelManager;
use crate::payment_store::PaymentStore;
use crate::peer_store::PeerStore;
use crate::pj_new_crate::ChannelScheduler;
use crate::pjoin::LDKPayjoinExecuter;
use crate::sweep::OutputSweeper;
use crate::tx_broadcaster::TransactionBroadcaster;
use crate::types::{
Expand Down Expand Up @@ -570,7 +570,7 @@ fn build_with_store_internal<K: KVStore + Sync + Send + 'static>(
let tx_broadcaster = Arc::new(TransactionBroadcaster::new(
tx_sync.client().clone(),
Arc::clone(&logger),
Arc::clone(&channel_scheduler)
Arc::clone(&channel_scheduler),
));
let fee_estimator = Arc::new(OnchainFeeEstimator::new(
tx_sync.client().clone(),
Expand All @@ -589,7 +589,7 @@ fn build_with_store_internal<K: KVStore + Sync + Send + 'static>(
let tx_broadcaster = Arc::new(TransactionBroadcaster::new(
tx_sync.client().clone(),
Arc::clone(&logger),
Arc::clone(&channel_scheduler)
Arc::clone(&channel_scheduler),
));
let fee_estimator = Arc::new(OnchainFeeEstimator::new(
tx_sync.client().clone(),
Expand Down Expand Up @@ -951,14 +951,12 @@ fn build_with_store_internal<K: KVStore + Sync + Send + 'static>(
};

let (stop_sender, _) = tokio::sync::watch::channel(());
let payjoin_executer = LDKPayjoinExecuter::new(
let payjoin_channels_handler = PayjoinChannelManager::new(
Arc::clone(&wallet),
Arc::clone(&logger),
Arc::clone(&peer_manager),
Arc::clone(&channel_manager),
Arc::clone(&channel_scheduler)
Arc::clone(&channel_scheduler),
);
let payjoin = Arc::new(LDKPayjoin::new(payjoin_executer));
let payjoin = Arc::new(LDKPayjoin::new(payjoin_channels_handler));

let is_listening = Arc::new(AtomicBool::new(false));
let latest_wallet_sync_timestamp = Arc::new(RwLock::new(None));
Expand Down
Loading

0 comments on commit e487ce3

Please sign in to comment.