Skip to content

Commit

Permalink
f Add PayjoinScheduler and PayjoinServer
Browse files Browse the repository at this point in the history
  • Loading branch information
jbesraa committed Mar 1, 2024
1 parent b95432c commit 0c214d9
Show file tree
Hide file tree
Showing 8 changed files with 348 additions and 4 deletions.
10 changes: 9 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ lightning-liquidity = { version = "0.1.0-alpha.1", features = ["std"] }

bdk = { version = "0.29.0", default-features = false, features = ["std", "async-interface", "use-esplora-async", "sqlite-bundled", "keys-bip39"]}

reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"] }
reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls", "blocking"] }
rusqlite = { version = "0.28.0", features = ["bundled"] }
bitcoin = "0.30.2"
bip39 = "2.0.0"
Expand All @@ -69,6 +69,14 @@ tokio = { version = "1", default-features = false, features = [ "rt-multi-thread
esplora-client = { version = "0.6", default-features = false }
libc = "0.2"
uniffi = { version = "0.26.0", features = ["build"], optional = true }
payjoin = { version = "0.13.0", features = ["receive", "send"] }
http-body-util = "0.1.0"
bitcoincore-rpc = "0.17.0"
ureq = "2.9.1"
hyper = {version = "1.2.0", features = ["http1", "server"]}
rustls = "0.21.9"
bytes = "1.5.0"
hyper-util = {version = "0.1.3", features = ["tokio"] }

[target.'cfg(vss)'.dependencies]
vss-client = "0.2"
Expand Down
11 changes: 11 additions & 0 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::logger::{log_error, FilesystemLogger, Logger};
use crate::message_handler::NodeCustomMessageHandler;
use crate::payment_store::PaymentStore;
use crate::peer_store::PeerStore;
use crate::pj::{PayjoinExecuter, PayjoinScheduler};
use crate::sweep::OutputSweeper;
use crate::tx_broadcaster::TransactionBroadcaster;
use crate::types::{
Expand Down Expand Up @@ -943,6 +944,15 @@ fn build_with_store_internal<K: KVStore + Sync + Send + 'static>(
},
};

let payjoin_executer = Arc::new(PayjoinExecuter::new(
Arc::clone(&wallet),
Arc::clone(&logger),
Arc::clone(&peer_manager),
Arc::clone(&channel_manager),
));

let payjoin_scheduler = Arc::new(PayjoinScheduler::new(Arc::clone(&payjoin_executer)));

let (stop_sender, stop_receiver) = tokio::sync::watch::channel(());

Ok(Node {
Expand All @@ -958,6 +968,7 @@ fn build_with_store_internal<K: KVStore + Sync + Send + 'static>(
channel_manager,
chain_monitor,
output_sweeper,
payjoin_scheduler,
peer_manager,
keys_manager,
network_graph,
Expand Down
6 changes: 6 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ pub(crate) const WALLET_SYNC_INTERVAL_MINIMUM_SECS: u64 = 10;
// The length in bytes of our wallets' keys seed.
pub(crate) const WALLET_KEYS_SEED_LEN: usize = 64;

// Port used by the Payjoin HTTP server.
pub(crate) const DEFAULT_PAYJOIN_HTTP_SERVER_PORT: u16 = 3227;

#[derive(Debug, Clone)]
/// Represents the configuration of an [`Node`] instance.
///
Expand Down Expand Up @@ -104,6 +107,8 @@ pub struct Config {
///
/// Any messages below this level will be excluded from the logs.
pub log_level: LogLevel,
/// Payjoin server port
pub payjoin_server_port: u16,
}

impl Default for Config {
Expand All @@ -120,6 +125,7 @@ impl Default for Config {
trusted_peers_0conf: Vec::new(),
probing_liquidity_limit_multiplier: DEFAULT_PROBING_LIQUIDITY_LIMIT_MULTIPLIER,
log_level: DEFAULT_LOG_LEVEL,
payjoin_server_port: DEFAULT_PAYJOIN_HTTP_SERVER_PORT,
}
}
}
Expand Down
62 changes: 59 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ mod logger;
mod message_handler;
mod payment_store;
mod peer_store;
mod pj;
mod sweep;
mod tx_broadcaster;
mod types;
Expand All @@ -107,6 +108,8 @@ pub use error::Error as NodeError;
use error::Error;

pub use event::Event;
use payjoin::Uri;
use pj::{PayjoinScheduler, PayjoinServer};
pub use types::ChannelConfig;

pub use io::utils::generate_entropy_mnemonic;
Expand Down Expand Up @@ -157,18 +160,20 @@ use lightning_transaction_sync::EsploraSyncClient;
use lightning::routing::router::{PaymentParameters, RouteParameters};
use lightning_invoice::{payment, Bolt11Invoice, Currency};

use bitcoin::hashes::sha256::Hash as Sha256;
use bitcoin::hashes::Hash;
use bitcoin::secp256k1::PublicKey;
use bitcoin::{hashes::sha256::Hash as Sha256, Amount};

use bitcoin::{Address, Txid};

use rand::Rng;

use std::default::Default;
use std::net::ToSocketAddrs;
use std::net::{SocketAddr, ToSocketAddrs};
use std::sync::{Arc, Mutex, RwLock};
use std::time::{Duration, Instant, SystemTime};
use std::{default::Default, str::FromStr};

use crate::pj::ScheduledChannel;

#[cfg(feature = "uniffi")]
uniffi::include_scaffolding!("ldk_node");
Expand All @@ -189,6 +194,7 @@ pub struct Node<K: KVStore + Sync + Send + 'static> {
channel_manager: Arc<ChannelManager<K>>,
chain_monitor: Arc<ChainMonitor<K>>,
output_sweeper: Arc<Sweeper<K>>,
payjoin_scheduler: Arc<PayjoinScheduler<K>>,
peer_manager: Arc<PeerManager<K>>,
keys_manager: Arc<KeysManager>,
network_graph: Arc<NetworkGraph>,
Expand Down Expand Up @@ -462,6 +468,30 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
});
}

