Skip to content

Commit

Permalink
refactor(pipeline): reduce memcopy by avoiding whole L1BlockWithBody …
Browse files Browse the repository at this point in the history
…clone and optimize tx_hash caching (#3259)

1. Refactored tx_hash caching to improve performance and reduce redundancy
2. memmove of block body is unnecessary
3. L1Block is small, that's the real field we need to share between actors. Clone it but not whole L1BlockWithBody
4.  Increase tx count and measurement time for benchmarking
  • Loading branch information
popcnt1 authored Feb 4, 2025
1 parent 61af80f commit bb595bc
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 23 deletions.
2 changes: 1 addition & 1 deletion crates/rooch-benchmarks/benches/bench_tx_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use rooch_benchmarks::tx_exec::tx_exec_benchmark;

criterion_group! {
name = tx_exec_bench;
config = configure_criterion(None);
config = configure_criterion(None).measurement_time(std::time::Duration::from_secs(5));
targets = tx_exec_benchmark
}

Expand Down
6 changes: 3 additions & 3 deletions crates/rooch-benchmarks/benches/bench_tx_validate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub fn tx_validate_benchmark(c: &mut Criterion) {

let tx_type = config.tx_type.unwrap().clone();

let tx_cnt = 600;
let tx_cnt = 1000;
let transactions: Vec<_> = (0..tx_cnt)
.map(|_n| {
// Because the validate function doesn't increase the sequence number,
Expand All @@ -34,14 +34,14 @@ pub fn tx_validate_benchmark(c: &mut Criterion) {
c.bench_function("tx_validate", |b| {
b.iter(|| {
let tx = transactions_iter.next().unwrap();
binding_test.executor.validate_l2_tx(tx.clone()).unwrap()
binding_test.executor.validate_l2_tx(tx).unwrap()
});
});
}

criterion_group! {
name = tx_validate_bench;
config = configure_criterion(None).measurement_time(Duration::from_millis(200));
config = configure_criterion(None).measurement_time(Duration::from_millis(5000));
targets = tx_validate_benchmark
}

Expand Down
6 changes: 3 additions & 3 deletions crates/rooch-benchmarks/src/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@ pub fn find_block_height(dir: &Path) -> Result<Vec<u64>> {
}

pub fn create_btc_blk_tx(height: u64, block_file: &Path) -> Result<L1BlockWithBody> {
let block_hex_str = fs::read_to_string(block_file).unwrap();
let block_hex = Vec::<u8>::from_hex(&block_hex_str).unwrap();
let origin_block: Block = deserialize(&block_hex).unwrap();
let block_hex_str = fs::read_to_string(block_file)?;
let block_hex = Vec::<u8>::from_hex(&block_hex_str)?;
let origin_block: Block = deserialize(&block_hex)?;
let block = origin_block.clone();
let block_hash = block.header.block_hash();
let move_block = rooch_types::bitcoin::types::Block::from(block.clone());
Expand Down
30 changes: 16 additions & 14 deletions crates/rooch-pipeline-processor/src/actor/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,19 +166,22 @@ impl PipelineProcessorActor {
#[named]
pub async fn execute_l1_block(
&mut self,
l1_block: L1BlockWithBody,
l1_block_with_body: L1BlockWithBody,
) -> Result<ExecuteTransactionResponse> {
let fn_name = function_name!();
let _timer = self
.metrics
.pipeline_processor_execution_tx_latency_seconds
.with_label_values(&[fn_name])
.start_timer();
let moveos_tx = self.executor.validate_l1_block(l1_block.clone()).await?;
let block_height = l1_block.block.block_height;

let block = l1_block_with_body.block.clone();
let block_height = block.block_height;

let moveos_tx = self.executor.validate_l1_block(l1_block_with_body).await?;
let ledger_tx = self
.sequencer
.sequence_transaction(LedgerTxData::L1Block(l1_block.block))
.sequence_transaction(LedgerTxData::L1Block(block))
.await?;
let tx_order = ledger_tx.sequence_info.tx_order;
let size = moveos_tx.ctx.tx_size;
Expand Down Expand Up @@ -266,7 +269,9 @@ impl PipelineProcessorActor {
&mut self,
mut tx: RoochTransaction,
) -> Result<ExecuteTransactionResponse> {
let tx_hash = tx.tx_hash(); // cache tx_hash
debug!("pipeline execute_l2_tx: {:?}", tx.tx_hash());

let fn_name = function_name!();
let _timer = self
.metrics
Expand All @@ -288,7 +293,6 @@ impl PipelineProcessorActor {
"Execute L2 Tx failed while VM panic occurred and revert tx. error: {:?} tx info {}",
err, hex::encode(l2_tx_bcs_bytes)
);
let tx_hash = tx.tx_hash();
self.rooch_db.revert_tx(tx_hash)?;
}
return Err(err);
Expand Down Expand Up @@ -332,11 +336,11 @@ impl PipelineProcessorActor {
})
.await?;
let root = execution_info.root_metadata();
// Sync latest state root from writer executor to reader executor
// Sync the latest state root from writer executor to reader executor
self.executor
.refresh_state(root.clone(), output.is_upgrade)
.await?;
// Save state change set is a notify call, do not block current task
// Save state change set is a notify call, do not block the current task
let state_change_set_ext =
StateChangeSetExt::new(output.changeset.clone(), moveos_tx.ctx.sequence_number);
self.executor
Expand All @@ -345,19 +349,17 @@ impl PipelineProcessorActor {

let indexer = self.indexer.clone();
let sequence_info = tx.sequence_info.clone();
let execution_info_clone = execution_info.clone();
let output_clone = output.clone();

// If bitcoin block data import, don't write all indexer
// If bitcoin block data import, don't write indexer
if !self.service_status.is_date_import_mode() {
//The update_indexer is a notify call, do not block current task
//The update_indexer is a notification call, do not block the current task
let result = indexer
.update_indexer(
tx,
execution_info_clone,
execution_info.clone(),
moveos_tx,
output_clone.events,
output_clone.changeset,
output.events.clone(),
output.changeset.clone(),
)
.await;
match result {
Expand Down
2 changes: 1 addition & 1 deletion crates/rooch-relayer/src/actor/relayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ impl RelayerActor {
async fn handle_l1_block(&mut self, l1_block: L1BlockWithBody) -> Result<()> {
let block_hash = hex::encode(&l1_block.block.block_hash);
let block_height = l1_block.block.block_height;
let result = self.processor.execute_l1_block(l1_block.clone()).await?;
let result = self.processor.execute_l1_block(l1_block).await?;

match result.execution_info.status {
KeptVMStatus::Executed => {
Expand Down
8 changes: 7 additions & 1 deletion crates/rooch/src/commands/da/commands/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -628,7 +628,7 @@ impl ExecInner {
async fn execute(&self, msg: ExecMsg) -> anyhow::Result<()> {
let ExecMsg {
tx_order,
ledger_tx,
mut ledger_tx,
l1_block_with_body,
} = msg;

Expand All @@ -638,6 +638,12 @@ impl ExecInner {
let exp_state_root = exp_root_opt.map(|v| v.0);
let exp_accumulator_root = exp_root_opt.map(|v| v.1);

// cache tx_hash
if is_l2_tx {
let _ = ledger_tx.tx_hash();
}
// it's okay to sequence tx before validation,
// because in this case, all tx have been sequenced in Rooch Network.
if self.mode.need_seq() {
self.sequenced_tx_store
.store_tx(ledger_tx.clone(), exp_accumulator_root)?;
Expand Down

0 comments on commit bb595bc

Please sign in to comment.