Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a funding_value field in channel_announcements #86

Merged
merged 2 commits into from
Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 39 additions & 2 deletions src/config.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use crate::hex_utils;
use crate::verifier::ChainVerifier;

use std::env;
use std::net::{SocketAddr, ToSocketAddrs};
use std::ops::Deref;
use std::sync::Arc;
use std::time::Duration;

use bitcoin::io::Cursor;
Expand All @@ -10,11 +13,15 @@ use bitcoin::hashes::hex::FromHex;
use bitcoin::secp256k1::PublicKey;
use futures::stream::{FuturesUnordered, StreamExt};
use lightning::ln::msgs::ChannelAnnouncement;
use lightning::util::logger::Logger;
use lightning::util::ser::Readable;
use lightning_block_sync::http::HttpEndpoint;
use lightning_block_sync::rest::RestClient;
use tokio_postgres::Config;

pub(crate) const SCHEMA_VERSION: i32 = 14;
use tokio::sync::Semaphore;

pub(crate) const SCHEMA_VERSION: i32 = 15;
pub(crate) const SYMLINK_GRANULARITY_INTERVAL: u32 = 3600 * 3; // three hours
pub(crate) const MAX_SNAPSHOT_SCOPE: u32 = 3600 * 24 * 21; // three weeks
// generate symlinks based on a 3-hour-granularity
Expand Down Expand Up @@ -120,6 +127,7 @@ pub(crate) fn db_announcement_table_creation_query() -> &'static str {
"CREATE TABLE IF NOT EXISTS channel_announcements (
id SERIAL PRIMARY KEY,
short_channel_id bigint NOT NULL UNIQUE,
funding_amount_sats bigint NOT NULL,
announcement_signed BYTEA,
seen timestamp NOT NULL DEFAULT NOW()
)"
Expand Down Expand Up @@ -167,7 +175,9 @@ pub(crate) fn db_index_creation_query() -> &'static str {
"
}