let pj_scheduler = Arc::clone(&self.payjoin_scheduler);
let pj_server = PayjoinServer::new(pj_scheduler);
let mut stop_payjoin_server = self.stop_receiver.clone();
let pj_port = self.config.payjoin_server_port;
runtime.spawn(async move {
let addr = SocketAddr::from(([0, 0, 0, 0], pj_port));
let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
loop {
let (stream, _) = match listener.accept().await {
Ok(res) => res,
Err(e) => {
println!("Failed to accept incoming payjoin connection: {}", e);
continue;
},
};
tokio::select! {
_ = stop_payjoin_server.changed() => {
return;
}
_ = pj_server.serve(stream) => {}
}
}
});

// Regularly reconnect to channel peers.
let connect_cm = Arc::clone(&self.channel_manager);
let connect_pm = Arc::clone(&self.peer_manager);
Expand Down Expand Up @@ -675,6 +705,32 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
self.runtime.read().unwrap().is_some()
}

/// Request a new channel to be opened with a remote peer.
pub fn schedule_payjoin_channel(
self, channel_amount_sats: u64, push_msat: Option<u64>, announce_channel: bool,
node_id: PublicKey,
) -> Result<String, Error> {
let address = self.wallet.get_new_address()?;
let user_channel_id: u128 = rand::thread_rng().gen::<u128>();
self.payjoin_scheduler.schedule(ScheduledChannel::new(
channel_amount_sats,
push_msat,
user_channel_id,
announce_channel,
node_id,
));
let amount = Amount::from_sat(channel_amount_sats);
let pj = format!("https://0.0.0.0:{}/payjoin", self.config.payjoin_server_port);
let pj_uri_string = format!("{}?amount={}&pj={}", address.to_qr_uri(), amount.to_btc(), pj);
assert!(Uri::from_str(&pj_uri_string).is_ok());
Ok(pj_uri_string)
}

/// List all scheduled payjoin channels.
pub fn list_scheduled_channels(&self) -> Result<Vec<ScheduledChannel>, Error> {
Ok(self.payjoin_scheduler.list_scheduled_channels())
}

/// Disconnects all peers, stops all running background tasks, and shuts down [`Node`].
///
/// After this returns most API methods will return [`Error::NotRunning`].
Expand Down
126 changes: 126 additions & 0 deletions src/pj.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
#![allow(dead_code)]
use crate::logger::FilesystemLogger;
use crate::types::{ChannelManager, PeerManager, Wallet};
use bitcoin::secp256k1::PublicKey;
use http_body_util::{BodyExt, Full};
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::Request;
use hyper_util::rt::TokioIo;
use lightning::util::persist::KVStore;
use std::sync::{Arc, Mutex};
use tokio::net::TcpStream;
use tokio::task::JoinError;

#[derive(Clone)]
pub struct ScheduledChannel {
channel_amount_sats: u64,
push_msat: Option<u64>,
user_channel_id: u128,
announce_channel: bool,
node: PublicKey,
}

impl ScheduledChannel {
pub fn new(
channel_amount_sats: u64, push_msat: Option<u64>, user_channel_id: u128,
announce_channel: bool, node: PublicKey,
) -> Self {
Self { channel_amount_sats, push_msat, user_channel_id, announce_channel, node }
}
}

