Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(pipeline): reduce memcopy by avoiding whole L1BlockWithBody clone and optimize tx_hash caching #3259

Merged
merged 1 commit into from
Feb 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/rooch-benchmarks/benches/bench_tx_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use rooch_benchmarks::tx_exec::tx_exec_benchmark;

criterion_group! {
name = tx_exec_bench;
config = configure_criterion(None);
config = configure_criterion(None).measurement_time(std::time::Duration::from_secs(5));
targets = tx_exec_benchmark
}

Expand Down
6 changes: 3 additions & 3 deletions crates/rooch-benchmarks/benches/bench_tx_validate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub fn tx_validate_benchmark(c: &mut Criterion) {

let tx_type = config.tx_type.unwrap().clone();

let tx_cnt = 600;
let tx_cnt = 1000;
let transactions: Vec<_> = (0..tx_cnt)
.map(|_n| {
// Because the validate function doesn't increase the sequence number,
Expand All @@ -34,14 +34,14 @@ pub fn tx_validate_benchmark(c: &mut Criterion) {
c.bench_function("tx_validate", |b| {
b.iter(|| {
let tx = transactions_iter.next().unwrap();
binding_test.executor.validate_l2_tx(tx.clone()).unwrap()
binding_test.executor.validate_l2_tx(tx).unwrap()
});
});
}

criterion_group! {
name = tx_validate_bench;
config = configure_criterion(None).measurement_time(Duration::from_millis(200));
config = configure_criterion(None).measurement_time(Duration::from_millis(5000));
targets = tx_validate_benchmark
}

Expand Down
6 changes: 3 additions & 3 deletions crates/rooch-benchmarks/src/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@ pub fn find_block_height(dir: &Path) -> Result<Vec<u64>> {
}

pub fn create_btc_blk_tx(height: u64, block_file: &Path) -> Result<L1BlockWithBody> {
let block_hex_str = fs::read_to_string(block_file).unwrap();
let block_hex = Vec::<u8>::from_hex(&block_hex_str).unwrap();
let origin_block: Block = deserialize(&block_hex).unwrap();
let block_hex_str = fs::read_to_string(block_file)?;
let block_hex = Vec::<u8>::from_hex(&block_hex_str)?;
let origin_block: Block = deserialize(&block_hex)?;
let block = origin_block.clone();
let block_hash = block.header.block_hash();
let move_block = rooch_types::bitcoin::types::Block::from(block.clone());
Expand Down
30 changes: 16 additions & 14 deletions crates/rooch-pipeline-processor/src/actor/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,19 +166,22 @@ impl PipelineProcessorActor {
#[named]
pub async fn execute_l1_block(
&mut self,
l1_block: L1BlockWithBody,
l1_block_with_body: L1BlockWithBody,
) -> Result<ExecuteTransactionResponse> {
let fn_name = function_name!();
let _timer = self
.metrics
.pipeline_processor_execution_tx_latency_seconds
.with_label_values(&[fn_name])
.start_timer();
let moveos_tx = self.executor.validate_l1_block(l1_block.clone()).await?;
let block_height = l1_block.block.block_height;

let block = l1_block_with_body.block.clone();
let block_height = block.block_height;

let moveos_tx = self.executor.validate_l1_block(l1_block_with_body).await?;
let ledger_tx = self
.sequencer
.sequence_transaction(LedgerTxData::L1Block(l1_block.block))
.sequence_transaction(LedgerTxData::L1Block(block))
.await?;
let tx_order = ledger_tx.sequence_info.tx_order;
let size = moveos_tx.ctx.tx_size;
Expand Down Expand Up @@ -266,7 +269,9 @@ impl PipelineProcessorActor {
&mut self,
mut tx: RoochTransaction,
) -> Result<ExecuteTransactionResponse> {
let tx_hash = tx.tx_hash(); // cache tx_hash
debug!("pipeline execute_l2_tx: {:?}", tx.tx_hash());

let fn_name = function_name!();
let _timer = self
.metrics
Expand All @@ -288,7 +293,6 @@ impl PipelineProcessorActor {
"Execute L2 Tx failed while VM panic occurred and revert tx. error: {:?} tx info {}",
err, hex::encode(l2_tx_bcs_bytes)
);
let tx_hash = tx.tx_hash();
self.rooch_db.revert_tx(tx_hash)?;
}
return Err(err);
Expand Down Expand Up @@ -332,11 +336,11 @@ impl PipelineProcessorActor {
})
.await?;
let root = execution_info.root_metadata();
// Sync latest state root from writer executor to reader executor
// Sync the latest state root from writer executor to reader executor
self.executor
.refresh_state(root.clone(), output.is_upgrade)
.await?;
// Save state change set is a notify call, do not block current task
// Save state change set is a notify call, do not block the current task
let state_change_set_ext =
StateChangeSetExt::new(output.changeset.clone(), moveos_tx.ctx.sequence_number);
self.executor
Expand All @@ -345,19 +349,17 @@ impl PipelineProcessorActor {

let indexer = self.indexer.clone();
let sequence_info = tx.sequence_info.clone();
let execution_info_clone = execution_info.clone();
let output_clone = output.clone();

// If bitcoin block data import, don't write all indexer
// If bitcoin block data import, don't write indexer
if !self.service_status.is_date_import_mode() {
//The update_indexer is a notify call, do not block current task
//The update_indexer is a notification call, do not block the current task
let result = indexer
.update_indexer(
tx,
execution_info_clone,
execution_info.clone(),
moveos_tx,
output_clone.events,
output_clone.changeset,
output.events.clone(),
output.changeset.clone(),
)
.await;
match result {
Expand Down
2 changes: 1 addition & 1 deletion crates/rooch-relayer/src/actor/relayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ impl RelayerActor {
async fn handle_l1_block(&mut self, l1_block: L1BlockWithBody) -> Result<()> {
let block_hash = hex::encode(&l1_block.block.block_hash);
let block_height = l1_block.block.block_height;
let result = self.processor.execute_l1_block(l1_block.clone()).await?;
let result = self.processor.execute_l1_block(l1_block).await?;

match result.execution_info.status {
KeptVMStatus::Executed => {
Expand Down
8 changes: 7 additions & 1 deletion crates/rooch/src/commands/da/commands/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -628,7 +628,7 @@ impl ExecInner {
async fn execute(&self, msg: ExecMsg) -> anyhow::Result<()> {
let ExecMsg {
tx_order,
ledger_tx,
mut ledger_tx,
l1_block_with_body,
} = msg;

Expand All @@ -638,6 +638,12 @@ impl ExecInner {
let exp_state_root = exp_root_opt.map(|v| v.0);
let exp_accumulator_root = exp_root_opt.map(|v| v.1);

// cache tx_hash
if is_l2_tx {
let _ = ledger_tx.tx_hash();
}
// it's okay to sequence tx before validation,
// because in this case, all tx have been sequenced in Rooch Network.
if self.mode.need_seq() {
self.sequenced_tx_store
.store_tx(ledger_tx.clone(), exp_accumulator_root)?;
Expand Down
Loading