diff --git a/Cargo.lock b/Cargo.lock index 42deeb5e5f..fe19fefbd6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10083,6 +10083,7 @@ dependencies = [ name = "rooch" version = "0.8.6" dependencies = [ + "accumulator", "anyhow 1.0.95", "async-trait", "bcs", diff --git a/crates/rooch/Cargo.toml b/crates/rooch/Cargo.toml index fc0fd43ccd..423d61b287 100644 --- a/crates/rooch/Cargo.toml +++ b/crates/rooch/Cargo.toml @@ -85,6 +85,7 @@ framework-types = { workspace = true } raw-store = { workspace = true } smt = { workspace = true } +accumulator = { workspace = true } bitcoin-client = { workspace = true } rooch-executor = { workspace = true } rooch-key = { workspace = true } diff --git a/crates/rooch/src/commands/da/commands/exec.rs b/crates/rooch/src/commands/da/commands/exec.rs index ca1317dd91..a14c8925fe 100644 --- a/crates/rooch/src/commands/da/commands/exec.rs +++ b/crates/rooch/src/commands/da/commands/exec.rs @@ -1,7 +1,9 @@ // Copyright (c) RoochNetwork // SPDX-License-Identifier: Apache-2.0 -use crate::commands::da::commands::{build_rooch_db, LedgerTxGetter, TxDAIndexer}; +use crate::commands::da::commands::{ + build_rooch_db, LedgerTxGetter, SequencedTxStore, TxDAIndexer, +}; use anyhow::Context; use bitcoin::hashes::Hash; use bitcoin_client::actor::client::BitcoinClientConfig; @@ -30,6 +32,7 @@ use rooch_types::bitcoin::types::Block as BitcoinBlock; use rooch_types::error::RoochResult; use rooch_types::rooch_network::RoochChainID; use rooch_types::transaction::{L1BlockWithBody, LedgerTransaction, LedgerTxData}; +use std::cmp::min; use std::collections::HashMap; use std::fs::File; use std::io::{BufRead, BufReader, Read}; @@ -42,22 +45,37 @@ use tokio::sync::mpsc::Receiver; use tokio::sync::mpsc::Sender; use tokio::sync::watch; use tokio::time; +use tracing::info; /// exec LedgerTransaction List for verification. #[derive(Debug, Parser)] pub struct ExecCommand { + #[clap( + long = "mode", + help = "execute mode: exec, seq, full. exec: only execute transactions, no update sequence related data; full: execute transactions and update sequence related data" + )] + pub mode: ExecMode, #[clap(long = "segment-dir")] pub segment_dir: PathBuf, #[clap( long = "order-state-path", - help = "Path to tx_order:state_root file(results from RoochNetwork), for verification" + help = "Path to tx_order:state_root file(results from RoochNetwork), for fast verification avoiding blocking on RPC requests" )] pub order_state_path: PathBuf, + #[clap( + long = "order-hash-path", + help = "Path to tx_order:tx_hash:l2_block_number file" + )] + pub order_hash_path: PathBuf, + #[clap( + long = "rollback", + help = "rollback to tx order. If not set or ge executed_tx_order, start from executed_tx_order+1(nothing to do); otherwise, rollback to this order." + )] + pub rollback: Option, #[clap(long = "data-dir", short = 'd')] /// Path to data dir, this dir is base dir, the final data_dir is base_dir/chain_network_name pub base_data_dir: Option, - /// If local chainid, start the service with a temporary data store. /// All data will be deleted when the service is stopped. #[clap(long, short = 'n', help = R_OPT_NET_HELP)] @@ -74,7 +92,6 @@ pub struct ExecCommand { #[clap(name = "rocksdb-row-cache-size", long, help = "rocksdb row cache size")] pub row_cache_size: Option, - #[clap( name = "rocksdb-block-cache-size", long, @@ -83,36 +100,56 @@ pub struct ExecCommand { pub block_cache_size: Option, #[clap(long = "enable-rocks-stats", help = "rocksdb-enable-statistics")] pub enable_rocks_stats: bool, +} - #[clap( - long = "order-hash-path", - help = "Path to tx_order:tx_hash:block_number file" - )] - pub order_hash_path: PathBuf, - #[clap( - long = "rollback", - help = "rollback to tx order. If not set or ge executed_tx_order, start from executed_tx_order+1(nothing to do); otherwise, rollback to this order." - )] - pub rollback: Option, +#[derive(Debug, Copy, Clone, clap::ValueEnum)] +pub enum ExecMode { + Exec, // Only execute transactions, no sequence updates + Seq, // Only update sequence data, no execution + Both, // Execute transactions and update sequence data +} + +impl ExecMode { + pub fn as_bits(&self) -> u8 { + match self { + ExecMode::Exec => 0b10, // Execute + ExecMode::Seq => 0b01, // Sequence + ExecMode::Both => 0b11, // Both + } + } + + pub fn is_exec(&self) -> bool { + self.as_bits() & 0b10 != 0 + } + + pub fn is_seq(&self) -> bool { + self.as_bits() & 0b01 != 0 + } + + pub fn is_both(&self) -> bool { + self.as_bits() == 0b11 + } + + pub fn get_verify_targets(&self) -> String { + match self { + ExecMode::Exec => "state root", + ExecMode::Seq => "accumulator root", + ExecMode::Both => "state+accumulator root", + } + .to_string() + } } impl ExecCommand { pub async fn execute(self) -> RoochResult<()> { - let exec_inner = self.build_exec_inner().await?; + let mut exec_inner = self.build_exec_inner().await?; exec_inner.run().await?; Ok(()) } async fn build_exec_inner(&self) -> anyhow::Result { let actor_system = ActorSystem::global_system(); - let bitcoin_client_proxy = build_btc_client_proxy( - self.btc_rpc_url.clone(), - self.btc_rpc_user_name.clone(), - self.btc_rpc_password.clone(), - self.btc_local_block_store_dir.clone(), - &actor_system, - ) - .await?; + let (executor, moveos_store, rooch_db) = build_executor_and_store( self.base_data_dir.clone(), self.chain_id.clone(), @@ -123,6 +160,17 @@ impl ExecCommand { ) .await?; + let sequenced_tx_store = SequencedTxStore::new(rooch_db.rooch_store.clone())?; + + let bitcoin_client_proxy = build_btc_client_proxy( + self.btc_rpc_url.clone(), + self.btc_rpc_user_name.clone(), + self.btc_rpc_password.clone(), + self.btc_local_block_store_dir.clone(), + &actor_system, + ) + .await?; + let (order_state_pair, tx_order_end) = self.load_order_state_pair(); let ledger_tx_loader = LedgerTxGetter::new(self.segment_dir.clone())?; let tx_da_indexer = TxDAIndexer::load_from_file( @@ -130,10 +178,12 @@ impl ExecCommand { moveos_store.transaction_store, )?; Ok(ExecInner { + mode: self.mode, ledger_tx_getter: ledger_tx_loader, tx_da_indexer, order_state_pair, tx_order_end, + sequenced_tx_store, bitcoin_client_proxy, executor, produced: Arc::new(AtomicU64::new(0)), @@ -144,7 +194,7 @@ impl ExecCommand { }) } - fn load_order_state_pair(&self) -> (HashMap, u64) { + fn load_order_state_pair(&self) -> (HashMap, u64) { let mut order_state_pair = HashMap::new(); let mut tx_order_end = 0; @@ -155,7 +205,8 @@ impl ExecCommand { let parts: Vec<&str> = line.split(':').collect(); let tx_order = parts[0].parse::().unwrap(); let state_root = H256::from_str(parts[1]).unwrap(); - order_state_pair.insert(tx_order, state_root); + let accumulator_root = H256::from_str(parts[2]).unwrap(); + order_state_pair.insert(tx_order, (state_root, accumulator_root)); if tx_order > tx_order_end { tx_order_end = tx_order; } @@ -165,11 +216,15 @@ impl ExecCommand { } struct ExecInner { + mode: ExecMode, + ledger_tx_getter: LedgerTxGetter, tx_da_indexer: TxDAIndexer, - order_state_pair: HashMap, + order_state_pair: HashMap, tx_order_end: u64, + sequenced_tx_store: SequencedTxStore, + bitcoin_client_proxy: BitcoinClientProxy, executor: ExecutorProxy, @@ -243,7 +298,7 @@ impl ExecInner { } } - async fn run(&self) -> anyhow::Result<()> { + async fn run(&mut self) -> anyhow::Result<()> { let (shutdown_tx, shutdown_rx) = watch::channel(()); self.start_logging_task(shutdown_rx); @@ -282,15 +337,37 @@ impl ExecInner { async fn produce_tx(&self, tx: Sender) -> anyhow::Result<()> { let last_executed_opt = self.tx_da_indexer.find_last_executed()?; - let next_tx_order = last_executed_opt + let last_sequenced_tx = self.sequenced_tx_store.get_last_tx_order(); + let mut next_tx_order = last_executed_opt .clone() .map(|v| v.tx_order + 1) .unwrap_or(1); + + if self.mode.is_both() && next_tx_order != last_sequenced_tx + 1 { + let last_executed_tx_order = match last_executed_opt { + Some(v) => v.tx_order, + None => 0, + }; + info! { + "Last executed tx order: {}, last sequenced tx order: {}, need rollback to tx order: {}", + last_executed_tx_order, + last_sequenced_tx, + min(last_sequenced_tx, last_executed_tx_order) + }; + return Ok(()); + } + let mut next_block_number = last_executed_opt .clone() .map(|v| v.block_number) // next_tx_order and last executed tx may be in the same block .unwrap_or(0); - tracing::info!( + + if !self.mode.is_exec() { + next_tx_order = last_sequenced_tx + 1; + next_block_number = self.tx_da_indexer.find_tx_block(next_tx_order).unwrap(); + } + + info!( "next_tx_order: {:?}. need rollback soon: {:?}", next_tx_order, self.rollback.is_some() @@ -304,7 +381,7 @@ impl ExecInner { self.tx_da_indexer.slice(rollback, last_executed_tx_order)?; // split into two parts, the first get execution info for new startup, all others rollback let (new_last, rollback_part) = new_last_and_rollback.split_first().unwrap(); - tracing::info!( + info!( "Start to rollback transactions tx_order: [{}, {}]", rollback_part.first().unwrap().tx_order, rollback_part.last().unwrap().tx_order, @@ -323,17 +400,14 @@ impl ExecInner { let rollback_execution_info = self.tx_da_indexer.get_execution_info(new_last.tx_hash)?; self.update_startup_info_after_rollback(rollback_execution_info.unwrap())?; - tracing::info!( - "Rollback transactions done. Please RESTART process without rollback." - ); + info!("Rollback transactions done. Please RESTART process without rollback."); return Ok(()); // rollback done, need to restart to get new state_root for startup rooch store } }; - tracing::info!( + info!( "Start to produce transactions from tx_order: {}, check from block: {}", - next_tx_order, - next_block_number, + next_tx_order, next_block_number, ); let mut produced_tx_order = 0; let mut reach_end = false; @@ -382,18 +456,20 @@ impl ExecInner { } next_block_number += 1; } - tracing::info!( + info!( "All transactions are produced, max_block_number: {}, max_tx_order: {}", - next_block_number, - produced_tx_order + next_block_number, produced_tx_order ); Ok(()) } async fn consume_tx(&self, mut rx: Receiver) -> anyhow::Result<()> { - tracing::info!("Start to consume transactions"); + info!("Start to consume transactions"); let mut executed_tx_order = 0; let mut last_record_time = std::time::Instant::now(); + + const STATISTICS_INTERVAL: u64 = 100000; + loop { let exec_msg_opt = rx.recv().await; if exec_msg_opt.is_none() { @@ -404,7 +480,7 @@ impl ExecInner { self.execute(exec_msg).await.with_context(|| { format!( - "Error executing transaction: tx_order: {}, executed_tx_order: {}", + "Error occurs: tx_order: {}, executed_tx_order: {}", tx_order, executed_tx_order ) })?; @@ -414,20 +490,21 @@ impl ExecInner { .store(executed_tx_order, std::sync::atomic::Ordering::Relaxed); let done = self.done.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1; - if done % 10000 == 0 { + if done % STATISTICS_INTERVAL == 0 { let elapsed = last_record_time.elapsed(); - tracing::info!( - "execute tx range: [{}, {}], cost: {:?}, avg: {:.3} ms/tx", - tx_order + 1 - 10000, // add first, avoid overflow + info!( + "tx range: [{}, {}], cost: {:?}, avg: {:.3} ms/tx", + tx_order + 1 - STATISTICS_INTERVAL, // add first, avoid overflow tx_order, elapsed, - elapsed.as_millis() as f64 / 10000f64 + elapsed.as_millis() as f64 / STATISTICS_INTERVAL as f64 ); last_record_time = std::time::Instant::now(); } } - tracing::info!( - "All transactions execution state root are strictly equal to RoochNetwork: [0, {}]", + info!( + "All transactions {} are strictly equal to RoochNetwork: [0, {}]", + self.mode.get_verify_targets(), executed_tx_order ); Ok(()) @@ -439,12 +516,28 @@ impl ExecInner { ledger_tx, l1_block_with_body, } = msg; + let is_l2_tx = ledger_tx.data.is_l2_tx(); - let moveos_tx = self - .validate_ledger_transaction(ledger_tx, l1_block_with_body) - .await?; - if let Err(err) = self.execute_moveos_tx(tx_order, moveos_tx).await { - self.handle_execution_error(err, is_l2_tx, tx_order)?; + + let exp_root_opt = self.order_state_pair.get(&tx_order); + let exp_state_root = exp_root_opt.map(|v| v.0); + let exp_accumulator_root = exp_root_opt.map(|v| v.1); + + if self.mode.is_seq() { + self.sequenced_tx_store + .store_tx(ledger_tx.clone(), exp_accumulator_root)?; + } + + if self.mode.is_exec() { + let moveos_tx = self + .validate_ledger_transaction(ledger_tx, l1_block_with_body) + .await?; + if let Err(err) = self + .execute_moveos_tx(tx_order, moveos_tx, exp_state_root) + .await + { + self.handle_execution_error(err, is_l2_tx, tx_order)?; + } } Ok(()) @@ -513,7 +606,7 @@ impl ExecInner { } }; - moveos_tx.ctx.add(ledger_tx.sequence_info.clone())?; + moveos_tx.ctx.add(ledger_tx.sequence_info)?; Ok(moveos_tx) } @@ -521,23 +614,23 @@ impl ExecInner { &self, tx_order: u64, moveos_tx: VerifiedMoveOSTransaction, + exp_state_root_opt: Option, ) -> anyhow::Result<()> { let executor = self.executor.clone(); let (_output, execution_info) = executor.execute_transaction(moveos_tx.clone()).await?; let root = execution_info.root_metadata(); - let expected_root_opt = self.order_state_pair.get(&tx_order); - match expected_root_opt { + match exp_state_root_opt { Some(expected_root) => { - if root.state_root.unwrap() != *expected_root { + if root.state_root.unwrap() != expected_root { return Err(anyhow::anyhow!( "Execution state root is not equal to RoochNetwork: tx_order: {}, exp: {:?}, act: {:?}; act_execution_info: {:?}", tx_order, - *expected_root, root.state_root.unwrap(), execution_info + expected_root, root.state_root.unwrap(), execution_info )); } - tracing::info!( + info!( "Execution state root is equal to RoochNetwork: tx_order: {}", tx_order ); diff --git a/crates/rooch/src/commands/da/commands/mod.rs b/crates/rooch/src/commands/da/commands/mod.rs index 93bf675f40..a96340d2c1 100644 --- a/crates/rooch/src/commands/da/commands/mod.rs +++ b/crates/rooch/src/commands/da/commands/mod.rs @@ -1,6 +1,7 @@ // Copyright (c) RoochNetwork // SPDX-License-Identifier: Apache-2.0 +use accumulator::{Accumulator, MerkleAccumulator}; use metrics::RegistryService; use moveos_store::transaction_store::{TransactionDBStore, TransactionStore}; use moveos_types::h256::H256; @@ -9,21 +10,120 @@ use moveos_types::transaction::TransactionExecutionInfo; use rooch_common::vec::find_last_true; use rooch_config::RoochOpt; use rooch_db::RoochDB; +use rooch_store::RoochStore; use rooch_types::da::chunk::chunk_from_segments; use rooch_types::da::segment::{segment_from_bytes, SegmentID}; use rooch_types::rooch_network::RoochChainID; +use rooch_types::sequencer::SequencerInfo; use rooch_types::transaction::LedgerTransaction; use std::collections::HashMap; use std::fs; use std::fs::File; use std::io::{BufRead, BufReader, Read}; use std::path::PathBuf; +use std::sync::atomic::AtomicU64; +use tracing::info; pub mod exec; pub mod index_tx; pub mod namespace; pub mod unpack; +pub(crate) struct SequencedTxStore { + tx_accumulator: MerkleAccumulator, + last_sequenced_tx_order: AtomicU64, + rooch_store: RoochStore, +} + +impl SequencedTxStore { + pub(crate) fn new(rooch_store: RoochStore) -> anyhow::Result { + // The sequencer info would be initialized when genesis, so the sequencer info should not be None + let last_sequencer_info = rooch_store + .get_meta_store() + .get_sequencer_info()? + .ok_or_else(|| anyhow::anyhow!("Load sequencer info failed"))?; + let (last_order, last_accumulator_info) = ( + last_sequencer_info.last_order, + last_sequencer_info.last_accumulator_info.clone(), + ); + info!("Load latest sequencer order {:?}", last_order); + info!( + "Load latest sequencer accumulator info {:?}", + last_accumulator_info + ); + let tx_accumulator = MerkleAccumulator::new_with_info( + last_accumulator_info, + rooch_store.get_transaction_accumulator_store(), + ); + + Ok(SequencedTxStore { + tx_accumulator, + last_sequenced_tx_order: AtomicU64::new(last_order), + rooch_store, + }) + } + + pub(crate) fn get_last_tx_order(&self) -> u64 { + self.last_sequenced_tx_order + .load(std::sync::atomic::Ordering::SeqCst) + } + + pub(crate) fn store_tx( + &self, + mut tx: LedgerTransaction, + exp_accumulator_root: Option, + ) -> anyhow::Result<()> { + let tx_order = tx.sequence_info.tx_order; + match self.last_sequenced_tx_order.compare_exchange( + tx_order - 1, + tx_order, + std::sync::atomic::Ordering::SeqCst, + std::sync::atomic::Ordering::SeqCst, + ) { + Ok(_) => { + // CAS succeeded, continue with function logic + } + Err(current) => { + return Err(anyhow::anyhow!( + "CAS failed: Tx order is not strictly incremental. \ + Expected: {}, Actual: {}, Tx Order: {}", + tx_order - 1, + current, + tx_order + )); + } + } + + let tx_hash = tx.tx_hash(); + let _tx_accumulator_root = self.tx_accumulator.append(vec![tx_hash].as_slice())?; + let tx_accumulator_unsaved_nodes = self.tx_accumulator.pop_unsaved_nodes(); + let tx_accumulator_info = self.tx_accumulator.get_info(); + + if let Some(exp_accumulator_root) = exp_accumulator_root { + if tx_accumulator_info.accumulator_root != exp_accumulator_root { + return Err(anyhow::anyhow!( + "Tx accumulator root mismatch, expect: {:?}, actual: {:?}", + exp_accumulator_root, + tx_accumulator_info.accumulator_root + )); + } else { + info!( + "Accumulator root is equal to RoochNetwork: tx_order: {}", + tx_order + ); + } + } + + let sequencer_info = SequencerInfo::new(tx_order, tx_accumulator_info); + self.rooch_store.save_sequenced_tx( + tx_hash, + tx.clone(), + sequencer_info, + tx_accumulator_unsaved_nodes, + ) + } +} + // collect all the chunks from segment_dir. // each segment is stored in a file named by the segment_id. // each chunk may contain multiple segments. @@ -258,6 +358,19 @@ impl TxDAIndexer { Ok(r.cloned()) } + pub fn find_tx_block(&self, tx_order: u64) -> Option { + let r = self + .tx_order_hash_blocks + .binary_search_by(|x| x.tx_order.cmp(&tx_order)); + let idx = match r { + Ok(i) => i, + Err(_) => { + return None; + } + }; + Some(self.tx_order_hash_blocks[idx].block_number) + } + fn has_executed(&self, tx_hash: H256) -> bool { let execution_info = self .transaction_store diff --git a/crates/rooch/src/commands/statedb/README.md b/crates/rooch/src/commands/statedb/README.md index 41f82f3fc5..6abad77bf5 100644 --- a/crates/rooch/src/commands/statedb/README.md +++ b/crates/rooch/src/commands/statedb/README.md @@ -10,7 +10,8 @@ A tool to export/import rooch statedb. `genesis` is a subcommand to generate rooch statedb from utxo and ord source files. Run it before starting rooch node. -Source data needed by `genesis` for Rooch MainNet could be found [here](TODO). +Source data needed by `genesis` for Rooch MainNet could be +found [here](https://storage.googleapis.com/rooch_dev/genesis-source/README.md). For protecting the data integrity, verify checksum file's sha256 before running `genesis` command. diff --git a/crates/rooch/src/commands/statedb/btc_source_data.md b/crates/rooch/src/commands/statedb/btc_source_data.md deleted file mode 100644 index 76c928cf7c..0000000000 --- a/crates/rooch/src/commands/statedb/btc_source_data.md +++ /dev/null @@ -1,70 +0,0 @@ -## BTC Source Data - -This doc shows how Rooch gets BTC source data(utxo/inscriptions) - -### Sync Bitcoin server at certain height - -#### bitcoind synced with `-txindex=1` and `-server=1` option: - -```shell -bitcoind -datadir= -txindex=1 -server=1 -``` - -#### set block height to genesis block height: - -expect height: `` (for Rooch Mainnet, `` is `859000`) - -```shell - -get `` block hash: - -```shell -bitcoin-cli -datadir= -conf= -rpccookiefile= getblockhash - -``` - -invalid `` block: - -```shell -bitcoin-cli -datadir= -conf= -rpccookiefile= invalidateblock -``` - -check block height is expected: - -```shell -bitcoin-cli -datadir= -conf= -rpccookiefile= getblockcount - -``` - -### Dump UTXO source file: - -> - stop bitcoind first -> - clone chainstate: - -```shell -rsync --delete -av -``` - -> - dump utxo source file(each line is a utxo record, format - is `count,txid,vout,height,coinbase,amount,script,type,address`) by - [bitcoin-utxo-dump](https://github.com/in3rsha/bitcoin-utxo-dump): - -```shell -bitcoin-utxo-dump -f count,txid,vout,height,coinbase,amount,script,type,address -db -o -``` - -> - check max height of utxo dump file is by python script: - -```shell -awk -F, 'NR > 1 { if ($4 > max) max = $4 } END { print max }' -``` - -### Dump Inscription source file: - -[ord tool](https://github.com/popcnt1/ord/tree/feat/rooch/export) is used to dump inscriptions source file. - -> - start bitcoind again -> - mapping inscription:address by `ord index map-addr` -> - dump inscriptions by `ord index rooch` - -More details could be found [here](https://github.com/popcnt1/ord/tree/feat/rooch/export/src/subcommand/index)