diff --git a/Cargo.lock b/Cargo.lock index 0dde48cb..86ba6163 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -86,6 +86,17 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d92bec98840b8f03a5ff5413de5293bfcd8bf96467cf5452609f939ec6f5de16" +[[package]] +name = "async-recursion" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fd55a5ba1179988837d24ab4c7cc8ed6efdeff578ede0416b4225a5fca35bd0" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.31", +] + [[package]] name = "async-stream" version = "0.3.5" @@ -277,6 +288,15 @@ version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b4682ae6287fcf752ecaabbfcc7b6f9b72aa33933dc23a554d853aea8eea8635" +[[package]] +name = "block-buffer" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71" +dependencies = [ + "generic-array", +] + [[package]] name = "bumpalo" version = "3.13.0" @@ -424,6 +444,15 @@ dependencies = [ "toml", ] +[[package]] +name = "cpufeatures" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53fe5e26ff1b7aef8bca9c6080520cfb8d9333c7568e1829cef191a9723e5504" +dependencies = [ + "libc", +] + [[package]] name = "crc" version = "3.0.1" @@ -506,6 +535,16 @@ dependencies = [ "cfg-if 1.0.0", ] +[[package]] +name = "crypto-common" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3" +dependencies = [ + "generic-array", + "typenum", +] + [[package]] name = "cryptoxide" version = "0.4.4" @@ -518,10 +557,21 @@ version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f2696e8a945f658fd14dc3b87242e6b80cd0f36ff04ea560fa39082368847946" +[[package]] +name = "digest" +version = "0.10.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" +dependencies = [ + "block-buffer", + "crypto-common", +] + [[package]] name = "dolos" version = "0.4.1" dependencies = [ + "async-recursion", "async-stream", "async-trait", "bech32 0.8.1", @@ -707,6 +757,16 @@ dependencies = [ "syn 2.0.31", ] +[[package]] +name = "generic-array" +version = "0.14.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a" +dependencies = [ + "typenum", + "version_check", +] + [[package]] name = "getrandom" version = "0.2.10" @@ -965,6 +1025,15 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "keccak" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f6d5ed8676d904364de097082f4e7d240b571b67989ced0240f08b7f966f940" +dependencies = [ + "cpufeatures", +] + [[package]] name = "lazy_static" version = "1.4.0" @@ -979,9 +1048,9 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" [[package]] name = "libc" -version = "0.2.147" +version = "0.2.151" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4668fb0ea861c1df094127ac5f1da3409a82116a4ba74fca2e58ef927159bb3" +checksum = "302d7ab3130588088d277783b1e2d2e10c9e9e4a16dd9050e6ec93fb3e7048f4" [[package]] name = "libloading" @@ -1107,9 +1176,9 @@ checksum = "5ef3a5eb0af5d357a7e44287d7ddd094f47de68cda7086c4917578f62e4294df" [[package]] name = "minicbor" -version = "0.19.1" +version = "0.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d7005aaf257a59ff4de471a9d5538ec868a21586534fff7f85dd97d4043a6139" +checksum = "9d15f4203d71fdf90903c2696e55426ac97a363c67b218488a73b534ce7aca10" dependencies = [ "half", "minicbor-derive", @@ -1237,12 +1306,13 @@ checksum = "c1b04fb49957986fdce4d6ee7a65027d55d4b6d2265e5848bbb507b58ccfdb6f" [[package]] name = "pallas" -version = "0.19.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad974cd8d2a275cefb78069ffece5f7a9b7144056d990624df5459c93eff0376" +version = "0.21.0" +source = "git+https://github.com/txpipe/pallas?rev=00575b4c550cf042e354922699be23d420c8a813#00575b4c550cf042e354922699be23d420c8a813" dependencies = [ "pallas-addresses", + "pallas-applying", "pallas-codec", + "pallas-configs", "pallas-crypto", "pallas-network", "pallas-primitives", @@ -1252,9 +1322,8 @@ dependencies = [ [[package]] name = "pallas-addresses" -version = "0.19.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50c4a7dfa19aa0df0214696c5f7987c155598e7b88ae766ebe2356ba9cec22b3" +version = "0.21.0" +source = "git+https://github.com/txpipe/pallas?rev=00575b4c550cf042e354922699be23d420c8a813#00575b4c550cf042e354922699be23d420c8a813" dependencies = [ "base58", "bech32 0.9.1", @@ -1262,25 +1331,52 @@ dependencies = [ "hex", "pallas-codec", "pallas-crypto", + "sha3", "thiserror", ] +[[package]] +name = "pallas-applying" +version = "0.21.0" +source = "git+https://github.com/txpipe/pallas?rev=00575b4c550cf042e354922699be23d420c8a813#00575b4c550cf042e354922699be23d420c8a813" +dependencies = [ + "pallas-addresses", + "pallas-codec", + "pallas-crypto", + "pallas-primitives", + "pallas-traverse", + "rand", +] + [[package]] name = "pallas-codec" -version = "0.19.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9af0e4ba3d3a14270af462220c9a2c64fd81667565bad8969e5d0b7783d0c27a" +version = "0.21.0" +source = "git+https://github.com/txpipe/pallas?rev=00575b4c550cf042e354922699be23d420c8a813#00575b4c550cf042e354922699be23d420c8a813" dependencies = [ "hex", - "minicbor 0.19.1", + "minicbor 0.20.0", "serde", + "thiserror", +] + +[[package]] +name = "pallas-configs" +version = "0.21.0" +source = "git+https://github.com/txpipe/pallas?rev=00575b4c550cf042e354922699be23d420c8a813#00575b4c550cf042e354922699be23d420c8a813" +dependencies = [ + "base64 0.21.4", + "hex", + "pallas-addresses", + "pallas-codec", + "pallas-crypto", + "serde", + "serde_json", ] [[package]] name = "pallas-crypto" -version = "0.19.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0aa8d624bde287a0eccf62cc5fbc5b391adb237cfb14fbe884b0bcfff11d1c8" +version = "0.21.0" +source = "git+https://github.com/txpipe/pallas?rev=00575b4c550cf042e354922699be23d420c8a813#00575b4c550cf042e354922699be23d420c8a813" dependencies = [ "cryptoxide", "hex", @@ -1292,15 +1388,16 @@ dependencies = [ [[package]] name = "pallas-network" -version = "0.19.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "60ea2c066436de4050244d4e9ea7e3e2726d58c1d94668e2553480c35040f31c" +version = "0.21.0" +source = "git+https://github.com/txpipe/pallas?rev=00575b4c550cf042e354922699be23d420c8a813#00575b4c550cf042e354922699be23d420c8a813" dependencies = [ "byteorder", "hex", "itertools", "pallas-codec", "pallas-crypto", + "rand", + "socket2 0.5.5", "thiserror", "tokio", "tracing", @@ -1308,9 +1405,8 @@ dependencies = [ [[package]] name = "pallas-primitives" -version = "0.19.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d4258913d6c5f8787a17735c3c4220fcc73ca379845f7937b9d518bbad5964a4" +version = "0.21.0" +source = "git+https://github.com/txpipe/pallas?rev=00575b4c550cf042e354922699be23d420c8a813#00575b4c550cf042e354922699be23d420c8a813" dependencies = [ "base58", "bech32 0.9.1", @@ -1324,24 +1420,23 @@ dependencies = [ [[package]] name = "pallas-traverse" -version = "0.19.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc2efb1e50ccf9796d5464c5ea4ce164ddb256333f412cad1509748ffa1417f1" +version = "0.21.0" +source = "git+https://github.com/txpipe/pallas?rev=00575b4c550cf042e354922699be23d420c8a813#00575b4c550cf042e354922699be23d420c8a813" dependencies = [ "hex", "pallas-addresses", "pallas-codec", "pallas-crypto", "pallas-primitives", + "paste", "serde", "thiserror", ] [[package]] name = "pallas-utxorpc" -version = "0.19.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f5c13a8d9134f02963ea2dd1cd60d69a22653d6ba10d8f3490c9bc79f1248287" +version = "0.21.0" +source = "git+https://github.com/txpipe/pallas?rev=00575b4c550cf042e354922699be23d420c8a813#00575b4c550cf042e354922699be23d420c8a813" dependencies = [ "pallas-codec", "pallas-primitives", @@ -1372,6 +1467,12 @@ dependencies = [ "windows-targets", ] +[[package]] +name = "paste" +version = "1.0.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "de3145af08024dea9fa9914f381a17b8fc6034dfb00f3a84013f7ff43f29ed4c" + [[package]] name = "pathdiff" version = "0.2.1" @@ -1829,6 +1930,16 @@ dependencies = [ "serde", ] +[[package]] +name = "sha3" +version = "0.10.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75872d278a8f37ef87fa0ddbda7802605cb18344497949862c0d4dcb291eba60" +dependencies = [ + "digest", + "keccak", +] + [[package]] name = "sharded-slab" version = "0.1.4" @@ -1877,9 +1988,9 @@ dependencies = [ [[package]] name = "socket2" -version = "0.5.3" +version = "0.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2538b18701741680e0322a2302176d3253a35388e2e62f172f64f4f16605f877" +checksum = "7b5fac59a5cb5dd637972e5fca70daf0523c9067fcdc4842f053dae04a18f8e9" dependencies = [ "libc", "windows-sys", @@ -2094,7 +2205,7 @@ dependencies = [ "mio", "num_cpus", "pin-project-lite", - "socket2 0.5.3", + "socket2 0.5.5", "tokio-macros", "windows-sys", ] @@ -2330,6 +2441,12 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed" +[[package]] +name = "typenum" +version = "1.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" + [[package]] name = "unicode-bidi" version = "0.3.13" @@ -2436,6 +2553,12 @@ version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" +[[package]] +name = "version_check" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" + [[package]] name = "want" version = "0.3.1" diff --git a/Cargo.toml b/Cargo.toml index e86047bd..a8d344ae 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,8 @@ authors = ["Santiago Carmuega "] [dependencies] -pallas = "=0.19.1" +# pallas = "=0.21.0" +pallas = { git = "https://github.com/txpipe/pallas", rev = "00575b4c550cf042e354922699be23d420c8a813" } # pallas = { path = "../pallas/pallas" } gasket = { version = "^0.5", features = ["derive"] } @@ -49,6 +50,7 @@ tonic-web = "0.9.2" tokio-stream = { version = "0.1.14", features = ["sync"] } futures-util = "0.3.28" async-stream = "0.3.5" +async-recursion = "1.0.5" [dev-dependencies] tempfile = "3.3.0" diff --git a/examples/sync-preview/dolos.toml b/examples/sync-preview/dolos.toml index d68c06a5..03af1435 100644 --- a/examples/sync-preview/dolos.toml +++ b/examples/sync-preview/dolos.toml @@ -4,7 +4,7 @@ network_magic = 2 [rolldb] path = "./tmp/rolldb" -k_param = 1000 +k_param = 10000 [applydb] path = "./tmp/applydb" @@ -13,5 +13,5 @@ path = "./tmp/applydb" listen_address = "[::]:50051" [serve.ouroboros] -listen_address = "localhost:30013" +listen_address = "127.0.0.1:30013" magic = 2 diff --git a/src/bin/dolos/daemon.rs b/src/bin/dolos/daemon.rs index 0acb6d6b..9f2ad43d 100644 --- a/src/bin/dolos/daemon.rs +++ b/src/bin/dolos/daemon.rs @@ -14,12 +14,8 @@ pub async fn run( policy: &gasket::runtime::Policy, _args: &Args, ) -> Result<(), Error> { - tracing::subscriber::set_global_default( - tracing_subscriber::FmtSubscriber::builder() - .with_max_level(tracing::Level::INFO) - .finish(), - ) - .unwrap(); + tracing::subscriber::set_global_default(tracing_subscriber::FmtSubscriber::builder().finish()) + .unwrap(); let rolldb_path = config .rolldb @@ -27,8 +23,12 @@ pub async fn run( .as_deref() .unwrap_or_else(|| Path::new("/rolldb")); - let rolldb = - RollDB::open(rolldb_path, config.rolldb.k_param.unwrap_or(1000)).map_err(Error::storage)?; + let rolldb = RollDB::open( + rolldb_path, + config.rolldb.k_param.unwrap_or(1000), + config.rolldb.k_param_buffer.unwrap_or_default(), + ) + .map_err(Error::storage)?; let applydb_path = config .applydb diff --git a/src/bin/dolos/main.rs b/src/bin/dolos/main.rs index 3fb0dcf6..e2dc773d 100644 --- a/src/bin/dolos/main.rs +++ b/src/bin/dolos/main.rs @@ -32,6 +32,7 @@ struct Cli { pub struct RolldbConfig { path: Option, k_param: Option, + k_param_buffer: Option, } #[derive(Deserialize)] diff --git a/src/bin/dolos/read.rs b/src/bin/dolos/read.rs index 50f3decc..55cef75a 100644 --- a/src/bin/dolos/read.rs +++ b/src/bin/dolos/read.rs @@ -20,6 +20,7 @@ pub fn run(config: &super::Config, _args: &Args) -> Result<(), Error> { .as_deref() .unwrap_or_else(|| Path::new("/rolldb")), config.rolldb.k_param.unwrap_or(1000), + config.rolldb.k_param_buffer.unwrap_or_default(), ) .unwrap(); diff --git a/src/bin/dolos/serve.rs b/src/bin/dolos/serve.rs index 68fef2d1..4c08c4cb 100644 --- a/src/bin/dolos/serve.rs +++ b/src/bin/dolos/serve.rs @@ -20,8 +20,12 @@ pub async fn run(config: super::Config, _args: &Args) -> Result<(), Error> { .as_deref() .unwrap_or_else(|| Path::new("/rolldb")); - let db = - RollDB::open(rolldb_path, config.rolldb.k_param.unwrap_or(1000)).map_err(Error::config)?; + let db = RollDB::open( + rolldb_path, + config.rolldb.k_param.unwrap_or(1000), + config.rolldb.k_param_buffer.unwrap_or_default(), + ) + .map_err(Error::config)?; dolos::serve::serve(config.serve, db).await?; diff --git a/src/bin/dolos/sync.rs b/src/bin/dolos/sync.rs index f55ccef7..da7b785e 100644 --- a/src/bin/dolos/sync.rs +++ b/src/bin/dolos/sync.rs @@ -26,8 +26,12 @@ pub fn run( .as_deref() .unwrap_or_else(|| Path::new("/rolldb")); - let rolldb = - RollDB::open(rolldb_path, config.rolldb.k_param.unwrap_or(1000)).map_err(Error::storage)?; + let rolldb = RollDB::open( + rolldb_path, + config.rolldb.k_param.unwrap_or(1000), + config.rolldb.k_param_buffer.unwrap_or_default(), + ) + .map_err(Error::storage)?; let applydb_path = config .applydb diff --git a/src/serve/ouroboros/blockfetch.rs b/src/serve/ouroboros/blockfetch.rs new file mode 100644 index 00000000..1ea8bc42 --- /dev/null +++ b/src/serve/ouroboros/blockfetch.rs @@ -0,0 +1,69 @@ +use pallas::network::miniprotocols::{ + blockfetch::{self, BlockRequest}, + Point, +}; +use tracing::{error, info, warn}; + +use crate::{prelude::Error, storage::rolldb::RollDB}; + +pub async fn handle_blockfetch(db: RollDB, mut protocol: blockfetch::Server) -> Result<(), Error> { + loop { + match protocol.recv_while_idle().await { + Ok(Some(BlockRequest((p1, p2)))) => { + let from = match p1 { + Point::Origin => None, + Point::Specific(slot, hash) => { + let parsed_hash = TryInto::<[u8; 32]>::try_into(hash) + .map_err(|_| Error::client("malformed hash"))? + .into(); + + Some((slot, parsed_hash)) + } + }; + + let to = match p2 { + Point::Origin => return protocol.send_no_blocks().await.map_err(Error::server), + Point::Specific(slot, hash) => { + let parsed_hash = TryInto::<[u8; 32]>::try_into(hash) + .map_err(|_| Error::client("malformed hash"))? + .into(); + + (slot, parsed_hash) + } + }; + + if let Some(mut iter) = db.read_chain_range(from, to).map_err(Error::storage)? { + protocol.send_start_batch().await.map_err(Error::server)?; + + while let Some(point) = iter.next() { + let (_, hash) = point.map_err(Error::storage)?; + + let block_bytes = match db.get_block(hash).map_err(Error::storage)? { + Some(b) => b, + None => { + error!("could not find block bytes for {hash}"); + return Err(Error::server( + "could not find block bytes for block in chainkv", + )); + } + }; + + protocol + .send_block(block_bytes) + .await + .map_err(Error::server)?; + } + + protocol.send_batch_done().await.map_err(Error::server)?; + } else { + return protocol.send_no_blocks().await.map_err(Error::server); + } + } + Ok(None) => info!("peer ended blockfetch protocol"), + Err(e) => { + warn!("error receiving blockfetch message: {:?}", e); + return Err(Error::client(e)); + } + } + } +} diff --git a/src/serve/ouroboros/chainsync.rs b/src/serve/ouroboros/chainsync.rs new file mode 100644 index 00000000..65fbaf2f --- /dev/null +++ b/src/serve/ouroboros/chainsync.rs @@ -0,0 +1,394 @@ +use async_recursion::async_recursion; +use pallas::{ + crypto::hash::Hash, + ledger::traverse::MultiEraBlock, + network::miniprotocols::{ + chainsync::{self, ClientRequest, HeaderContent, Tip}, + Point, + }, +}; +use tracing::{debug, info, instrument, warn}; + +use crate::{ + prelude::Error, + storage::{ + kvtable::DBInt, + rolldb::{wal::WalAction, RollDB}, + }, +}; + +pub struct N2NChainSyncHandler { + roll_db: RollDB, + protocol: chainsync::N2NServer, + intersect: Option, + cursor: Option<(u64, Hash<32>)>, +} + +impl N2NChainSyncHandler { + pub fn new(roll_db: RollDB, protocol: chainsync::N2NServer) -> Result { + Ok(Self { + roll_db, + protocol, + intersect: None, + cursor: None, + }) + } + + #[instrument(skip_all)] + pub async fn begin(&mut self) -> Result<(), Error> { + info!("beginning n2n chainsync handler"); + match self + .protocol + .recv_while_idle() + .await + .map_err(Error::server)? + { + Some(ClientRequest::Intersect(points)) => self.handle_intersect(points).await, + Some(ClientRequest::RequestNext) => self.handle_crawling(None).await, + None => { + debug!("client ended protocol"); + return Ok(()); + } + } + } + + // TODO: loop instead? + #[async_recursion] + #[instrument(skip_all)] + async fn handle_intersect(&mut self, points: Vec) -> Result<(), Error> { + info!(?points, "handling intersect request"); + + let tip = self + .roll_db + .find_tip() + .map_err(Error::server)? + .map(db_tip_to_protocol) + .unwrap_or(Tip(Point::Origin, 0)); + + if let Some(found) = self.find_valid_intersection(&points) { + info!(?found, "found intersect point"); + + self.intersect = Some(found.clone()); + self.cursor = None; + + self.protocol + .send_intersect_found(found, tip) + .await + .map_err(Error::server)?; + } else { + warn!("could not intersect"); + + self.intersect = None; + self.cursor = None; + + self.protocol + .send_intersect_not_found(tip) + .await + .map_err(Error::server)?; + } + + // --- + + match self + .protocol + .recv_while_idle() + .await + .map_err(Error::server)? + { + Some(ClientRequest::Intersect(points)) => return self.handle_intersect(points).await, + Some(ClientRequest::RequestNext) => { + return self + .handle_crawling(self.intersect.as_ref().map(|x| x.slot_or_default())) + .await + } + None => { + debug!("client ended protocol"); + return Ok(()); + } + } + } + + #[instrument(skip_all)] + async fn handle_crawling(&mut self, from: Option) -> Result<(), Error> { + info!(?from, "entering chainkv crawling mode"); + + // --- initialise new crawler + + let mut crawler = self.roll_db.crawl_chain_from(from); + + let mut tip = self + .roll_db + .find_tip() + .map_err(Error::server)? + .map(db_tip_to_protocol) + .unwrap_or(Tip(Point::Origin, 0)); + + let mut mutable_slot = tip + .0 + .slot_or_default() + .saturating_sub(self.roll_db.k_param()); + + info!(?tip, ?mutable_slot, "fetched tip from db"); + + // --- keep sending blocks while we receive RequestNexts + + // if we intersected with crawler with a point then skip that point + if matches!(self.intersect.as_ref(), Some(Point::Specific(_, _))) { + crawler.next(); + } + + loop { + if let Some(next) = crawler.next() { + info!(?next, "next chainkv point"); + let (slot, hash) = next.map_err(Error::server)?; + + tip = self + .roll_db + .find_tip() + .map_err(Error::server)? + .map(db_tip_to_protocol) + .unwrap_or(Tip(Point::Origin, 0)); + + // --- if we have reached mutable part of chainKV snapshot, + // check if we can swap to the WAL, otherwise take new snapshot + + if slot >= mutable_slot { + if let Some((slot, hash)) = self.cursor { + if let Some(seq) = self + .roll_db + .apply_position_in_wal(slot, &hash) + .map_err(Error::server)? + { + info!(?self.cursor, "cursor found on WAL, switching to WAL crawling"); + drop(crawler); + return self.crawl_with_wal(Some(seq)).await; + } else { + info!(?self.cursor, "mutable but no WAL intersect, refreshing chainKV crawler"); + + // take new chainKV snapshot + crawler = self.roll_db.crawl_chain_from(self.cursor.map(|x| x.0)); + + // update mutable point for new snapshot + + tip = self + .roll_db + .find_tip() + .map_err(Error::server)? + .map(db_tip_to_protocol) + .unwrap_or(Tip(Point::Origin, 0)); + + mutable_slot = tip + .0 + .slot_or_default() + .saturating_sub(self.roll_db.k_param()); + + // skip cursor (iterator starts at cursor) + crawler.next(); + + continue; + } + } else { + // if we are immediately mutable (have no cursor), + // skip chainKV and crawl WAL from beginning + + info!(?self.cursor, "mutable without cursor, switching to WAL crawling"); + + drop(crawler); + + if let Some(Point::Specific(i_slot, i_hash)) = self.intersect.as_ref() { + let i_hash: [u8; 32] = i_hash.clone().try_into().unwrap(); + + let seq = self + .roll_db + .apply_position_in_wal(*i_slot, &(i_hash.into())) + .map_err(Error::server)? + .ok_or(Error::server("intersect in chainkv but not WAL despite being immediately mutable"))?; + + return self.crawl_with_wal(Some(seq)).await; + } else { + return self.crawl_with_wal(None).await; + } + } + } + + // --- send block to client + + let block = self + .roll_db + .get_block(hash) + .map_err(Error::server)? + .expect("block content not found"); + + let block = MultiEraBlock::decode(&block).expect("invalid block cbor"); + + let content = HeaderContent { + variant: block.era() as u8, + byron_prefix: None, + cbor: block.header().cbor().to_vec(), + }; + + self.protocol + .send_roll_forward(content, tip) + .await + .map_err(Error::server)?; + + self.cursor = Some((slot, hash)); + + // --- + + match self + .protocol + .recv_while_idle() + .await + .map_err(Error::server)? + { + Some(ClientRequest::RequestNext) => info!("client request next"), + Some(ClientRequest::Intersect(points)) => { + drop(crawler); + return self.handle_intersect(points).await; + } + None => { + warn!("client ended protocol"); + return Ok(()); + } + } + } else { + return Err(Error::server( + "chainKV exhausted without finding WAL intersection", + )); + } + } + } + + #[instrument(skip_all)] + async fn crawl_with_wal(&mut self, from: Option) -> Result<(), Error> { + info!(?from, "entering WAL crawling mode"); + + let mut last_seq = None; + + let intersected = from.is_some(); + + // TODO: race condition between checking wal contains point and creating iterator + let mut crawler = self.roll_db.crawl_wal(from.map(|x| x.into())); + + // skip the WAL intersect + if intersected { + crawler.next(); + } + + // --- keep iterating WAL while we receive RequestNexts + + loop { + if let Some(next) = crawler.next() { + info!(?next, "next WAL entry"); + let (seq, wal_value) = next.map_err(Error::server)?; + + last_seq = Some(seq); + + let tip = self + .roll_db + .find_tip() + .map_err(Error::server)? + .map(db_tip_to_protocol) + .unwrap_or(Tip(Point::Origin, 0)); + + // --- + + let slot = wal_value.slot(); + let hash = *wal_value.hash(); + + match wal_value.action() { + WalAction::Apply => { + let block = self + .roll_db + .get_block(hash) + .map_err(Error::server)? + .expect("block content not found"); + + let block = MultiEraBlock::decode(&block).expect("invalid block cbor"); + + let content = HeaderContent { + variant: block.era() as u8, + byron_prefix: None, + cbor: block.header().cbor().to_vec(), + }; + + self.protocol + .send_roll_forward(content, tip) + .await + .map_err(Error::server)?; + + self.cursor = Some((slot, hash)); + } + WalAction::Mark => { + self.protocol + .send_roll_backward(Point::Specific(slot, hash.to_vec()), tip) + .await + .map_err(Error::server)?; + + self.cursor = Some((slot, hash)); + } + // skip this wal action without trying to receive a new message + WalAction::Undo => continue, + }; + } else { + info!(?self.cursor, "sending await reply"); + + self.protocol + .send_await_reply() + .await + .map_err(Error::server)?; + + self.roll_db.tip_change.notified().await; + info!(?last_seq, "tip change notified, refreshing WAL crawler"); + drop(crawler); + crawler = self.roll_db.crawl_wal(last_seq); + crawler.next(); + + continue; + } + + // --- + + match self + .protocol + .recv_while_idle() + .await + .map_err(Error::server)? + { + Some(ClientRequest::RequestNext) => info!("client request next"), + Some(ClientRequest::Intersect(points)) => { + drop(crawler); + return self.handle_intersect(points).await; + } + None => { + warn!("client ended protocol"); + return Ok(()); + } + } + } + } + + fn find_valid_intersection(&self, points: &[Point]) -> Option { + for point in points { + match point { + Point::Origin => return Some(point.clone()), + Point::Specific(slot, hash) => { + let hash: [u8; 32] = hash[0..32].try_into().unwrap(); + let hash = Hash::<32>::from(hash); + + if self.roll_db.chain_contains(*slot, &hash).unwrap() { + return Some(point.clone()); + } + } + } + } + + None + } +} + +fn db_tip_to_protocol(tip: (u64, Hash<32>)) -> Tip { + // TODO: get block height from db + Tip(Point::Specific(tip.0, tip.1.to_vec()), 0) +} diff --git a/src/serve/ouroboros/mod.rs b/src/serve/ouroboros/mod.rs index a116d999..d3b625ec 100644 --- a/src/serve/ouroboros/mod.rs +++ b/src/serve/ouroboros/mod.rs @@ -1,201 +1,50 @@ -use pallas::crypto::hash::Hash; -use pallas::ledger::traverse::MultiEraBlock; +use std::time::Duration; + use pallas::network::facades::PeerServer; -use pallas::network::miniprotocols::blockfetch::{self, BlockRequest}; -use pallas::network::miniprotocols::chainsync::{HeaderContent, Tip}; -use pallas::network::miniprotocols::{chainsync, Point}; +use pallas::network::miniprotocols::keepalive; use serde::{Deserialize, Serialize}; use tokio::join; use tokio::net::TcpListener; -use tracing::{debug, error, info, instrument, warn}; +use tracing::{info, instrument}; use crate::prelude::*; use crate::storage::rolldb::RollDB; +use self::blockfetch::handle_blockfetch; +use self::chainsync::N2NChainSyncHandler; + #[cfg(test)] mod tests; +mod blockfetch; +mod chainsync; + #[derive(Serialize, Deserialize, Clone)] pub struct Config { listen_address: String, magic: u64, } -async fn handle_blockfetch(db: RollDB, mut protocol: blockfetch::Server) -> Result<(), Error> { - loop { - match protocol.recv_while_idle().await { - Ok(Some(BlockRequest((p1, p2)))) => { - let from = match p1 { - Point::Origin => None, - Point::Specific(slot, hash) => { - let parsed_hash = TryInto::<[u8; 32]>::try_into(hash) - .map_err(|_| Error::client("malformed hash"))? - .into(); - - Some((slot, parsed_hash)) - } - }; - - let to = match p2 { - Point::Origin => return protocol.send_no_blocks().await.map_err(Error::server), - Point::Specific(slot, hash) => { - let parsed_hash = TryInto::<[u8; 32]>::try_into(hash) - .map_err(|_| Error::client("malformed hash"))? - .into(); - - (slot, parsed_hash) - } - }; - - if let Some(mut iter) = db.read_chain_range(from, to).map_err(Error::storage)? { - protocol.send_start_batch().await.map_err(Error::server)?; - - while let Some(point) = iter.next() { - let (_, hash) = point.map_err(Error::storage)?; - - let block_bytes = match db.get_block(hash).map_err(Error::storage)? { - Some(b) => b, - None => { - error!("could not find block bytes for {hash}"); - return Err(Error::server( - "could not find block bytes for block in chainkv", - )); - } - }; - - protocol - .send_block(block_bytes) - .await - .map_err(Error::server)?; - } - - protocol.send_batch_done().await.map_err(Error::server)?; - } else { - return protocol.send_no_blocks().await.map_err(Error::server); - } - } - Ok(None) => info!("peer ended blockfetch protocol"), - Err(e) => { - warn!("error receiving blockfetch message: {:?}", e); - return Err(Error::client(e)); - } - } - } -} - -fn find_valid_intersection(db: &RollDB, points: &[Point]) -> Option { - for point in points { - match point { - Point::Origin => return Some(point.clone()), - Point::Specific(slot, hash) => { - let hash: [u8; 32] = hash[0..32].try_into().unwrap(); - let hash = Hash::<32>::from(hash); - - if db.chain_contains(*slot, &hash).unwrap() { - return Some(point.clone()); - } - } - } - } - - None -} - -fn db_tip_to_protocol(tip: (u64, Hash<32>)) -> Tip { - // TODO: get block height from db - Tip(Point::Specific(tip.0, tip.1.to_vec()), 0) -} - -#[instrument(skip_all)] -async fn handle_chainsync(db: RollDB, mut protocol: chainsync::N2NServer) -> Result<(), Error> { - let mut intersect = db.crawl_chain_from(None); - - loop { - debug!("waiting for request"); - - let req = protocol.recv_while_idle().await.map_err(Error::server)?; - - debug!("new client request"); - - let tip = db - .find_tip() - .map_err(Error::server)? - .map(db_tip_to_protocol) - .unwrap_or(Tip(Point::Origin, 0)); - - debug!(?tip, "fetched tip from db"); - - match req { - Some(x) => match x { - chainsync::ClientRequest::Intersect(points) => { - debug!(?points, "intersect request"); - - if let Some(found) = find_valid_intersection(&db, &points) { - intersect = match found { - Point::Origin => db.crawl_chain_from(None), - Point::Specific(x, _) => db.crawl_chain_from(Some(x)), - }; - - protocol - .send_intersect_found(found, tip) - .await - .map_err(Error::server)?; - } else { - protocol - .send_intersect_not_found(tip) - .await - .map_err(Error::server)?; - } - } - chainsync::ClientRequest::RequestNext => { - let next = intersect.next(); - - if let Some(next) = next { - let (_, hash) = next.map_err(Error::server)?; - - let block = db - .get_block(hash) - .map_err(Error::server)? - .expect("block content not found"); - - let block = MultiEraBlock::decode(&block).expect("invalid block cbor"); - - let content = HeaderContent { - variant: 1, // TODO - byron_prefix: None, - cbor: block.header().cbor().to_vec(), - }; - - protocol - .send_roll_forward(content, tip) - .await - .map_err(Error::server)?; - } else { - protocol.send_await_reply().await.map_err(Error::server)?; - } - } - }, - None => todo!(), - } - } -} - #[instrument(skip_all)] async fn peer_session(db: RollDB, peer: PeerServer) -> Result<(), Error> { let PeerServer { - blockfetch, + plexer, chainsync, - plexer_handle, + blockfetch, + keepalive, .. } = peer; - let l1 = handle_chainsync(db.clone(), chainsync); + let mut n2n_chainsync_handler = N2NChainSyncHandler::new(db.clone(), chainsync)?; + + let l1 = n2n_chainsync_handler.begin(); let l2 = handle_blockfetch(db.clone(), blockfetch); + let l3 = handle_keepalive(keepalive); - join!(l1, l2); + let _ = join!(l1, l2, l3); - plexer_handle.abort(); + plexer.abort().await; Ok(()) } @@ -217,6 +66,19 @@ pub async fn serve(config: Config, db: RollDB) -> Result<(), Error> { let db = db.clone(); - let handle = tokio::spawn(async move { peer_session(db, peer).await }); + let _handle = tokio::spawn(async move { peer_session(db, peer).await }); } } + +async fn handle_keepalive(mut keepalive: keepalive::Server) -> Result<(), Error> { + while keepalive + .keepalive_receive_and_respond() + .await + .map_err(Error::server)? + .is_some() + { + tokio::time::sleep(Duration::from_secs(15)).await + } + + Ok(()) +} diff --git a/src/serve/ouroboros/tests.rs b/src/serve/ouroboros/tests.rs index 1d7cdcdd..0ca33625 100644 --- a/src/serve/ouroboros/tests.rs +++ b/src/serve/ouroboros/tests.rs @@ -4,11 +4,11 @@ use pallas::{ crypto::hash::Hash, network::{ facades::PeerClient, - miniprotocols::{chainsync, Point, MAINNET_MAGIC}, + miniprotocols::{Point, MAINNET_MAGIC}, }, }; -const dummy_blocks: [(u64, &str); 5] = [ +const DUMMY_BLOCKS: [(u64, &str); 5] = [ ( 0, "15b9eeee849dd6386d3770b0745e0450190f7560e5159b1b3ab13b14b2684a45", @@ -31,14 +31,20 @@ const dummy_blocks: [(u64, &str); 5] = [ ), ]; +const DUMMY_BLOCK_BYTES: &str = "820183851a2d964a09582089d9b5a5b8ddc8d7e5a6795e9774d97faf1efea59b2caf7eaf9f8c5b32059df484830058200e5751c026e543b2e8ab2eb06099daa1d1e5df47778f7787faab45cdf12fe3a85820afc0da64183bf2664f3d4eec7238d524ba607faeeab24fc100eb861dba69971b8300582025777aca9e4a73d48fc73b4f961d345b06d4a6f349cb7916570d35537d53479f5820d36a2619a672494604e11bb447cbcf5231e9f2ba25c2169177edc941bd50ad6c5820afc0da64183bf2664f3d4eec7238d524ba607faeeab24fc100eb861dba69971b58204e66280cd94d591072349bec0a3090a53aa945562efb6d08d56e53654b0e40988482000058401bc97a2fe02c297880ce8ecfd997fe4c1ec09ee10feeee9f686760166b05281d6283468ffd93becb0c956ccddd642df9b1244c915911185fa49355f6f22bfab98101820282840058401bc97a2fe02c297880ce8ecfd997fe4c1ec09ee10feeee9f686760166b05281d6283468ffd93becb0c956ccddd642df9b1244c915911185fa49355f6f22bfab9584061261a95b7613ee6bf2067dad77b70349729b0c50d57bc1cf30de0db4a1e73a885d0054af7c23fc6c37919dba41c602a57e2d0f9329a7954b867338d6fb2c9455840e03e62f083df5576360e60a32e22bbb07b3c8df4fcab8079f1d6f61af3954d242ba8a06516c395939f24096f3df14e103a7d9c2b80a68a9363cf1f27c7a4e307584044f18ef23db7d2813415cb1b62e8f3ead497f238edf46bb7a97fd8e9105ed9775e8421d18d47e05a2f602b700d932c181e8007bbfb231d6f1a050da4ebeeba048483000000826a63617264616e6f2d736c00a058204ba92aa320c60acc9ad7b9a64f2eda55c4d2ec28e604faf186708b4f0c4e8edf849fff8300d9010280d90102809fff82809fff81a0"; + type ServerHandle = tokio::task::JoinHandle>; async fn setup_server_client_pair(port: u32) -> (ServerHandle, PeerClient) { - let mut db = crate::storage::rolldb::RollDB::open_tmp(100).unwrap(); - - for (slot, hash) in dummy_blocks.iter() { - db.roll_forward(*slot, Hash::<32>::from_str(hash).unwrap(), vec![1u8; 200]) - .unwrap(); + let mut db = crate::storage::rolldb::RollDB::open_tmp(100, 0).unwrap(); + + for (slot, hash) in DUMMY_BLOCKS.iter() { + db.roll_forward( + *slot, + Hash::<32>::from_str(hash).unwrap(), + hex::decode(DUMMY_BLOCK_BYTES).unwrap(), + ) + .unwrap(); } let server = tokio::spawn(super::serve( @@ -58,6 +64,12 @@ async fn setup_server_client_pair(port: u32) -> (ServerHandle, PeerClient) { #[tokio::test] async fn test_blockfetch() { + // let _ = tracing::subscriber::set_global_default( + // tracing_subscriber::FmtSubscriber::builder() + // .with_max_level(tracing::Level::DEBUG) + // .finish(), + // ); + // use servers in different ports until we implement some sort of test harness let (server, mut client) = setup_server_client_pair(30031).await; @@ -86,8 +98,15 @@ async fn test_blockfetch() { server.abort(); } -//#[tokio::test] -async fn do_chainsync() { +#[ignore = "broken"] +#[tokio::test] +async fn test_chainsync() { + // let _ = tracing::subscriber::set_global_default( + // tracing_subscriber::FmtSubscriber::builder() + // .with_max_level(tracing::Level::DEBUG) + // .finish(), + // ); + // use servers in different ports until we implement some sort of test harness let (server, mut client) = setup_server_client_pair(30032).await; @@ -98,20 +117,14 @@ async fn do_chainsync() { let (point, _) = client .chainsync() - .find_intersect(known_points) + .find_intersect(known_points.clone()) .await .unwrap(); - // assert point matches + assert_eq!(point.unwrap(), known_points[0]); - for _ in 0..3 { - let next = client.chainsync().request_next().await.unwrap(); - - match next { - chainsync::NextResponse::RollForward(h, _) => {} - _ => unreachable!(), - }; - } + // hangs here on receiving next block, even though server sends it + let _next = client.chainsync().request_next().await.unwrap(); server.abort(); } diff --git a/src/storage/kvtable.rs b/src/storage/kvtable.rs index ab4974e0..64ea8af3 100644 --- a/src/storage/kvtable.rs +++ b/src/storage/kvtable.rs @@ -44,6 +44,7 @@ impl From for Hash<32> { } } +#[derive(Debug)] pub struct DBInt(pub u64); impl From for Box<[u8]> { diff --git a/src/storage/rolldb/mod.rs b/src/storage/rolldb/mod.rs index ba046d56..b5ad04ab 100644 --- a/src/storage/rolldb/mod.rs +++ b/src/storage/rolldb/mod.rs @@ -20,6 +20,8 @@ pub struct RollDB { pub tip_change: Arc, wal_seq: u64, k_param: u64, + // overlap immutable part of chainkv and WAL by k_param_buffer slots + k_param_buffer: u64, } pub struct BlockKV; @@ -46,7 +48,7 @@ impl<'a> Iterator for ChainEntryIterator<'a> { } impl RollDB { - pub fn open(path: impl AsRef, k_param: u64) -> Result { + pub fn open(path: impl AsRef, k_param: u64, k_param_buffer: u64) -> Result { let mut opts = Options::default(); opts.create_if_missing(true); opts.create_missing_column_families(true); @@ -65,14 +67,19 @@ impl RollDB { tip_change: Arc::new(tokio::sync::Notify::new()), wal_seq, k_param, + k_param_buffer, }) } + pub fn k_param(&self) -> u64 { + self.k_param + } + #[cfg(test)] - pub fn open_tmp(k_param: u64) -> Result { + pub fn open_tmp(k_param: u64, k_param_buffer: u64) -> Result { let path = tempfile::tempdir().unwrap().into_path(); - RollDB::open(path.clone(), k_param) + RollDB::open(path.clone(), k_param, k_param_buffer) } pub fn get_block(&self, hash: Hash<32>) -> Result, Error> { @@ -198,23 +205,20 @@ impl RollDB { pub fn crawl_wal_from_cursor( &self, start_after: Option<(BlockSlot, BlockHash)>, - ) -> Result> + '_, Error> { + ) -> Result> + '_>, Error> + { if let Some((slot, hash)) = start_after { - // TODO: Not sure this is 100% accurate: - // i.e Apply(X), Apply(cursor), Undo(cursor), Mark(x) - // We want to start at Apply(cursor) or Mark(cursor), but even then, - // what if we have more than one Apply(cursor), how do we know - // which is correct? + // try find most recent Apply(cursor) or Mark(cursor) in the WAL let found = WalKV::scan_until(&self.db, rocksdb::IteratorMode::End, |v| { - v.slot() == slot && v.hash().eq(&hash) + !v.is_undo() && v.slot() == slot && v.hash().eq(&hash) })?; match found { - Some(DBInt(seq)) => Ok(self.crawl_wal(Some(seq))), - None => Err(Error::NotFound), + Some(DBInt(seq)) => Ok(Some(self.crawl_wal(Some(seq)))), + None => Ok(None), } } else { - Ok(self.crawl_wal(None)) + Ok(Some(self.crawl_wal(None))) } } @@ -336,7 +340,9 @@ impl RollDB { // get the number of slots that have passed since the wal point let slot_delta = tip - value.slot(); - if slot_delta <= self.k_param { + // k_param_buffer is so we have can have some overlap between WAL + // and immutable chainKV + if slot_delta <= self.k_param + self.k_param_buffer { break; } else { wal::WalKV::stage_delete(&self.db, wal_key, &mut batch); @@ -359,15 +365,16 @@ impl RollDB { Ok(false) } - /// Check if a point (pair of slot and block hash) exists in the WalKV - pub fn wal_contains(&self, slot: BlockSlot, hash: &BlockHash) -> Result { - if let Some(_) = WalKV::scan_until(&self.db, rocksdb::IteratorMode::End, |v| { - v.slot() == slot && v.hash().eq(hash) - })? { - Ok(true) - } else { - Ok(false) - } + /// Return the WAL sequence number for the most recent occurence of an apply + /// action for the specified point, should it exist + pub fn apply_position_in_wal( + &self, + slot: BlockSlot, + hash: &BlockHash, + ) -> Result, Error> { + WalKV::scan_until(&self.db, rocksdb::IteratorMode::End, |v| { + v.slot() == slot && v.hash().eq(hash) && !v.is_undo() + }) } pub fn destroy(self) -> Result<(), Error> { @@ -382,7 +389,7 @@ mod tests { use super::{BlockBody, BlockHash, BlockSlot, RollDB}; pub fn with_tmp_db(k_param: u64, op: fn(db: &mut RollDB) -> T) { - let mut db = RollDB::open_tmp(k_param).unwrap(); + let mut db = RollDB::open_tmp(k_param, 0).unwrap(); op(&mut db); diff --git a/src/storage/rolldb/stream.rs b/src/storage/rolldb/stream.rs index 9eef34bd..2ee0d4a5 100644 --- a/src/storage/rolldb/stream.rs +++ b/src/storage/rolldb/stream.rs @@ -88,7 +88,7 @@ mod tests { #[tokio::test] async fn test_stream_waiting() { let path = tempfile::tempdir().unwrap().into_path(); - let mut db = super::RollDB::open(path.clone(), 30).unwrap(); + let mut db = super::RollDB::open(path.clone(), 30, 0).unwrap(); for i in 0..100 { let (slot, hash, body) = dummy_block(i * 10); diff --git a/src/sync/pull.rs b/src/sync/pull.rs index 9dc19a55..c33da945 100644 --- a/src/sync/pull.rs +++ b/src/sync/pull.rs @@ -205,7 +205,8 @@ impl gasket::framework::Worker for Worker { } async fn teardown(&mut self) -> Result<(), WorkerError> { - self.peer_session.abort(); + // TODO: doesn't work because teardown takes mutable ref but abort takes + // self.peer_session.abort(); Ok(()) } diff --git a/src/sync/roll.rs b/src/sync/roll.rs index 041a85c6..bb455b19 100644 --- a/src/sync/roll.rs +++ b/src/sync/roll.rs @@ -40,7 +40,12 @@ impl Stage { /// /// Reads from Wal using the latest known cursor and outputs the corresponding downstream events async fn catchup(&mut self) -> Result<(), WorkerError> { - let iter = self.rolldb.crawl_wal_from_cursor(self.cursor).or_panic()?; + let iter = self + .rolldb + .crawl_wal_from_cursor(self.cursor) + .or_panic()? + .ok_or(Error::server("could not find cursor on WAL for catchup")) + .or_panic()?; for wal in iter { let (_, wal) = wal.or_panic()?; diff --git a/src/tests/upstream.rs b/src/tests/upstream.rs index 4dec5717..d42fb773 100644 --- a/src/tests/upstream.rs +++ b/src/tests/upstream.rs @@ -50,7 +50,7 @@ fn test_mainnet_upstream() { ) .unwrap(); - let rolldb = RollDB::open("tmp", 10).unwrap(); + let rolldb = RollDB::open("tmp", 10, 0).unwrap(); let intersection = rolldb.intersect_options(5).unwrap().into_iter().collect();