From 227f2431f273410ecfa8683d54ed96fa99cec2b7 Mon Sep 17 00:00:00 2001 From: popcnt <142196625+popcnt1@users.noreply.github.com> Date: Sun, 12 Jan 2025 04:25:25 +0800 Subject: [PATCH] feat(rooch-da): add accumulator verification (#3175) * feat(rooch-da): add accumulator verification Introduce `SequencedTxStore` to manage sequenced transactions with MerkleAccumulator for Rooch. Added execution modes (`exec`, `seq`, `full`) in `ExecCommand` to control transaction sequencing and execution processes. * docs(rooch): update genesis source data link in README Updated the README to include the correct link to the genesis source data for Rooch MainNet. This ensures users can access the necessary files more easily and improves documentation clarity. --- Cargo.lock | 1 + crates/rooch/Cargo.toml | 1 + crates/rooch/src/commands/da/commands/exec.rs | 211 +++++++++++++----- crates/rooch/src/commands/da/commands/mod.rs | 113 ++++++++++ crates/rooch/src/commands/statedb/README.md | 3 +- .../src/commands/statedb/btc_source_data.md | 70 ------ 6 files changed, 269 insertions(+), 130 deletions(-) delete mode 100644 crates/rooch/src/commands/statedb/btc_source_data.md diff --git a/Cargo.lock b/Cargo.lock index 42deeb5e5f..fe19fefbd6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10083,6 +10083,7 @@ dependencies = [ name = "rooch" version = "0.8.6" dependencies = [ + "accumulator", "anyhow 1.0.95", "async-trait", "bcs", diff --git a/crates/rooch/Cargo.toml b/crates/rooch/Cargo.toml index fc0fd43ccd..423d61b287 100644 --- a/crates/rooch/Cargo.toml +++ b/crates/rooch/Cargo.toml @@ -85,6 +85,7 @@ framework-types = { workspace = true } raw-store = { workspace = true } smt = { workspace = true } +accumulator = { workspace = true } bitcoin-client = { workspace = true } rooch-executor = { workspace = true } rooch-key = { workspace = true } diff --git a/crates/rooch/src/commands/da/commands/exec.rs b/crates/rooch/src/commands/da/commands/exec.rs index ca1317dd91..a14c8925fe 100644 --- a/crates/rooch/src/commands/da/commands/exec.rs +++ b/crates/rooch/src/commands/da/commands/exec.rs @@ -1,7 +1,9 @@ // Copyright (c) RoochNetwork // SPDX-License-Identifier: Apache-2.0 -use crate::commands::da::commands::{build_rooch_db, LedgerTxGetter, TxDAIndexer}; +use crate::commands::da::commands::{ + build_rooch_db, LedgerTxGetter, SequencedTxStore, TxDAIndexer, +}; use anyhow::Context; use bitcoin::hashes::Hash; use bitcoin_client::actor::client::BitcoinClientConfig; @@ -30,6 +32,7 @@ use rooch_types::bitcoin::types::Block as BitcoinBlock; use rooch_types::error::RoochResult; use rooch_types::rooch_network::RoochChainID; use rooch_types::transaction::{L1BlockWithBody, LedgerTransaction, LedgerTxData}; +use std::cmp::min; use std::collections::HashMap; use std::fs::File; use std::io::{BufRead, BufReader, Read}; @@ -42,22 +45,37 @@ use tokio::sync::mpsc::Receiver; use tokio::sync::mpsc::Sender; use tokio::sync::watch; use tokio::time; +use tracing::info; /// exec LedgerTransaction List for verification. #[derive(Debug, Parser)] pub struct ExecCommand { + #[clap( + long = "mode", + help = "execute mode: exec, seq, full. exec: only execute transactions, no update sequence related data; full: execute transactions and update sequence related data" + )] + pub mode: ExecMode, #[clap(long = "segment-dir")] pub segment_dir: PathBuf, #[clap( long = "order-state-path", - help = "Path to tx_order:state_root file(results from RoochNetwork), for verification" + help = "Path to tx_order:state_root file(results from RoochNetwork), for fast verification avoiding blocking on RPC requests" )] pub order_state_path: PathBuf, + #[clap( + long = "order-hash-path", + help = "Path to tx_order:tx_hash:l2_block_number file" + )] + pub order_hash_path: PathBuf, + #[clap( + long = "rollback", + help = "rollback to tx order. If not set or ge executed_tx_order, start from executed_tx_order+1(nothing to do); otherwise, rollback to this order." + )] + pub rollback: Option, #[clap(long = "data-dir", short = 'd')] /// Path to data dir, this dir is base dir, the final data_dir is base_dir/chain_network_name pub base_data_dir: Option, - /// If local chainid, start the service with a temporary data store. /// All data will be deleted when the service is stopped. #[clap(long, short = 'n', help = R_OPT_NET_HELP)] @@ -74,7 +92,6 @@ pub struct ExecCommand { #[clap(name = "rocksdb-row-cache-size", long, help = "rocksdb row cache size")] pub row_cache_size: Option, - #[clap( name = "rocksdb-block-cache-size", long, @@ -83,36 +100,56 @@ pub struct ExecCommand { pub block_cache_size: Option, #[clap(long = "enable-rocks-stats", help = "rocksdb-enable-statistics")] pub enable_rocks_stats: bool, +} - #[clap( - long = "order-hash-path", - help = "Path to tx_order:tx_hash:block_number file" - )] - pub order_hash_path: PathBuf, - #[clap( - long = "rollback", - help = "rollback to tx order. If not set or ge executed_tx_order, start from executed_tx_order+1(nothing to do); otherwise, rollback to this order." - )] - pub rollback: Option, +#[derive(Debug, Copy, Clone, clap::ValueEnum)] +pub enum ExecMode { + Exec, // Only execute transactions, no sequence updates + Seq, // Only update sequence data, no execution + Both, // Execute transactions and update sequence data +} + +impl ExecMode { + pub fn as_bits(&self) -> u8 { + match self { + ExecMode::Exec => 0b10, // Execute + ExecMode::Seq => 0b01, // Sequence + ExecMode::Both => 0b11, // Both + } + } + + pub fn is_exec(&self) -> bool { + self.as_bits() & 0b10 != 0 + } + + pub fn is_seq(&self) -> bool { + self.as_bits() & 0b01 != 0 + } + + pub fn is_both(&self) -> bool { + self.as_bits() == 0b11 + } + + pub fn get_verify_targets(&self) -> String { + match self { + ExecMode::Exec => "state root", + ExecMode::Seq => "accumulator root", + ExecMode::Both => "state+accumulator root", + } + .to_string() + } } impl ExecCommand { pub async fn execute(self) -> RoochResult<()> { - let exec_inner = self.build_exec_inner().await?; + let mut exec_inner = self.build_exec_inner().await?; exec_inner.run().await?; Ok(()) } async fn build_exec_inner(&self) -> anyhow::Result { let actor_system = ActorSystem::global_system(); - let bitcoin_client_proxy = build_btc_client_proxy( - self.btc_rpc_url.clone(), - self.btc_rpc_user_name.clone(), - self.btc_rpc_password.clone(), - self.btc_local_block_store_dir.clone(), - &actor_system, - ) - .await?; + let (executor, moveos_store, rooch_db) = build_executor_and_store( self.base_data_dir.clone(), self.chain_id.clone(), @@ -123,6 +160,17 @@ impl ExecCommand { ) .await?; + let sequenced_tx_store = SequencedTxStore::new(rooch_db.rooch_store.clone())?; + + let bitcoin_client_proxy = build_btc_client_proxy( + self.btc_rpc_url.clone(), + self.btc_rpc_user_name.clone(), + self.btc_rpc_password.clone(), + self.btc_local_block_store_dir.clone(), + &actor_system, + ) + .await?; + let (order_state_pair, tx_order_end) = self.load_order_state_pair(); let ledger_tx_loader = LedgerTxGetter::new(self.segment_dir.clone())?; let tx_da_indexer = TxDAIndexer::load_from_file( @@ -130,10 +178,12 @@ impl ExecCommand { moveos_store.transaction_store, )?; Ok(ExecInner { + mode: self.mode, ledger_tx_getter: ledger_tx_loader, tx_da_indexer, order_state_pair, tx_order_end, + sequenced_tx_store, bitcoin_client_proxy, executor, produced: Arc::new(AtomicU64::new(0)), @@ -144,7 +194,7 @@ impl ExecCommand { }) } - fn load_order_state_pair(&self) -> (HashMap, u64) { + fn load_order_state_pair(&self) -> (HashMap, u64) { let mut order_state_pair = HashMap::new(); let mut tx_order_end = 0; @@ -155,7 +205,8 @@ impl ExecCommand { let parts: Vec<&str> = line.split(':').collect(); let tx_order = parts[0].parse::().unwrap(); let state_root = H256::from_str(parts[1]).unwrap(); - order_state_pair.insert(tx_order, state_root); + let accumulator_root = H256::from_str(parts[2]).unwrap(); + order_state_pair.insert(tx_order, (state_root, accumulator_root)); if tx_order > tx_order_end { tx_order_end = tx_order; } @@ -165,11 +216,15 @@ impl ExecCommand { } struct ExecInner { + mode: ExecMode, + ledger_tx_getter: LedgerTxGetter, tx_da_indexer: TxDAIndexer, - order_state_pair: HashMap, + order_state_pair: HashMap, tx_order_end: u64, + sequenced_tx_store: SequencedTxStore, + bitcoin_client_proxy: BitcoinClientProxy, executor: ExecutorProxy, @@ -243,7 +298,7 @@ impl ExecInner { } } - async fn run(&self) -> anyhow::Result<()> { + async fn run(&mut self) -> anyhow::Result<()> { let (shutdown_tx, shutdown_rx) = watch::channel(()); self.start_logging_task(shutdown_rx); @@ -282,15 +337,37 @@ impl ExecInner { async fn produce_tx(&self, tx: Sender) -> anyhow::Result<()> { let last_executed_opt = self.tx_da_indexer.find_last_executed()?; - let next_tx_order = last_executed_opt + let last_sequenced_tx = self.sequenced_tx_store.get_last_tx_order(); + let mut next_tx_order = last_executed_opt .clone() .map(|v| v.tx_order + 1) .unwrap_or(1); + + if self.mode.is_both() && next_tx_order != last_sequenced_tx + 1 { + let last_executed_tx_order = match last_executed_opt { + Some(v) => v.tx_order, + None => 0, + }; + info! { + "Last executed tx order: {}, last sequenced tx order: {}, need rollback to tx order: {}", + last_executed_tx_order, + last_sequenced_tx, + min(last_sequenced_tx, last_executed_tx_order) + }; + return Ok(()); + } + let mut next_block_number = last_executed_opt .clone() .map(|v| v.block_number) // next_tx_order and last executed tx may be in the same block .unwrap_or(0); - tracing::info!( + + if !self.mode.is_exec() { + next_tx_order = last_sequenced_tx + 1; + next_block_number = self.tx_da_indexer.find_tx_block(next_tx_order).unwrap(); + } + + info!( "next_tx_order: {:?}. need rollback soon: {:?}", next_tx_order, self.rollback.is_some() @@ -304,7 +381,7 @@ impl ExecInner { self.tx_da_indexer.slice(rollback, last_executed_tx_order)?; // split into two parts, the first get execution info for new startup, all others rollback let (new_last, rollback_part) = new_last_and_rollback.split_first().unwrap(); - tracing::info!( + info!( "Start to rollback transactions tx_order: [{}, {}]", rollback_part.first().unwrap().tx_order, rollback_part.last().unwrap().tx_order, @@ -323,17 +400,14 @@ impl ExecInner { let rollback_execution_info = self.tx_da_indexer.get_execution_info(new_last.tx_hash)?; self.update_startup_info_after_rollback(rollback_execution_info.unwrap())?; - tracing::info!( - "Rollback transactions done. Please RESTART process without rollback." - ); + info!("Rollback transactions done. Please RESTART process without rollback."); return Ok(()); // rollback done, need to restart to get new state_root for startup rooch store } }; - tracing::info!( + info!( "Start to produce transactions from tx_order: {}, check from block: {}", - next_tx_order, - next_block_number, + next_tx_order, next_block_number, ); let mut produced_tx_order = 0; let mut reach_end = false; @@ -382,18 +456,20 @@ impl ExecInner { } next_block_number += 1; } - tracing::info!( + info!( "All transactions are produced, max_block_number: {}, max_tx_order: {}", - next_block_number, - produced_tx_order + next_block_number, produced_tx_order ); Ok(()) } async fn consume_tx(&self, mut rx: Receiver) -> anyhow::Result<()> { - tracing::info!("Start to consume transactions"); + info!("Start to consume transactions"); let mut executed_tx_order = 0; let mut last_record_time = std::time::Instant::now(); + + const STATISTICS_INTERVAL: u64 = 100000; + loop { let exec_msg_opt = rx.recv().await; if exec_msg_opt.is_none() { @@ -404,7 +480,7 @@ impl ExecInner { self.execute(exec_msg).await.with_context(|| { format!( - "Error executing transaction: tx_order: {}, executed_tx_order: {}", + "Error occurs: tx_order: {}, executed_tx_order: {}", tx_order, executed_tx_order ) })?; @@ -414,20 +490,21 @@ impl ExecInner { .store(executed_tx_order, std::sync::atomic::Ordering::Relaxed); let done = self.done.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1; - if done % 10000 == 0 { + if done % STATISTICS_INTERVAL == 0 { let elapsed = last_record_time.elapsed(); - tracing::info!( - "execute tx range: [{}, {}], cost: {:?}, avg: {:.3} ms/tx", - tx_order + 1 - 10000, // add first, avoid overflow + info!( + "tx range: [{}, {}], cost: {:?}, avg: {:.3} ms/tx", + tx_order + 1 - STATISTICS_INTERVAL, // add first, avoid overflow tx_order, elapsed, - elapsed.as_millis() as f64 / 10000f64 + elapsed.as_millis() as f64 / STATISTICS_INTERVAL as f64 ); last_record_time = std::time::Instant::now(); } } - tracing::info!( - "All transactions execution state root are strictly equal to RoochNetwork: [0, {}]", + info!( + "All transactions {} are strictly equal to RoochNetwork: [0, {}]", + self.mode.get_verify_targets(), executed_tx_order ); Ok(()) @@ -439,12 +516,28 @@ impl ExecInner { ledger_tx, l1_block_with_body, } = msg; + let is_l2_tx = ledger_tx.data.is_l2_tx(); - let moveos_tx = self - .validate_ledger_transaction(ledger_tx, l1_block_with_body) - .await?; - if let Err(err) = self.execute_moveos_tx(tx_order, moveos_tx).await { - self.handle_execution_error(err, is_l2_tx, tx_order)?; + + let exp_root_opt = self.order_state_pair.get(&tx_order); + let exp_state_root = exp_root_opt.map(|v| v.0); + let exp_accumulator_root = exp_root_opt.map(|v| v.1); + + if self.mode.is_seq() { + self.sequenced_tx_store + .store_tx(ledger_tx.clone(), exp_accumulator_root)?; + } + + if self.mode.is_exec() { + let moveos_tx = self + .validate_ledger_transaction(ledger_tx, l1_block_with_body) + .await?; + if let Err(err) = self + .execute_moveos_tx(tx_order, moveos_tx, exp_state_root) + .await + { + self.handle_execution_error(err, is_l2_tx, tx_order)?; + } } Ok(()) @@ -513,7 +606,7 @@ impl ExecInner { } }; - moveos_tx.ctx.add(ledger_tx.sequence_info.clone())?; + moveos_tx.ctx.add(ledger_tx.sequence_info)?; Ok(moveos_tx) } @@ -521,23 +614,23 @@ impl ExecInner { &self, tx_order: u64, moveos_tx: VerifiedMoveOSTransaction, + exp_state_root_opt: Option, ) -> anyhow::Result<()> { let executor = self.executor.clone(); let (_output, execution_info) = executor.execute_transaction(moveos_tx.clone()).await?; let root = execution_info.root_metadata(); - let expected_root_opt = self.order_state_pair.get(&tx_order); - match expected_root_opt { + match exp_state_root_opt { Some(expected_root) => { - if root.state_root.unwrap() != *expected_root { + if root.state_root.unwrap() != expected_root { return Err(anyhow::anyhow!( "Execution state root is not equal to RoochNetwork: tx_order: {}, exp: {:?}, act: {:?}; act_execution_info: {:?}", tx_order, - *expected_root, root.state_root.unwrap(), execution_info + expected_root, root.state_root.unwrap(), execution_info )); } - tracing::info!( + info!( "Execution state root is equal to RoochNetwork: tx_order: {}", tx_order ); diff --git a/crates/rooch/src/commands/da/commands/mod.rs b/crates/rooch/src/commands/da/commands/mod.rs index 93bf675f40..a96340d2c1 100644 --- a/crates/rooch/src/commands/da/commands/mod.rs +++ b/crates/rooch/src/commands/da/commands/mod.rs @@ -1,6 +1,7 @@ // Copyright (c) RoochNetwork // SPDX-License-Identifier: Apache-2.0 +use accumulator::{Accumulator, MerkleAccumulator}; use metrics::RegistryService; use moveos_store::transaction_store::{TransactionDBStore, TransactionStore}; use moveos_types::h256::H256; @@ -9,21 +10,120 @@ use moveos_types::transaction::TransactionExecutionInfo; use rooch_common::vec::find_last_true; use rooch_config::RoochOpt; use rooch_db::RoochDB; +use rooch_store::RoochStore; use rooch_types::da::chunk::chunk_from_segments; use rooch_types::da::segment::{segment_from_bytes, SegmentID}; use rooch_types::rooch_network::RoochChainID; +use rooch_types::sequencer::SequencerInfo; use rooch_types::transaction::LedgerTransaction; use std::collections::HashMap; use std::fs; use std::fs::File; use std::io::{BufRead, BufReader, Read}; use std::path::PathBuf; +use std::sync::atomic::AtomicU64; +use tracing::info; pub mod exec; pub mod index_tx; pub mod namespace; pub mod unpack; +pub(crate) struct SequencedTxStore { + tx_accumulator: MerkleAccumulator, + last_sequenced_tx_order: AtomicU64, + rooch_store: RoochStore, +} + +impl SequencedTxStore { + pub(crate) fn new(rooch_store: RoochStore) -> anyhow::Result { + // The sequencer info would be initialized when genesis, so the sequencer info should not be None + let last_sequencer_info = rooch_store + .get_meta_store() + .get_sequencer_info()? + .ok_or_else(|| anyhow::anyhow!("Load sequencer info failed"))?; + let (last_order, last_accumulator_info) = ( + last_sequencer_info.last_order, + last_sequencer_info.last_accumulator_info.clone(), + ); + info!("Load latest sequencer order {:?}", last_order); + info!( + "Load latest sequencer accumulator info {:?}", + last_accumulator_info + ); + let tx_accumulator = MerkleAccumulator::new_with_info( + last_accumulator_info, + rooch_store.get_transaction_accumulator_store(), + ); + + Ok(SequencedTxStore { + tx_accumulator, + last_sequenced_tx_order: AtomicU64::new(last_order), + rooch_store, + }) + } + + pub(crate) fn get_last_tx_order(&self) -> u64 { + self.last_sequenced_tx_order + .load(std::sync::atomic::Ordering::SeqCst) + } + + pub(crate) fn store_tx( + &self, + mut tx: LedgerTransaction, + exp_accumulator_root: Option, + ) -> anyhow::Result<()> { + let tx_order = tx.sequence_info.tx_order; + match self.last_sequenced_tx_order.compare_exchange( + tx_order - 1, + tx_order, + std::sync::atomic::Ordering::SeqCst, + std::sync::atomic::Ordering::SeqCst, + ) { + Ok(_) => { + // CAS succeeded, continue with function logic + } + Err(current) => { + return Err(anyhow::anyhow!( + "CAS failed: Tx order is not strictly incremental. \ + Expected: {}, Actual: {}, Tx Order: {}", + tx_order - 1, + current, + tx_order + )); + } + } + + let tx_hash = tx.tx_hash(); + let _tx_accumulator_root = self.tx_accumulator.append(vec![tx_hash].as_slice())?; + let tx_accumulator_unsaved_nodes = self.tx_accumulator.pop_unsaved_nodes(); + let tx_accumulator_info = self.tx_accumulator.get_info(); + + if let Some(exp_accumulator_root) = exp_accumulator_root { + if tx_accumulator_info.accumulator_root != exp_accumulator_root { + return Err(anyhow::anyhow!( + "Tx accumulator root mismatch, expect: {:?}, actual: {:?}", + exp_accumulator_root, + tx_accumulator_info.accumulator_root + )); + } else { + info!( + "Accumulator root is equal to RoochNetwork: tx_order: {}", + tx_order + ); + } + } + + let sequencer_info = SequencerInfo::new(tx_order, tx_accumulator_info); + self.rooch_store.save_sequenced_tx( + tx_hash, + tx.clone(), + sequencer_info, + tx_accumulator_unsaved_nodes, + ) + } +} + // collect all the chunks from segment_dir. // each segment is stored in a file named by the segment_id. // each chunk may contain multiple segments. @@ -258,6 +358,19 @@ impl TxDAIndexer { Ok(r.cloned()) } + pub fn find_tx_block(&self, tx_order: u64) -> Option { + let r = self + .tx_order_hash_blocks + .binary_search_by(|x| x.tx_order.cmp(&tx_order)); + let idx = match r { + Ok(i) => i, + Err(_) => { + return None; + } + }; + Some(self.tx_order_hash_blocks[idx].block_number) + } + fn has_executed(&self, tx_hash: H256) -> bool { let execution_info = self .transaction_store diff --git a/crates/rooch/src/commands/statedb/README.md b/crates/rooch/src/commands/statedb/README.md index 41f82f3fc5..6abad77bf5 100644 --- a/crates/rooch/src/commands/statedb/README.md +++ b/crates/rooch/src/commands/statedb/README.md @@ -10,7 +10,8 @@ A tool to export/import rooch statedb. `genesis` is a subcommand to generate rooch statedb from utxo and ord source files. Run it before starting rooch node. -Source data needed by `genesis` for Rooch MainNet could be found [here](TODO). +Source data needed by `genesis` for Rooch MainNet could be +found [here](https://storage.googleapis.com/rooch_dev/genesis-source/README.md). For protecting the data integrity, verify checksum file's sha256 before running `genesis` command. diff --git a/crates/rooch/src/commands/statedb/btc_source_data.md b/crates/rooch/src/commands/statedb/btc_source_data.md deleted file mode 100644 index 76c928cf7c..0000000000 --- a/crates/rooch/src/commands/statedb/btc_source_data.md +++ /dev/null @@ -1,70 +0,0 @@ -## BTC Source Data - -This doc shows how Rooch gets BTC source data(utxo/inscriptions) - -### Sync Bitcoin server at certain height - -#### bitcoind synced with `-txindex=1` and `-server=1` option: - -```shell -bitcoind -datadir= -txindex=1 -server=1 -``` - -#### set block height to genesis block height: - -expect height: `` (for Rooch Mainnet, `` is `859000`) - -```shell - -get `` block hash: - -```shell -bitcoin-cli -datadir= -conf= -rpccookiefile= getblockhash - -``` - -invalid `` block: - -```shell -bitcoin-cli -datadir= -conf= -rpccookiefile= invalidateblock -``` - -check block height is expected: - -```shell -bitcoin-cli -datadir= -conf= -rpccookiefile= getblockcount - -``` - -### Dump UTXO source file: - -> - stop bitcoind first -> - clone chainstate: - -```shell -rsync --delete -av -``` - -> - dump utxo source file(each line is a utxo record, format - is `count,txid,vout,height,coinbase,amount,script,type,address`) by - [bitcoin-utxo-dump](https://github.com/in3rsha/bitcoin-utxo-dump): - -```shell -bitcoin-utxo-dump -f count,txid,vout,height,coinbase,amount,script,type,address -db -o -``` - -> - check max height of utxo dump file is by python script: - -```shell -awk -F, 'NR > 1 { if ($4 > max) max = $4 } END { print max }' -``` - -### Dump Inscription source file: - -[ord tool](https://github.com/popcnt1/ord/tree/feat/rooch/export) is used to dump inscriptions source file. - -> - start bitcoind again -> - mapping inscription:address by `ord index map-addr` -> - dump inscriptions by `ord index rooch` - -More details could be found [here](https://github.com/popcnt1/ord/tree/feat/rooch/export/src/subcommand/index)