Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rooch-da): add transaction revert handling #3266

Merged
merged 3 commits into from
Feb 5, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions crates/rooch-da/src/actor/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,12 @@ impl AppendTransactionMessage {
}
}
}

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct RevertTransactionMessage {
pub tx_order: u64,
}

impl Message for RevertTransactionMessage {
type Result = anyhow::Result<()>;
}
24 changes: 23 additions & 1 deletion crates/rooch-da/src/actor/server.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
// Copyright (c) RoochNetwork
// SPDX-License-Identifier: Apache-2.0

use crate::actor::messages::{AppendTransactionMessage, GetServerStatusMessage};
use crate::actor::messages::{
AppendTransactionMessage, GetServerStatusMessage, RevertTransactionMessage,
};
use crate::backend::openda::AdapterSubmitStat;
use crate::backend::{DABackend, DABackends};
use crate::batcher::BatchMaker;
Expand Down Expand Up @@ -163,6 +165,15 @@ impl DAServerActor {
Ok(())
}

pub async fn revert_transaction(
&mut self,
msg: RevertTransactionMessage,
) -> anyhow::Result<()> {
let tx_order = msg.tx_order;
self.batch_maker.revert_transaction(tx_order)?;
Ok(())
}

// Spawns a background submitter to handle unsubmitted blocks off the main thread.
// This prevents blocking other actor handlers and maintains the actor's responsiveness.
fn run_background_submitter(
Expand Down Expand Up @@ -255,6 +266,17 @@ impl Handler<AppendTransactionMessage> for DAServerActor {
}
}

#[async_trait]
impl Handler<RevertTransactionMessage> for DAServerActor {
async fn handle(
&mut self,
msg: RevertTransactionMessage,
_ctx: &mut ActorContext,
) -> anyhow::Result<()> {
self.revert_transaction(msg).await
}
}

pub(crate) struct Submitter {
sequencer_key: RoochKeyPair,
rooch_store: RoochStore,
Expand Down
60 changes: 59 additions & 1 deletion crates/rooch-da/src/batcher/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) RoochNetwork
// SPDX-License-Identifier: Apache-2.0

use anyhow::anyhow;
use rooch_config::settings::ROOCH_BATCH_INTERVAL;
use rooch_store::da_store::DAMetaStore;
use rooch_store::RoochStore;
Expand Down Expand Up @@ -70,20 +71,77 @@ impl InProgressBatch {
}

pub struct BatchMaker {
pending_tx: PendingTx,
in_progress_batch: InProgressBatch,
rooch_store: RoochStore,
}

struct PendingTx {
tx_order: u64,
tx_timestamp: u64,
}

impl PendingTx {
fn new() -> Self {
Self {
tx_order: 0,
tx_timestamp: 0,
}
}

fn revert(&mut self, tx_order: u64) -> anyhow::Result<()> {
let pending_tx_order = self.tx_order;
if tx_order != pending_tx_order {
return Err(anyhow!(
"failed to revert pending transaction: transaction order is not continuous, pending_tx_order: {}, revert_tx_order: {}",
pending_tx_order,
tx_order
));
}
self.tx_order = 0;
self.tx_timestamp = 0;
Ok(())
}

fn push(&mut self, tx_order: u64, tx_timestamp: u64) -> Option<PendingTx> {
let old = if self.tx_order == 0 {
None
} else {
Some(PendingTx {
tx_order: self.tx_order,
tx_timestamp: self.tx_timestamp,
})
};
self.tx_order = tx_order;
self.tx_timestamp = tx_timestamp;
old
}
}