pub(crate) async fn upgrade_db(schema: i32, client: &mut tokio_postgres::Client) {
pub(crate) async fn upgrade_db<L: Deref + Clone + Send + Sync + 'static>(
schema: i32, client: &mut tokio_postgres::Client, logger: L,
) where L::Target: Logger {
if schema == 1 {
let tx = client.transaction().await.unwrap();
tx.execute("ALTER TABLE channel_updates DROP COLUMN chain_hash", &[]).await.unwrap();
Expand Down Expand Up @@ -313,6 +323,33 @@ pub(crate) async fn upgrade_db(schema: i32, client: &mut tokio_postgres::Client)
tx.execute("UPDATE config SET db_schema = 14 WHERE id = 1", &[]).await.unwrap();
tx.commit().await.unwrap();
}
if schema >= 1 && schema <= 14 {
println!("Upgrading to schema 15 requiring UTXO lookups for each historical channel announcement. This may take some time");
// Note that we don't bother doing this one in a transaction, and as such need to support
// resuming on a crash.
let _ = client.execute("ALTER TABLE channel_announcements ADD COLUMN funding_amount_sats bigint DEFAULT null", &[]).await;
tokio::spawn(async move {
let client = crate::connect_to_db().await;
let mut scids = Box::pin(client.query_raw("SELECT DISTINCT ON (short_channel_id) short_channel_id FROM channel_announcements WHERE funding_amount_sats IS NULL;", &[0i64][1..]).await.unwrap());
let sem = Arc::new(Semaphore::new(16));
while let Some(scid_res) = scids.next().await {
let scid: i64 = scid_res.unwrap().get(0);
let permit = Arc::clone(&sem).acquire_owned().await.unwrap();
let logger = logger.clone();
tokio::spawn(async move {
let rest_client = Arc::new(RestClient::new(bitcoin_rest_endpoint()).unwrap());
let txo = ChainVerifier::retrieve_txo(rest_client, scid as u64, logger).await
.expect("We shouldn't have accepted a channel announce with a bad TXO");
let client = crate::connect_to_db().await;
client.execute("UPDATE channel_announcements SET funding_amount_sats = $1 WHERE short_channel_id = $2", &[&(txo.value.to_sat() as i64), &scid]).await.unwrap();
std::mem::drop(permit);
});
}
let _all_updates_complete = sem.acquire_many(16).await.unwrap();
client.execute("ALTER TABLE channel_announcements ALTER funding_amount_sats SET NOT NULL", &[]).await.unwrap();
client.execute("UPDATE config SET db_schema = 15 WHERE id = 1", &[]).await.unwrap();
});
}
if schema <= 1 || schema > SCHEMA_VERSION {
panic!("Unknown schema in db: {}, we support up to {}", schema, SCHEMA_VERSION);
}
Expand Down
11 changes: 10 additions & 1 deletion src/downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,16 @@ impl<L: Deref + Clone + Send + Sync> GossipRouter<L> where L::Target: Logger {
counter.channel_announcements += 1;
}

let gossip_message = GossipMessage::ChannelAnnouncement(msg, None);
let mut funding_amount_sats = self.verifier.get_cached_funding_value(msg.contents.short_channel_id);
if funding_amount_sats.is_none() {
tokio::task::block_in_place(move || { tokio::runtime::Handle::current().block_on(async {
funding_amount_sats = self.verifier.retrieve_funding_value(msg.contents.short_channel_id).await.ok();
})});
}
let funding_amount_sats = funding_amount_sats
.expect("If we've accepted a ChannelAnnouncement, we must be able to fetch the TXO for it");

let gossip_message = GossipMessage::ChannelAnnouncement(msg, funding_amount_sats, None);
if let Err(err) = self.sender.try_send(gossip_message) {
let gossip_message = match err { TrySendError::Full(msg)|TrySendError::Closed(msg) => msg };
tokio::task::block_in_place(move || { tokio::runtime::Handle::current().block_on(async move {
Expand Down
14 changes: 9 additions & 5 deletions src/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub(crate) struct GossipPersister<L: Deref> where L::Target: Logger {
logger: L
}

impl<L: Deref> GossipPersister<L> where L::Target: Logger {
impl<L: Deref + Clone + Send + Sync + 'static> GossipPersister<L> where L::Target: Logger {
pub fn new(network_graph: Arc<NetworkGraph<L>>, logger: L) -> (Self, mpsc::Sender<GossipMessage>) {
let (gossip_persistence_sender, gossip_persistence_receiver) =
mpsc::channel::<GossipMessage>(100);
Expand All @@ -50,7 +50,7 @@ impl<L: Deref> GossipPersister<L> where L::Target: Logger {

let cur_schema = client.query("SELECT db_schema FROM config WHERE id = $1", &[&1]).await.unwrap();
if !cur_schema.is_empty() {
config::upgrade_db(cur_schema[0].get(0), &mut client).await;
config::upgrade_db(cur_schema[0].get(0), &mut client, self.logger.clone()).await;
}

let preparation = client.execute("set time zone UTC", &[]).await;
Expand Down Expand Up @@ -185,7 +185,7 @@ impl<L: Deref> GossipPersister<L> where L::Target: Logger {
#[cfg(test)]
tasks_spawned.push(_task);
},
GossipMessage::ChannelAnnouncement(announcement, seen_override) => {
GossipMessage::ChannelAnnouncement(announcement, funding_value, seen_override) => {
let scid = announcement.contents.short_channel_id as i64;

// start with the type prefix, which is already known a priori
Expand All @@ -197,20 +197,24 @@ impl<L: Deref> GossipPersister<L> where L::Target: Logger {
tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client
.execute("INSERT INTO channel_announcements (\
short_channel_id, \
funding_amount_sats, \
announcement_signed, \
seen \
) VALUES ($1, $2, TO_TIMESTAMP($3)) ON CONFLICT (short_channel_id) DO NOTHING", &[
) VALUES ($1, $2, $3, TO_TIMESTAMP($4)) ON CONFLICT (short_channel_id) DO NOTHING", &[
&scid,
&(funding_value as i64),
&announcement_signed,
&(seen_override.unwrap() as f64)
])).await.unwrap().unwrap();
} else {
tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client
.execute("INSERT INTO channel_announcements (\
short_channel_id, \
funding_amount_sats, \
announcement_signed \
) VALUES ($1, $2) ON CONFLICT (short_channel_id) DO NOTHING", &[
) VALUES ($1, $2, $3) ON CONFLICT (short_channel_id) DO NOTHING", &[
&scid,
&(funding_value as i64),
&announcement_signed
])).await.unwrap().unwrap();
}
Expand Down
26 changes: 13 additions & 13 deletions src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ async fn test_trivial_setup() {
network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap();
network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap();

receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
receiver.send(GossipMessage::ChannelAnnouncement(announcement, 100, None)).await.unwrap();
receiver.send(GossipMessage::ChannelUpdate(update_1, None)).await.unwrap();
receiver.send(GossipMessage::ChannelUpdate(update_2, None)).await.unwrap();
drop(receiver);
Expand Down Expand Up @@ -357,7 +357,7 @@ async fn test_node_announcement_delta_detection() {
network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap();
network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap();

receiver.send(GossipMessage::ChannelAnnouncement(announcement, Some(timestamp))).await.unwrap();
receiver.send(GossipMessage::ChannelAnnouncement(announcement, 100, Some(timestamp))).await.unwrap();
receiver.send(GossipMessage::ChannelUpdate(update_1, Some(timestamp))).await.unwrap();
receiver.send(GossipMessage::ChannelUpdate(update_2, Some(timestamp))).await.unwrap();
}
Expand Down Expand Up @@ -450,7 +450,7 @@ async fn test_unidirectional_intermediate_update_consideration() {
network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap();
network_graph_arc.update_channel_unsigned(&update_3.contents).unwrap();

receiver.send(GossipMessage::ChannelAnnouncement(announcement, Some(timestamp))).await.unwrap();
receiver.send(GossipMessage::ChannelAnnouncement(announcement, 100, Some(timestamp))).await.unwrap();
receiver.send(GossipMessage::ChannelUpdate(update_1, None)).await.unwrap();
receiver.send(GossipMessage::ChannelUpdate(update_2, None)).await.unwrap();
receiver.send(GossipMessage::ChannelUpdate(update_3, None)).await.unwrap();
Expand Down Expand Up @@ -521,7 +521,7 @@ async fn test_bidirectional_intermediate_update_consideration() {
network_graph_arc.update_channel_unsigned(&update_3.contents).unwrap();
network_graph_arc.update_channel_unsigned(&update_4.contents).unwrap();

receiver.send(GossipMessage::ChannelAnnouncement(announcement, Some(timestamp))).await.unwrap();
receiver.send(GossipMessage::ChannelAnnouncement(announcement, 100, Some(timestamp))).await.unwrap();
receiver.send(GossipMessage::ChannelUpdate(update_1, None)).await.unwrap();
receiver.send(GossipMessage::ChannelUpdate(update_2, None)).await.unwrap();
receiver.send(GossipMessage::ChannelUpdate(update_3, None)).await.unwrap();
Expand Down Expand Up @@ -577,7 +577,7 @@ async fn test_channel_reminders() {
network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap();
network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap();

receiver.send(GossipMessage::ChannelAnnouncement(announcement, Some(timestamp - channel_reminder_delta - 1))).await.unwrap();
receiver.send(GossipMessage::ChannelAnnouncement(announcement, 100, Some(timestamp - channel_reminder_delta - 1))).await.unwrap();
receiver.send(GossipMessage::ChannelUpdate(update_1, Some(timestamp - channel_reminder_delta - 1))).await.unwrap();
receiver.send(GossipMessage::ChannelUpdate(update_2, Some(timestamp - channel_reminder_delta - 1))).await.unwrap();
}
Expand All @@ -598,7 +598,7 @@ async fn test_channel_reminders() {
network_graph_arc.update_channel_unsigned(&update_7.contents).unwrap();
network_graph_arc.update_channel_unsigned(&update_8.contents).unwrap();

receiver.send(GossipMessage::ChannelAnnouncement(announcement, Some(timestamp - channel_reminder_delta - 1))).await.unwrap();
receiver.send(GossipMessage::ChannelAnnouncement(announcement, 100, Some(timestamp - channel_reminder_delta - 1))).await.unwrap();
receiver.send(GossipMessage::ChannelUpdate(update_1, Some(timestamp - channel_reminder_delta - 10))).await.unwrap();
receiver.send(GossipMessage::ChannelUpdate(update_2, Some(timestamp - channel_reminder_delta - 5))).await.unwrap();
receiver.send(GossipMessage::ChannelUpdate(update_3, Some(timestamp - channel_reminder_delta - 1))).await.unwrap();
Expand Down Expand Up @@ -653,7 +653,7 @@ async fn test_full_snapshot_recency() {
let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
let announcement = generate_channel_announcement(short_channel_id);
network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
receiver.send(GossipMessage::ChannelAnnouncement(announcement, 100, None)).await.unwrap();

{ // direction false
{ // first update
Expand Down Expand Up @@ -734,7 +734,7 @@ async fn test_full_snapshot_recency_with_wrong_seen_order() {
let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
let announcement = generate_channel_announcement(short_channel_id);
network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
receiver.send(GossipMessage::ChannelAnnouncement(announcement, 100, None)).await.unwrap();

{ // direction false
{ // first update, seen latest
Expand Down Expand Up @@ -815,7 +815,7 @@ async fn test_full_snapshot_recency_with_wrong_propagation_order() {
let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
let announcement = generate_channel_announcement(short_channel_id);
network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
receiver.send(GossipMessage::ChannelAnnouncement(announcement, 100, None)).await.unwrap();

{ // direction false
// apply updates in their timestamp order
Expand Down Expand Up @@ -898,7 +898,7 @@ async fn test_full_snapshot_mutiny_scenario() {
let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
let announcement = generate_channel_announcement(short_channel_id);
network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
receiver.send(GossipMessage::ChannelAnnouncement(announcement, 100, None)).await.unwrap();

{ // direction false
{
Expand Down Expand Up @@ -1036,13 +1036,13 @@ async fn test_full_snapshot_interlaced_channel_timestamps() {
{ // main channel
let announcement = generate_channel_announcement(main_channel_id);
network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
receiver.send(GossipMessage::ChannelAnnouncement(announcement, 100, None)).await.unwrap();
}

{ // secondary channel
let announcement = generate_channel_announcement(secondary_channel_id);
network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
receiver.send(GossipMessage::ChannelAnnouncement(announcement, 100, None)).await.unwrap();
}

{ // main channel
Expand Down Expand Up @@ -1145,7 +1145,7 @@ async fn test_full_snapshot_persistence() {
let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
let announcement = generate_channel_announcement(short_channel_id);
network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
receiver.send(GossipMessage::ChannelAnnouncement(announcement, 100, None)).await.unwrap();

{ // direction true
let update = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 0, 10);
Expand Down
2 changes: 1 addition & 1 deletion src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub(crate) type GossipPeerManager<L> = Arc<PeerManager<lightning_net_tokio::Sock
pub(crate) enum GossipMessage {
NodeAnnouncement(NodeAnnouncement, Option<u32>),
// the second element is an optional override for the seen value
ChannelAnnouncement(ChannelAnnouncement, Option<u32>),
ChannelAnnouncement(ChannelAnnouncement, u64, Option<u32>),
ChannelUpdate(ChannelUpdate, Option<u32>),
}

Expand Down
31 changes: 27 additions & 4 deletions src/verifier.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::HashMap;
use std::io::ErrorKind;
use std::ops::Deref;
use std::sync::Arc;
Expand All @@ -23,6 +24,9 @@ pub(crate) struct ChainVerifier<L: Deref + Clone + Send + Sync + 'static> where
graph: Arc<NetworkGraph<L>>,
outbound_gossiper: Arc<P2PGossipSync<Arc<NetworkGraph<L>>, Arc<Self>, L>>,
peer_handler: Mutex<Option<GossipPeerManager<L>>>,
/// A cache on the funding amounts for each channel that we've looked up, mapping from SCID to
/// funding satoshis.
channel_funding_amounts: Arc<Mutex<HashMap<u64, u64>>>,
logger: L
}

Expand All @@ -35,14 +39,28 @@ impl<L: Deref + Clone + Send + Sync + 'static> ChainVerifier<L> where L::Target:
outbound_gossiper,
graph,
peer_handler: Mutex::new(None),
logger
channel_funding_amounts: Arc::new(Mutex::new(HashMap::new())),
logger,
}
}
pub(crate) fn set_ph(&self, peer_handler: GossipPeerManager<L>) {
*self.peer_handler.lock().unwrap() = Some(peer_handler);
}

async fn retrieve_utxo(client: Arc<RestClient>, short_channel_id: u64, logger: L) -> Result<TxOut, UtxoLookupError> {
pub(crate) fn get_cached_funding_value(&self, scid: u64) -> Option<u64> {
self.channel_funding_amounts.lock().unwrap().get(&scid).map(|v| *v)
}

pub(crate) async fn retrieve_funding_value(&self, scid: u64) -> Result<u64, UtxoLookupError> {
Self::retrieve_cache_txo(Arc::clone(&self.rest_client), Some(Arc::clone(&self.channel_funding_amounts)), scid, self.logger.clone())
.await.map(|txo| txo.value.to_sat())
}

pub(crate) async fn retrieve_txo(client: Arc<RestClient>, short_channel_id: u64, logger: L) -> Result<TxOut, UtxoLookupError> {
Self::retrieve_cache_txo(client, None, short_channel_id, logger).await
}

async fn retrieve_cache_txo(client: Arc<RestClient>, channel_funding_amounts: Option<Arc<Mutex<HashMap<u64, u64>>>>, short_channel_id: u64, logger: L) -> Result<TxOut, UtxoLookupError> {
let block_height = (short_channel_id >> 5 * 8) as u32; // block height is most significant three bytes
let transaction_index = ((short_channel_id >> 2 * 8) & 0xffffff) as u32;
let output_index = (short_channel_id & 0xffff) as u16;
Expand All @@ -57,7 +75,11 @@ impl<L: Deref + Clone + Send + Sync + 'static> ChainVerifier<L> where L::Target:
log_error!(logger, "Could't find output {} in transaction {}", output_index, transaction.compute_txid());
return Err(UtxoLookupError::UnknownTx);
}
Ok(transaction.output.swap_remove(output_index as usize))
let txo = transaction.output.swap_remove(output_index as usize);
if let Some(channel_funding_amounts) = channel_funding_amounts {
channel_funding_amounts.lock().unwrap().insert(short_channel_id, txo.value.to_sat());
}
Ok(txo)
}

async fn retrieve_block(client: Arc<RestClient>, block_height: u32, logger: L) -> Result<Block, UtxoLookupError> {
Expand Down Expand Up @@ -99,10 +121,11 @@ impl<L: Deref + Clone + Send + Sync + 'static> UtxoLookup for ChainVerifier<L> w
let graph_ref = Arc::clone(&self.graph);
let client_ref = Arc::clone(&self.rest_client);
let gossip_ref = Arc::clone(&self.outbound_gossiper);
let channel_funding_amounts_cache_ref = Arc::clone(&self.channel_funding_amounts);
let pm_ref = self.peer_handler.lock().unwrap().clone();
let logger_ref = self.logger.clone();
tokio::spawn(async move {
let res = Self::retrieve_utxo(client_ref, short_channel_id, logger_ref).await;
let res = Self::retrieve_cache_txo(client_ref, Some(channel_funding_amounts_cache_ref), short_channel_id, logger_ref).await;
fut.resolve(&*graph_ref, &*gossip_ref, res);
if let Some(pm) = pm_ref { pm.process_events(); }
});
Expand Down
Loading