diff --git a/crates/rooch-da/src/actor/messages.rs b/crates/rooch-da/src/actor/messages.rs index 35e91cdc9e..083af2e36c 100644 --- a/crates/rooch-da/src/actor/messages.rs +++ b/crates/rooch-da/src/actor/messages.rs @@ -30,3 +30,12 @@ impl AppendTransactionMessage { } } } + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct RevertTransactionMessage { + pub tx_order: u64, +} + +impl Message for RevertTransactionMessage { + type Result = anyhow::Result<()>; +} diff --git a/crates/rooch-da/src/actor/server.rs b/crates/rooch-da/src/actor/server.rs index ad5c2f5ded..8b9da167df 100644 --- a/crates/rooch-da/src/actor/server.rs +++ b/crates/rooch-da/src/actor/server.rs @@ -1,7 +1,9 @@ // Copyright (c) RoochNetwork // SPDX-License-Identifier: Apache-2.0 -use crate::actor::messages::{AppendTransactionMessage, GetServerStatusMessage}; +use crate::actor::messages::{ + AppendTransactionMessage, GetServerStatusMessage, RevertTransactionMessage, +}; use crate::backend::openda::AdapterSubmitStat; use crate::backend::{DABackend, DABackends}; use crate::batcher::BatchMaker; @@ -163,6 +165,15 @@ impl DAServerActor { Ok(()) } + pub async fn revert_transaction( + &mut self, + msg: RevertTransactionMessage, + ) -> anyhow::Result<()> { + let tx_order = msg.tx_order; + self.batch_maker.revert_transaction(tx_order)?; + Ok(()) + } + // Spawns a background submitter to handle unsubmitted blocks off the main thread. // This prevents blocking other actor handlers and maintains the actor's responsiveness. fn run_background_submitter( @@ -255,6 +266,17 @@ impl Handler for DAServerActor { } } +#[async_trait] +impl Handler for DAServerActor { + async fn handle( + &mut self, + msg: RevertTransactionMessage, + _ctx: &mut ActorContext, + ) -> anyhow::Result<()> { + self.revert_transaction(msg).await + } +} + pub(crate) struct Submitter { sequencer_key: RoochKeyPair, rooch_store: RoochStore, diff --git a/crates/rooch-da/src/batcher/mod.rs b/crates/rooch-da/src/batcher/mod.rs index 8ba3e03e17..fb40e16268 100644 --- a/crates/rooch-da/src/batcher/mod.rs +++ b/crates/rooch-da/src/batcher/mod.rs @@ -1,6 +1,7 @@ // Copyright (c) RoochNetwork // SPDX-License-Identifier: Apache-2.0 +use anyhow::anyhow; use rooch_config::settings::ROOCH_BATCH_INTERVAL; use rooch_store::da_store::DAMetaStore; use rooch_store::RoochStore; @@ -70,20 +71,81 @@ impl InProgressBatch { } pub struct BatchMaker { + pending_tx: PendingTx, in_progress_batch: InProgressBatch, rooch_store: RoochStore, } +struct PendingTx { + tx_order: u64, + tx_timestamp: u64, +} + +impl PendingTx { + fn new() -> Self { + Self { + tx_order: 0, + tx_timestamp: 0, + } + } + + fn revert(&mut self, tx_order: u64) -> anyhow::Result<()> { + let pending_tx_order = self.tx_order; + if tx_order != pending_tx_order { + return Err(anyhow!( + "failed to revert pending transaction: transaction order is not continuous, pending_tx_order: {}, revert_tx_order: {}", + pending_tx_order, + tx_order + )); + } + self.tx_order = 0; + self.tx_timestamp = 0; + Ok(()) + } + + fn push(&mut self, tx_order: u64, tx_timestamp: u64) -> Option { + let old = if self.tx_order == 0 { + None + } else { + Some(PendingTx { + tx_order: self.tx_order, + tx_timestamp: self.tx_timestamp, + }) + }; + self.tx_order = tx_order; + self.tx_timestamp = tx_timestamp; + old + } +} + impl BatchMaker { pub fn new(rooch_store: RoochStore) -> Self { Self { + pending_tx: PendingTx::new(), in_progress_batch: InProgressBatch::init(), rooch_store, } } - // append transaction to the batch, return block number if a new batch is made + // append transaction: + // 1. push the new transaction to pending_tx return the old one if it has + // 2. add the old transaction to the batch, return block number if a new batch is made pub fn append_transaction(&mut self, tx_order: u64, tx_timestamp: u64) -> Option { + if let Some(old) = self.pending_tx.push(tx_order, tx_timestamp) { + if let Some(block_number) = self.add_to_batch(old.tx_order, old.tx_timestamp) { + return Some(block_number); + } + } + None + } + + // revert pending transaction + pub fn revert_transaction(&mut self, tx_order: u64) -> anyhow::Result<()> { + self.pending_tx.revert(tx_order) + } + + // add transaction to the batch, return block number if a new batch is made + fn add_to_batch(&mut self, tx_order: u64, tx_timestamp: u64) -> Option { let order_range = self .in_progress_batch .append_transaction(tx_order, tx_timestamp); diff --git a/crates/rooch-da/src/proxy/mod.rs b/crates/rooch-da/src/proxy/mod.rs index 559141e094..f2e9fb9afb 100644 --- a/crates/rooch-da/src/proxy/mod.rs +++ b/crates/rooch-da/src/proxy/mod.rs @@ -1,7 +1,9 @@ // Copyright (c) RoochNetwork // SPDX-License-Identifier: Apache-2.0 -use crate::actor::messages::{AppendTransactionMessage, GetServerStatusMessage}; +use crate::actor::messages::{ + AppendTransactionMessage, GetServerStatusMessage, RevertTransactionMessage, +}; use crate::actor::server::DAServerActor; use coerce::actor::ActorRef; use rooch_types::da::status::DAServerStatus; @@ -23,4 +25,8 @@ impl DAServerProxy { pub async fn append_tx(&self, msg: AppendTransactionMessage) -> anyhow::Result<()> { self.actor.send(msg).await? } + + pub async fn revert_tx(&self, msg: RevertTransactionMessage) -> anyhow::Result<()> { + self.actor.send(msg).await? + } } diff --git a/crates/rooch-pipeline-processor/src/actor/processor.rs b/crates/rooch-pipeline-processor/src/actor/processor.rs index 71a2c72eb8..4d22fee2ec 100644 --- a/crates/rooch-pipeline-processor/src/actor/processor.rs +++ b/crates/rooch-pipeline-processor/src/actor/processor.rs @@ -15,7 +15,7 @@ use moveos::moveos::VMPanicError; use moveos_types::state::StateChangeSetExt; use moveos_types::transaction::VerifiedMoveOSTransaction; use prometheus::Registry; -use rooch_da::actor::messages::AppendTransactionMessage; +use rooch_da::actor::messages::{AppendTransactionMessage, RevertTransactionMessage}; use rooch_da::proxy::DAServerProxy; use rooch_db::RoochDB; use rooch_event::actor::{EventActor, UpdateServiceStatusMessage}; @@ -183,6 +183,12 @@ impl PipelineProcessorActor { .sequencer .sequence_transaction(LedgerTxData::L1Block(block)) .await?; + self.da_server + .append_tx(AppendTransactionMessage { + tx_order: ledger_tx.sequence_info.tx_order, + tx_timestamp: ledger_tx.sequence_info.tx_timestamp, + }) + .await?; let tx_order = ledger_tx.sequence_info.tx_order; let size = moveos_tx.ctx.tx_size; let result = match self.execute_tx(ledger_tx, moveos_tx).await { @@ -227,6 +233,12 @@ impl PipelineProcessorActor { .sequencer .sequence_transaction(LedgerTxData::L1Tx(l1_tx.clone())) .await?; + self.da_server + .append_tx(AppendTransactionMessage { + tx_order: ledger_tx.sequence_info.tx_order, + tx_timestamp: ledger_tx.sequence_info.tx_timestamp, + }) + .await?; let size = moveos_tx.ctx.tx_size; let tx_order = ledger_tx.sequence_info.tx_order; let result = match self.execute_tx(ledger_tx, moveos_tx).await { @@ -283,6 +295,13 @@ impl PipelineProcessorActor { .sequencer .sequence_transaction(LedgerTxData::L2Tx(tx.clone())) .await?; + let tx_order = ledger_tx.sequence_info.tx_order; + self.da_server + .append_tx(AppendTransactionMessage { + tx_order, + tx_timestamp: ledger_tx.sequence_info.tx_timestamp, + }) + .await?; let size = moveos_tx.ctx.tx_size; let result = match self.execute_tx(ledger_tx, moveos_tx).await { Ok(v) => v, @@ -293,6 +312,9 @@ impl PipelineProcessorActor { "Execute L2 Tx failed while VM panic occurred and revert tx. error: {:?} tx info {}", err, hex::encode(l2_tx_bcs_bytes) ); + self.da_server + .revert_tx(RevertTransactionMessage { tx_order }) + .await?; self.rooch_db.revert_tx(tx_hash)?; } return Err(err); @@ -329,12 +351,7 @@ impl PipelineProcessorActor { // Then execute let size = moveos_tx.ctx.tx_size; let (output, execution_info) = self.executor.execute_transaction(moveos_tx.clone()).await?; - self.da_server - .append_tx(AppendTransactionMessage { - tx_order: tx.sequence_info.tx_order, - tx_timestamp: tx.sequence_info.tx_timestamp, - }) - .await?; + let root = execution_info.root_metadata(); // Sync the latest state root from writer executor to reader executor self.executor