impl BatchMaker {
pub fn new(rooch_store: RoochStore) -> Self {
Self {
pending_tx: PendingTx::new(),
in_progress_batch: InProgressBatch::init(),
rooch_store,
}
}

// append transaction to the batch, return block number if a new batch is made
pub fn append_transaction(&mut self, tx_order: u64, tx_timestamp: u64) -> Option<u128> {
if let Some(old) = self.pending_tx.push(tx_order, tx_timestamp) {
if let Some(block_number) = self.add_to_batch(old.tx_order, old.tx_timestamp) {
return Some(block_number);
}
}
None
}

pub fn revert_transaction(&mut self, tx_order: u64) -> anyhow::Result<()> {
self.pending_tx.revert(tx_order)
}

// append transaction to the batch, return block number if a new batch is made
fn add_to_batch(&mut self, tx_order: u64, tx_timestamp: u64) -> Option<u128> {
let order_range = self
.in_progress_batch
.append_transaction(tx_order, tx_timestamp);
Expand Down
8 changes: 7 additions & 1 deletion crates/rooch-da/src/proxy/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
// Copyright (c) RoochNetwork
// SPDX-License-Identifier: Apache-2.0

use crate::actor::messages::{AppendTransactionMessage, GetServerStatusMessage};
use crate::actor::messages::{
AppendTransactionMessage, GetServerStatusMessage, RevertTransactionMessage,
};
use crate::actor::server::DAServerActor;
use coerce::actor::ActorRef;
use rooch_types::da::status::DAServerStatus;
Expand All @@ -23,4 +25,8 @@ impl DAServerProxy {
pub async fn append_tx(&self, msg: AppendTransactionMessage) -> anyhow::Result<()> {
self.actor.send(msg).await?
}

pub async fn revert_tx(&self, msg: RevertTransactionMessage) -> anyhow::Result<()> {
self.actor.send(msg).await?
}
}
31 changes: 24 additions & 7 deletions crates/rooch-pipeline-processor/src/actor/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use moveos::moveos::VMPanicError;
use moveos_types::state::StateChangeSetExt;
use moveos_types::transaction::VerifiedMoveOSTransaction;
use prometheus::Registry;
use rooch_da::actor::messages::AppendTransactionMessage;
use rooch_da::actor::messages::{AppendTransactionMessage, RevertTransactionMessage};
use rooch_da::proxy::DAServerProxy;
use rooch_db::RoochDB;
use rooch_event::actor::{EventActor, UpdateServiceStatusMessage};
Expand Down Expand Up @@ -183,6 +183,12 @@ impl PipelineProcessorActor {
.sequencer
.sequence_transaction(LedgerTxData::L1Block(block))
.await?;
self.da_server
.append_tx(AppendTransactionMessage {
tx_order: ledger_tx.sequence_info.tx_order,
tx_timestamp: ledger_tx.sequence_info.tx_timestamp,
})
.await?;
let tx_order = ledger_tx.sequence_info.tx_order;
let size = moveos_tx.ctx.tx_size;
let result = match self.execute_tx(ledger_tx, moveos_tx).await {
Expand Down Expand Up @@ -227,6 +233,12 @@ impl PipelineProcessorActor {
.sequencer
.sequence_transaction(LedgerTxData::L1Tx(l1_tx.clone()))
.await?;
self.da_server
.append_tx(AppendTransactionMessage {
tx_order: ledger_tx.sequence_info.tx_order,
tx_timestamp: ledger_tx.sequence_info.tx_timestamp,
})
.await?;
let size = moveos_tx.ctx.tx_size;
let tx_order = ledger_tx.sequence_info.tx_order;
let result = match self.execute_tx(ledger_tx, moveos_tx).await {
Expand Down Expand Up @@ -283,6 +295,13 @@ impl PipelineProcessorActor {
.sequencer
.sequence_transaction(LedgerTxData::L2Tx(tx.clone()))
.await?;
let tx_order = ledger_tx.sequence_info.tx_order;
self.da_server
.append_tx(AppendTransactionMessage {
tx_order,
tx_timestamp: ledger_tx.sequence_info.tx_timestamp,
})
.await?;
let size = moveos_tx.ctx.tx_size;
let result = match self.execute_tx(ledger_tx, moveos_tx).await {
Ok(v) => v,
Expand All @@ -293,6 +312,9 @@ impl PipelineProcessorActor {
"Execute L2 Tx failed while VM panic occurred and revert tx. error: {:?} tx info {}",
err, hex::encode(l2_tx_bcs_bytes)
);
self.da_server
.revert_tx(RevertTransactionMessage { tx_order })
.await?;
self.rooch_db.revert_tx(tx_hash)?;
}
return Err(err);
Expand Down Expand Up @@ -329,12 +351,7 @@ impl PipelineProcessorActor {
// Then execute
let size = moveos_tx.ctx.tx_size;
let (output, execution_info) = self.executor.execute_transaction(moveos_tx.clone()).await?;
self.da_server
.append_tx(AppendTransactionMessage {
tx_order: tx.sequence_info.tx_order,
tx_timestamp: tx.sequence_info.tx_timestamp,
})
.await?;

let root = execution_info.root_metadata();
// Sync the latest state root from writer executor to reader executor
self.executor
Expand Down
Loading