From bb595bc653c5863512219eabf594870d16b5cbf3 Mon Sep 17 00:00:00 2001 From: popcnt <142196625+popcnt1@users.noreply.github.com> Date: Tue, 4 Feb 2025 12:13:56 +0800 Subject: [PATCH] refactor(pipeline): reduce memcopy by avoiding whole L1BlockWithBody clone and optimize tx_hash caching (#3259) 1. Refactored tx_hash caching to improve performance and reduce redundancy 2. memmove of block body is unnecessary 3. L1Block is small, that's the real field we need to share between actors. Clone it but not whole L1BlockWithBody 4. Increase tx count and measurement time for benchmarking --- .../rooch-benchmarks/benches/bench_tx_exec.rs | 2 +- .../benches/bench_tx_validate.rs | 6 ++-- crates/rooch-benchmarks/src/tx.rs | 6 ++-- .../src/actor/processor.rs | 30 ++++++++++--------- crates/rooch-relayer/src/actor/relayer.rs | 2 +- crates/rooch/src/commands/da/commands/exec.rs | 8 ++++- 6 files changed, 31 insertions(+), 23 deletions(-) diff --git a/crates/rooch-benchmarks/benches/bench_tx_exec.rs b/crates/rooch-benchmarks/benches/bench_tx_exec.rs index f9eb11168a..ec8c35a877 100644 --- a/crates/rooch-benchmarks/benches/bench_tx_exec.rs +++ b/crates/rooch-benchmarks/benches/bench_tx_exec.rs @@ -8,7 +8,7 @@ use rooch_benchmarks::tx_exec::tx_exec_benchmark; criterion_group! { name = tx_exec_bench; - config = configure_criterion(None); + config = configure_criterion(None).measurement_time(std::time::Duration::from_secs(5)); targets = tx_exec_benchmark } diff --git a/crates/rooch-benchmarks/benches/bench_tx_validate.rs b/crates/rooch-benchmarks/benches/bench_tx_validate.rs index 15cc003dfa..d902407cf3 100644 --- a/crates/rooch-benchmarks/benches/bench_tx_validate.rs +++ b/crates/rooch-benchmarks/benches/bench_tx_validate.rs @@ -21,7 +21,7 @@ pub fn tx_validate_benchmark(c: &mut Criterion) { let tx_type = config.tx_type.unwrap().clone(); - let tx_cnt = 600; + let tx_cnt = 1000; let transactions: Vec<_> = (0..tx_cnt) .map(|_n| { // Because the validate function doesn't increase the sequence number, @@ -34,14 +34,14 @@ pub fn tx_validate_benchmark(c: &mut Criterion) { c.bench_function("tx_validate", |b| { b.iter(|| { let tx = transactions_iter.next().unwrap(); - binding_test.executor.validate_l2_tx(tx.clone()).unwrap() + binding_test.executor.validate_l2_tx(tx).unwrap() }); }); } criterion_group! { name = tx_validate_bench; - config = configure_criterion(None).measurement_time(Duration::from_millis(200)); + config = configure_criterion(None).measurement_time(Duration::from_millis(5000)); targets = tx_validate_benchmark } diff --git a/crates/rooch-benchmarks/src/tx.rs b/crates/rooch-benchmarks/src/tx.rs index 42b3486385..c6efd098f3 100644 --- a/crates/rooch-benchmarks/src/tx.rs +++ b/crates/rooch-benchmarks/src/tx.rs @@ -87,9 +87,9 @@ pub fn find_block_height(dir: &Path) -> Result> { } pub fn create_btc_blk_tx(height: u64, block_file: &Path) -> Result { - let block_hex_str = fs::read_to_string(block_file).unwrap(); - let block_hex = Vec::::from_hex(&block_hex_str).unwrap(); - let origin_block: Block = deserialize(&block_hex).unwrap(); + let block_hex_str = fs::read_to_string(block_file)?; + let block_hex = Vec::::from_hex(&block_hex_str)?; + let origin_block: Block = deserialize(&block_hex)?; let block = origin_block.clone(); let block_hash = block.header.block_hash(); let move_block = rooch_types::bitcoin::types::Block::from(block.clone()); diff --git a/crates/rooch-pipeline-processor/src/actor/processor.rs b/crates/rooch-pipeline-processor/src/actor/processor.rs index a48dfc7c87..71a2c72eb8 100644 --- a/crates/rooch-pipeline-processor/src/actor/processor.rs +++ b/crates/rooch-pipeline-processor/src/actor/processor.rs @@ -166,7 +166,7 @@ impl PipelineProcessorActor { #[named] pub async fn execute_l1_block( &mut self, - l1_block: L1BlockWithBody, + l1_block_with_body: L1BlockWithBody, ) -> Result { let fn_name = function_name!(); let _timer = self @@ -174,11 +174,14 @@ impl PipelineProcessorActor { .pipeline_processor_execution_tx_latency_seconds .with_label_values(&[fn_name]) .start_timer(); - let moveos_tx = self.executor.validate_l1_block(l1_block.clone()).await?; - let block_height = l1_block.block.block_height; + + let block = l1_block_with_body.block.clone(); + let block_height = block.block_height; + + let moveos_tx = self.executor.validate_l1_block(l1_block_with_body).await?; let ledger_tx = self .sequencer - .sequence_transaction(LedgerTxData::L1Block(l1_block.block)) + .sequence_transaction(LedgerTxData::L1Block(block)) .await?; let tx_order = ledger_tx.sequence_info.tx_order; let size = moveos_tx.ctx.tx_size; @@ -266,7 +269,9 @@ impl PipelineProcessorActor { &mut self, mut tx: RoochTransaction, ) -> Result { + let tx_hash = tx.tx_hash(); // cache tx_hash debug!("pipeline execute_l2_tx: {:?}", tx.tx_hash()); + let fn_name = function_name!(); let _timer = self .metrics @@ -288,7 +293,6 @@ impl PipelineProcessorActor { "Execute L2 Tx failed while VM panic occurred and revert tx. error: {:?} tx info {}", err, hex::encode(l2_tx_bcs_bytes) ); - let tx_hash = tx.tx_hash(); self.rooch_db.revert_tx(tx_hash)?; } return Err(err); @@ -332,11 +336,11 @@ impl PipelineProcessorActor { }) .await?; let root = execution_info.root_metadata(); - // Sync latest state root from writer executor to reader executor + // Sync the latest state root from writer executor to reader executor self.executor .refresh_state(root.clone(), output.is_upgrade) .await?; - // Save state change set is a notify call, do not block current task + // Save state change set is a notify call, do not block the current task let state_change_set_ext = StateChangeSetExt::new(output.changeset.clone(), moveos_tx.ctx.sequence_number); self.executor @@ -345,19 +349,17 @@ impl PipelineProcessorActor { let indexer = self.indexer.clone(); let sequence_info = tx.sequence_info.clone(); - let execution_info_clone = execution_info.clone(); - let output_clone = output.clone(); - // If bitcoin block data import, don't write all indexer + // If bitcoin block data import, don't write indexer if !self.service_status.is_date_import_mode() { - //The update_indexer is a notify call, do not block current task + //The update_indexer is a notification call, do not block the current task let result = indexer .update_indexer( tx, - execution_info_clone, + execution_info.clone(), moveos_tx, - output_clone.events, - output_clone.changeset, + output.events.clone(), + output.changeset.clone(), ) .await; match result { diff --git a/crates/rooch-relayer/src/actor/relayer.rs b/crates/rooch-relayer/src/actor/relayer.rs index b2eb48ea4e..92d905e01c 100644 --- a/crates/rooch-relayer/src/actor/relayer.rs +++ b/crates/rooch-relayer/src/actor/relayer.rs @@ -104,7 +104,7 @@ impl RelayerActor { async fn handle_l1_block(&mut self, l1_block: L1BlockWithBody) -> Result<()> { let block_hash = hex::encode(&l1_block.block.block_hash); let block_height = l1_block.block.block_height; - let result = self.processor.execute_l1_block(l1_block.clone()).await?; + let result = self.processor.execute_l1_block(l1_block).await?; match result.execution_info.status { KeptVMStatus::Executed => { diff --git a/crates/rooch/src/commands/da/commands/exec.rs b/crates/rooch/src/commands/da/commands/exec.rs index eeb13f7e7f..0064ae18a3 100644 --- a/crates/rooch/src/commands/da/commands/exec.rs +++ b/crates/rooch/src/commands/da/commands/exec.rs @@ -628,7 +628,7 @@ impl ExecInner { async fn execute(&self, msg: ExecMsg) -> anyhow::Result<()> { let ExecMsg { tx_order, - ledger_tx, + mut ledger_tx, l1_block_with_body, } = msg; @@ -638,6 +638,12 @@ impl ExecInner { let exp_state_root = exp_root_opt.map(|v| v.0); let exp_accumulator_root = exp_root_opt.map(|v| v.1); + // cache tx_hash + if is_l2_tx { + let _ = ledger_tx.tx_hash(); + } + // it's okay to sequence tx before validation, + // because in this case, all tx have been sequenced in Rooch Network. if self.mode.need_seq() { self.sequenced_tx_store .store_tx(ledger_tx.clone(), exp_accumulator_root)?;