Do DB insertions in parallel #71

Merged
merged 1 commit on Feb 6, 2024
127 changes: 87 additions & 40 deletions src/persistence.rs
@@ -1,18 +1,20 @@
use std::fs::OpenOptions;
use std::io::{BufWriter, Write};
use std::ops::Deref;
+use std::mem;
use std::sync::Arc;
use std::time::{Duration, Instant};
use lightning::log_info;
use lightning::routing::gossip::NetworkGraph;
use lightning::util::logger::Logger;
use lightning::util::ser::Writeable;
-use tokio::sync::mpsc;
+use tokio::sync::{mpsc, Mutex, Semaphore};

use crate::config;
use crate::types::GossipMessage;

const POSTGRES_INSERT_TIMEOUT: Duration = Duration::from_secs(15);
+const INSERT_PARALELLISM: usize = 16;

pub(crate) struct GossipPersister<L: Deref> where L::Target: Logger {
gossip_persistence_receiver: mpsc::Receiver<GossipMessage>,
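The core pattern the new constant enables: a tokio::sync::Semaphore with INSERT_PARALELLISM permits caps how many insert tasks are in flight, while a shared Vec behind a Mutex recycles database connections between tasks. A minimal standalone sketch of the permit flow used below, with illustrative counts and a println in place of the actual insert:

    use std::sync::Arc;
    use tokio::sync::Semaphore;

    #[tokio::main]
    async fn main() {
        let limiter = Arc::new(Semaphore::new(4)); // at most 4 tasks in flight
        let mut handles = Vec::new();
        for i in 0..16 {
            // Blocks once all permits are out, applying backpressure to the loop.
            // forget() detaches the permit so dropping the guard won't return it.
            limiter.acquire().await.unwrap().forget();
            let limiter_ref = Arc::clone(&limiter);
            handles.push(tokio::spawn(async move {
                println!("task {i} running"); // stand-in for the DB insert
                limiter_ref.add_permits(1); // hand the permit back explicitly
            }));
        }
        for handle in handles {
            handle.await.unwrap();
        }
    }

Forgetting the permit and re-adding it manually is what lets the permit outlive the scope of its RAII guard and travel with the spawned task.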
@@ -93,10 +95,14 @@ impl<L: Deref> GossipPersister<L> where L::Target: Logger {
let mut latest_persistence_log = Instant::now() - Duration::from_secs(60);
let mut i = 0u32;
let mut latest_graph_cache_time = Instant::now();
+let insert_limiter = Arc::new(Semaphore::new(INSERT_PARALELLISM));
+let connections_cache = Arc::new(Mutex::new(Vec::with_capacity(INSERT_PARALELLISM)));
+#[cfg(test)]
+let mut tasks_spawned = Vec::new();
// TODO: it would be nice to have some sort of timeout here so after 10 seconds of
// inactivity, some sort of message could be broadcast signaling the activation of request
// processing
-while let Some(gossip_message) = &self.gossip_persistence_receiver.recv().await {
+while let Some(gossip_message) = self.gossip_persistence_receiver.recv().await {
i += 1; // count the persisted gossip messages

if latest_persistence_log.elapsed().as_secs() >= 60 {
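A subtle but necessary change in this hunk: recv() is now bound by value rather than through a reference, because each message must be moved into a 'static spawned task further down. A toy sketch of why ownership matters here (channel payload and handler are illustrative):

    use tokio::sync::mpsc;

    #[tokio::main]
    async fn main() {
        let (tx, mut rx) = mpsc::channel::<String>(8);
        tokio::spawn(async move {
            for n in 0..3 {
                tx.send(format!("gossip {n}")).await.unwrap();
            }
        });
        // Binding msg by value lets it move into the spawned task; a borrow
        // of the received Option's contents could not outlive the loop body.
        while let Some(msg) = rx.recv().await {
            // Awaited inline only to keep this example deterministic.
            tokio::spawn(async move {
                println!("persisting {msg}");
            }).await.unwrap();
        }
    }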
@@ -109,36 +115,56 @@
self.persist_network_graph();
latest_graph_cache_time = Instant::now();
}
+insert_limiter.acquire().await.unwrap().forget();

-match &gossip_message {
+let limiter_ref = Arc::clone(&insert_limiter);
+let connections_cache_ref = Arc::clone(&connections_cache);
+match gossip_message {
GossipMessage::ChannelAnnouncement(announcement, seen_override) => {
let scid = announcement.contents.short_channel_id as i64;

// start with the type prefix, which is already known a priori
let mut announcement_signed = Vec::new();
announcement.write(&mut announcement_signed).unwrap();

-if cfg!(test) && seen_override.is_some() {
-tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client
-.execute("INSERT INTO channel_announcements (\
-short_channel_id, \
-announcement_signed, \
-seen \
-) VALUES ($1, $2, TO_TIMESTAMP($3)) ON CONFLICT (short_channel_id) DO NOTHING", &[
-&scid,
-&announcement_signed,
-&(seen_override.unwrap() as f64)
-])).await.unwrap().unwrap();
-} else {
-tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client
-.execute("INSERT INTO channel_announcements (\
-short_channel_id, \
-announcement_signed \
-) VALUES ($1, $2) ON CONFLICT (short_channel_id) DO NOTHING", &[
-&scid,
-&announcement_signed
-])).await.unwrap().unwrap();
-}
+let _task = tokio::spawn(async move {
+let client;
+{
+let mut connections_set = connections_cache_ref.lock().await;
+if connections_set.is_empty() {
+mem::drop(connections_set);
+client = crate::connect_to_db().await;
+} else {
+client = connections_set.pop().unwrap();
+}
+}
+if cfg!(test) && seen_override.is_some() {
+tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client
+.execute("INSERT INTO channel_announcements (\
+short_channel_id, \
+announcement_signed, \
+seen \
+) VALUES ($1, $2, TO_TIMESTAMP($3)) ON CONFLICT (short_channel_id) DO NOTHING", &[
+&scid,
+&announcement_signed,
+&(seen_override.unwrap() as f64)
+])).await.unwrap().unwrap();
+} else {
+tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client
+.execute("INSERT INTO channel_announcements (\
+short_channel_id, \
+announcement_signed \
+) VALUES ($1, $2) ON CONFLICT (short_channel_id) DO NOTHING", &[
+&scid,
+&announcement_signed
+])).await.unwrap().unwrap();
+}
+let mut connections_set = connections_cache_ref.lock().await;
+connections_set.push(client);
+limiter_ref.add_permits(1);
+});
+#[cfg(test)]
+tasks_spawned.push(_task);
}
GossipMessage::ChannelUpdate(update, seen_override) => {
let scid = update.contents.short_channel_id as i64;
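The connection handling inside the spawned task above, repeated in the ChannelUpdate arm below, is a hand-rolled pool: pop a cached client if one is available, otherwise dial a fresh connection, then push the client back once the statement finishes. Note that the Mutex guard is dropped before the slow connect call, so the cache is never locked while connecting. A sketch of that checkout/checkin pattern with hypothetical stand-ins (DbClient and connect_to_db are placeholders for the crate's actual client type and connector):

    use std::sync::Arc;
    use tokio::sync::Mutex;

    struct DbClient; // hypothetical stand-in for the real Postgres client
    async fn connect_to_db() -> DbClient { DbClient }

    async fn checkout(cache: &Arc<Mutex<Vec<DbClient>>>) -> DbClient {
        // Lock only long enough to pop; never hold the guard across connect().
        let cached = cache.lock().await.pop();
        match cached {
            Some(client) => client,
            None => connect_to_db().await,
        }
    }

    async fn checkin(cache: &Arc<Mutex<Vec<DbClient>>>, client: DbClient) {
        cache.lock().await.push(client); // make the connection reusable
    }

    #[tokio::main]
    async fn main() {
        let cache = Arc::new(Mutex::new(Vec::with_capacity(16)));
        let client = checkout(&cache).await;
        // ... run the insert with this client here ...
        checkin(&cache, client).await;
    }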
@@ -193,25 +219,46 @@
// this may not be used outside test cfg
let _seen_timestamp = seen_override.unwrap_or(timestamp as u32) as f64;

-tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client
-.execute(insertion_statement, &[
-&scid,
-&timestamp,
-#[cfg(test)]
-&_seen_timestamp,
-&(update.contents.flags as i16),
-&direction,
-&disable,
-&cltv_expiry_delta,
-&htlc_minimum_msat,
-&fee_base_msat,
-&fee_proportional_millionths,
-&htlc_maximum_msat,
-&update_signed
-])).await.unwrap().unwrap();
+let _task = tokio::spawn(async move {
+let client;
+{
+let mut connections_set = connections_cache_ref.lock().await;
+if connections_set.is_empty() {
+mem::drop(connections_set);
+client = crate::connect_to_db().await;
+} else {
+client = connections_set.pop().unwrap();
+}
+}
+tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client
+.execute(insertion_statement, &[
+&scid,
+&timestamp,
+#[cfg(test)]
+&_seen_timestamp,
+&(update.contents.flags as i16),
+&direction,
+&disable,
+&cltv_expiry_delta,
+&htlc_minimum_msat,
+&fee_base_msat,
+&fee_proportional_millionths,
+&htlc_maximum_msat,
+&update_signed
+])).await.unwrap().unwrap();
+let mut connections_set = connections_cache_ref.lock().await;
+connections_set.push(client);
+limiter_ref.add_permits(1);
+});
+#[cfg(test)]
+tasks_spawned.push(_task);
}
}
}
+#[cfg(test)]
+for task in tasks_spawned {
+task.await;
+}
}

fn persist_network_graph(&self) {
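Because the inserts are now detached tasks, the test build collects every JoinHandle and awaits them before the persistence loop's function returns, so a test that reads the database afterwards cannot race the in-flight writes. A minimal illustration of that shutdown discipline (task bodies are placeholders):

    #[tokio::main]
    async fn main() {
        let mut tasks_spawned = Vec::new();
        for n in 0..4 {
            tasks_spawned.push(tokio::spawn(async move {
                println!("insert {n} committed"); // stand-in for a DB insert
            }));
        }
        // Await every handle so all writes land before any assertion
        // (or process exit) runs.
        for task in tasks_spawned {
            task.await.unwrap();
        }
    }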