#[derive(Clone)]
pub struct PayjoinExecuter<K: KVStore + Sync + Send + 'static> {
wallet: Arc<Wallet>,
logger: Arc<FilesystemLogger>,
peer_manager: Arc<PeerManager<K>>,
channel_manager: Arc<ChannelManager<K>>,
}

impl<K: KVStore + Sync + Send + 'static> PayjoinExecuter<K> {
pub fn new(
wallet: Arc<Wallet>, logger: Arc<FilesystemLogger>, peer_manager: Arc<PeerManager<K>>,
channel_manager: Arc<ChannelManager<K>>,
) -> Self {
Self { wallet, logger, peer_manager, channel_manager }
}

pub fn test(&self) -> String {
let connected_peers = self.peer_manager.get_peer_node_ids();
for (node_id, con_addr_opt) in connected_peers {
println!("Connected to peer: {:?}", node_id);
println!("Connected to peer: {:?}", con_addr_opt);
}
"test".to_string()
}
}

#[derive(Clone)]
pub struct PayjoinScheduler<K: KVStore + Sync + Send + 'static> {
channels: Arc<Mutex<Vec<ScheduledChannel>>>,
executer: Arc<PayjoinExecuter<K>>,
}

impl<K: KVStore + Sync + Send + 'static> PayjoinScheduler<K> {
pub fn new(executer: Arc<PayjoinExecuter<K>>) -> Self {
Self { executer, channels: Arc::new(Mutex::new(Vec::new())) }
}

pub fn schedule(&self, channel: ScheduledChannel) {
self.channels.lock().unwrap().push(channel);
}

pub fn list_scheduled_channels(&self) -> Vec<ScheduledChannel> {
self.channels.lock().unwrap().clone()
}
}

fn make_http_response(s: String) -> Result<hyper::Response<Full<bytes::Bytes>>, hyper::Error> {
Ok(hyper::Response::builder().body(Full::new(bytes::Bytes::from(s))).unwrap())
}

async fn payjoin_handler<K: KVStore + Sync + Send + 'static>(
http_request: Request<hyper::body::Incoming>, pj_scheduler: Arc<PayjoinScheduler<K>>,
) -> Result<hyper::Response<Full<bytes::Bytes>>, hyper::Error> {
match (http_request.method(), http_request.uri().path()) {
(&hyper::Method::POST, "/payjoin") => {
let _headers = http_request.headers().clone();
let body = http_request.into_body().collect().await?;
let body = String::from_utf8(body.to_bytes().to_vec()).unwrap();
let res = pj_scheduler.executer.test();
assert_eq!(&res, "test");
return make_http_response(body);
},
_ => make_http_response("404".into()),
}
}

pub struct PayjoinServer<K: KVStore + Sync + Send + 'static> {
scheduler: Arc<PayjoinScheduler<K>>,
}

impl<K: KVStore + Sync + Send + 'static> PayjoinServer<K> {
pub fn new(scheduler: Arc<PayjoinScheduler<K>>) -> Self {
Self { scheduler }
}

pub async fn serve(&self, stream: TcpStream) -> Result<(), JoinError> {
let io = TokioIo::new(stream);
let state = self.scheduler.clone();
tokio::task::spawn(async move {
if let Err(err) = http1::Builder::new()
.serve_connection(
io,
service_fn(move |http_request| payjoin_handler(http_request, state.clone())),
)
.await
{
println!("Error serving connection: {:?}", err);
} else {
println!("Connection here");
}
})
.await
}
}
12 changes: 12 additions & 0 deletions src/wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::logger::{log_error, log_info, log_trace, Logger};

use crate::Error;

use bitcoin::psbt::Psbt;
use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator};

use lightning::ln::msgs::{DecodeError, UnsignedGossipMessage};
Expand Down Expand Up @@ -171,6 +172,17 @@ where
Ok(self.inner.lock().unwrap().get_balance()?)
}

pub(crate) fn _list_unspent(&self) -> Result<Vec<bdk::LocalUtxo>, Error> {
Ok(self.inner.lock().unwrap().list_unspent()?)
}

pub(crate) fn _wallet_process_psbt(&self, psbt: &Psbt) -> Result<Psbt, Error> {
let wallet = self.inner.lock().unwrap();
let mut psbt = psbt.clone();
wallet.sign(&mut psbt, SignOptions::default())?;
Ok(psbt)
}

/// Send funds to the given address.
///
/// If `amount_msat_or_drain` is `None` the wallet will be drained, i.e., all available funds will be
Expand Down
1 change: 1 addition & 0 deletions tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ pub(crate) fn random_config() -> Config {
config.listening_addresses = Some(rand_listening_addresses);

config.log_level = LogLevel::Gossip;
config.payjoin_server_port = random_port();

config
}
Expand Down
Loading

0 comments on commit 0c214d9

Please sign in to comment.