Skip to content

Commit

Permalink
move schedule to lightning-payjoin
Browse files Browse the repository at this point in the history
  • Loading branch information
jbesraa committed Apr 6, 2024
1 parent 96bfba8 commit 588db45
Show file tree
Hide file tree
Showing 9 changed files with 186 additions and 191 deletions.
5 changes: 3 additions & 2 deletions lightning-payjoin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
tokio = { version = "1", default-features = false, features = [ "rt-multi-thread", "time", "sync" ] }
tokio = { version = "1", features = ["full"] }
http-body-util = "0.1.0"
hyper = {version = "1.2.0", features = ["http1", "server"]}
bytes = "1.5.0"
hyper-util = {version = "0.1.3", features = ["tokio"] }
payjoin = { version = "0.13.0", features = ["receive", "send"] }
bitcoin = "0.30.2"
async-trait = "0.1.79"
lightning = { git = "https://github.com/jbesraa/rust-lightning.git", rev = "d3e2d5a", features = ["std"] }
rand = "0.8.5"
197 changes: 127 additions & 70 deletions lightning-payjoin/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
use async_trait::async_trait;
use http_body_util::Full;
pub mod scheduler;
pub use scheduler::FundingTxParams;

use bitcoin::absolute::LockTime;
use bitcoin::psbt::Psbt;
use bitcoin::secp256k1::PublicKey;
use bitcoin::{base64, ScriptBuf};
use http_body_util::{BodyExt, Full};
use hyper::body::Incoming;
use hyper::header::HeaderValue;
use hyper::server::conn::http1;
Expand All @@ -12,31 +18,35 @@ use tokio::net::TcpStream;
use tokio::sync::Mutex;
use tokio::task::JoinError;

#[async_trait]
pub trait PayjoinLNReceiver {
fn convert_payjoin_request_to_funding_tx(
&self, request: Request<Incoming>,
) -> impl std::future::Future<Output = Result<String, Box<dyn std::error::Error>>> + std::marker::Send;
pub trait PayjoinLNReceiver: Send + Sync + 'static + Clone {
fn is_mine(&self, script: &ScriptBuf) -> Result<bool, Box<dyn std::error::Error>>;
fn notify_funding_generated(
&self, temporary_channel_id: [u8; 32], counterparty_node_id: PublicKey,
funding_tx: bitcoin::Transaction,
) -> Result<(), Box<dyn std::error::Error>>;
}

#[derive(Clone)]
pub struct PayjoinService<P: PayjoinLNReceiver + Send + Sync + 'static + Clone> {
receiver_handler: P,
scheduler: Arc<Mutex<scheduler::ChannelScheduler>>,
}

impl<P> PayjoinService<P>
where
P: PayjoinLNReceiver + Send + Sync + 'static + Clone,
{
pub fn new(receiver_handler: P) -> Self {
Self { receiver_handler }
pub fn new(receiver_handler: P, scheduler: Arc<Mutex<scheduler::ChannelScheduler>>) -> Self {
Self { receiver_handler, scheduler }
}

pub async fn serve_incoming_payjoin_requests(
&self, stream: TcpStream,
) -> Result<(), JoinError> {
let io = TokioIo::new(stream);
let payjoin_lightning =
Arc::new(Mutex::new(PayjoinService::new(self.receiver_handler.clone())));
let receiver = self.receiver_handler.clone();
let scheduler = self.scheduler.clone();
let payjoin_lightning = Arc::new(Mutex::new(PayjoinService::new(receiver, scheduler)));
tokio::task::spawn(async move {
if let Err(err) = http1::Builder::new()
.serve_connection(
Expand All @@ -52,15 +62,69 @@ where
})
.await
}

async fn convert_payjoin_request_to_funding_tx(
&self, request: Request<Incoming>,
) -> Result<String, Box<dyn std::error::Error>> {
self.receiver_handler.convert_payjoin_request_to_funding_tx(request).await
let is_output_mine =
|script: &ScriptBuf| self.receiver_handler.is_mine(script).map_err(|e| e.into());
let (psbt, amount_to_us) = extract_psbt_from_http_request(request, is_output_mine).await?;
let channel = match self.scheduler.lock().await.get_next_channel(amount_to_us) {
Some(channel) => channel,
None => {
panic!("No channel available for payjoin");
},
};
assert!(channel.is_channel_accepted());
let locktime = match channel.locktime() {
Some(locktime) => locktime,
None => unreachable!(),
};
let output_script = match channel.output_script() {
Some(output_script) => output_script,
None => unreachable!(),
};
let temporary_channel_id = match channel.temporary_channel_id() {
Some(temporary_channel_id) => temporary_channel_id,
None => unreachable!(),
};
let psbt = from_original_psbt_to_funding_psbt(
output_script,
channel.channel_value_satoshi(),
psbt,
locktime,
is_output_mine,
);
let funding_tx = psbt.clone().extract_tx();
self.scheduler
.lock()
.await
.mark_as_funding_tx_created(channel.user_channel_id(), funding_tx.clone());
let counterparty_node_id = channel.node_id();
let _ = self.receiver_handler.notify_funding_generated(
temporary_channel_id.0,
counterparty_node_id,
funding_tx.clone(),
)?;
let res = tokio::time::timeout(tokio::time::Duration::from_secs(3), async move {
let txid = funding_tx.clone().txid();
loop {
if self.scheduler.lock().await.is_funding_tx_signed(txid) {
break;
}
}
})
.await;
if res.is_err() {
panic!("Funding tx not signed");
// broadcast original tx
}
Ok(psbt.to_string())
}

async fn http_router(
http_request: Request<Incoming>, payjoin_lightning: Arc<Mutex<PayjoinService<P>>>,
) -> Result<hyper::Response<Full<bytes::Bytes>>, hyper::Error> {
use self::utils::http_response;
match (http_request.method(), http_request.uri().path()) {
(&hyper::Method::POST, "/payjoin") => {
let payjoin_lightning = payjoin_lightning.lock().await;
Expand All @@ -75,65 +139,58 @@ where
}
}

pub mod utils {
use bitcoin::{absolute::LockTime, base64, psbt::Psbt, ScriptBuf};
use http_body_util::{BodyExt, Full};
use hyper::{body::Incoming, header::HeaderValue, HeaderMap};

pub async fn extract_psbt_from_http_request(
request: hyper::Request<Incoming>,
is_mine: impl Fn(&ScriptBuf) -> Result<bool, Box<dyn std::error::Error>>,
) -> Result<(Psbt, u64), Box<dyn std::error::Error>> {
let headers = request.headers().clone();
let body = request.into_body().collect().await?;
let body = String::from_utf8(body.to_bytes().to_vec()).unwrap();
let psbt = body_to_psbt(headers.clone(), body.as_bytes());
let amount_to_us = amount_directed_to_us_sat(psbt.clone(), is_mine);
Ok((psbt, amount_to_us))
}
pub fn body_to_psbt(headers: HeaderMap<HeaderValue>, mut body: impl std::io::Read) -> Psbt {
let content_length =
headers.get("content-length").unwrap().to_str().unwrap().parse::<u64>().unwrap();
let mut buf = vec![0; content_length as usize]; // 4_000_000 * 4 / 3 fits in u32
body.read_exact(&mut buf).unwrap();
let base64 = base64::decode(&buf).unwrap();
let psbt = Psbt::deserialize(&base64).unwrap();
psbt
}
pub async fn extract_psbt_from_http_request(
request: hyper::Request<Incoming>,
is_mine: impl Fn(&ScriptBuf) -> Result<bool, Box<dyn std::error::Error>>,
) -> Result<(Psbt, u64), Box<dyn std::error::Error>> {
let headers = request.headers().clone();
let body = request.into_body().collect().await?;
let body = String::from_utf8(body.to_bytes().to_vec()).unwrap();
let psbt = body_to_psbt(headers.clone(), body.as_bytes());
let amount_to_us = amount_directed_to_us_sat(psbt.clone(), is_mine);
Ok((psbt, amount_to_us))
}
pub fn body_to_psbt(headers: HeaderMap<HeaderValue>, mut body: impl std::io::Read) -> Psbt {
let content_length =
headers.get("content-length").unwrap().to_str().unwrap().parse::<u64>().unwrap();
let mut buf = vec![0; content_length as usize]; // 4_000_000 * 4 / 3 fits in u32
body.read_exact(&mut buf).unwrap();
let base64 = base64::decode(&buf).unwrap();
let psbt = Psbt::deserialize(&base64).unwrap();
psbt
}

pub fn from_original_psbt_to_funding_psbt(
output_script: ScriptBuf, channel_value_sat: u64, mut psbt: Psbt, locktime: LockTime,
is_mine: impl Fn(&ScriptBuf) -> Result<bool, Box<dyn std::error::Error>>,
) -> Psbt {
let multisig_script = output_script;
psbt.unsigned_tx.lock_time = locktime;
psbt.unsigned_tx.output.push(bitcoin::TxOut {
value: channel_value_sat,
script_pubkey: multisig_script.clone(),
});
psbt.unsigned_tx.output.retain(|output| {
let is_mine = is_mine(&output.script_pubkey).unwrap();
!is_mine || output.script_pubkey == multisig_script
});
let psbt = Psbt::from_unsigned_tx(psbt.unsigned_tx).unwrap();
psbt
}
pub fn from_original_psbt_to_funding_psbt(
output_script: ScriptBuf, channel_value_sat: u64, mut psbt: Psbt, locktime: LockTime,
is_mine: impl Fn(&ScriptBuf) -> Result<bool, Box<dyn std::error::Error>>,
) -> Psbt {
let multisig_script = output_script;
psbt.unsigned_tx.lock_time = locktime;
psbt.unsigned_tx
.output
.push(bitcoin::TxOut { value: channel_value_sat, script_pubkey: multisig_script.clone() });
psbt.unsigned_tx.output.retain(|output| {
let is_mine = is_mine(&output.script_pubkey).unwrap();
!is_mine || output.script_pubkey == multisig_script
});
let psbt = Psbt::from_unsigned_tx(psbt.unsigned_tx).unwrap();
psbt
}

fn amount_directed_to_us_sat(
psbt: Psbt, is_mine: impl Fn(&ScriptBuf) -> Result<bool, Box<dyn std::error::Error>>,
) -> u64 {
let mut ret = 0;
psbt.unsigned_tx.output.iter().for_each(|output| {
let is_mine = is_mine(&output.script_pubkey).unwrap();
if is_mine {
ret += output.value;
}
});
ret
}
pub fn http_response(s: String) -> Result<hyper::Response<Full<bytes::Bytes>>, hyper::Error> {
Ok(hyper::Response::builder().body(Full::new(bytes::Bytes::from(s))).unwrap())
}
fn amount_directed_to_us_sat(
psbt: Psbt, is_mine: impl Fn(&ScriptBuf) -> Result<bool, Box<dyn std::error::Error>>,
) -> u64 {
let mut ret = 0;
psbt.unsigned_tx.output.iter().for_each(|output| {
let is_mine = is_mine(&output.script_pubkey).unwrap();
if is_mine {
ret += output.value;
}
});
ret
}
pub fn http_response(s: String) -> Result<hyper::Response<Full<bytes::Bytes>>, hyper::Error> {
Ok(hyper::Response::builder().body(Full::new(bytes::Bytes::from(s))).unwrap())
}

struct RequestHeaders(HashMap<String, String>);
Expand Down
Loading

0 comments on commit 588db45

Please sign in to comment.