Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(rooch-da): restructure backend architecture for OpenDA #3140

Merged
merged 16 commits into from
Jan 1, 2025
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
227 changes: 161 additions & 66 deletions crates/rooch-config/src/da_config.rs

Large diffs are not rendered by default.

164 changes: 43 additions & 121 deletions crates/rooch-da/src/actor/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,15 @@
// SPDX-License-Identifier: Apache-2.0

use crate::actor::messages::{AppendTransactionMessage, GetServerStatusMessage};
use crate::backend::openda::OpenDABackend;
use crate::backend::DABackend;
use crate::backend::{DABackend, DABackends};
use crate::batcher::BatchMaker;
use anyhow::anyhow;
use async_trait::async_trait;
use coerce::actor::context::ActorContext;
use coerce::actor::message::Handler;
use coerce::actor::Actor;
use moveos_types::h256::H256;
use rooch_config::da_config::{DABackendConfigType, DAConfig};
use rooch_config::da_config::{DAConfig, DEFAULT_DA_BACKGROUND_SUBMIT_INTERVAL};
use rooch_store::da_store::DAMetaStore;
use rooch_store::transaction_store::TransactionStore;
use rooch_store::RoochStore;
Expand All @@ -25,18 +24,9 @@ use std::time;
use std::time::{Duration, SystemTime};
use tokio::sync::broadcast;

// default background submit interval: 5 seconds
// smaller interval helps to reduce the delay of blocks making and submitting, get more accurate block number by status query
// the major duty of background submitter is to submit unsubmitted blocks made before server start,
// in most cases, backends work well enough to submit new blocks in time, which means after submitting old blocks,
// background submitter will have nothing to do.
// Only few database operations are needed to catch up with the latest block numbers,
// so it's okay to have a small interval.
const DEFAULT_BACKGROUND_SUBMIT_INTERVAL: u64 = 5;

pub struct DAServerActor {
rooch_store: RoochStore,
backend_names: Vec<String>,
backend_identifiers: Vec<String>,
last_block_number: Option<u128>,
last_block_update_time: u64,
background_last_block_update_time: Arc<AtomicU64>,
Expand All @@ -45,83 +35,6 @@ pub struct DAServerActor {

impl Actor for DAServerActor {}

struct ServerBackends {
backends: Vec<Arc<dyn DABackend>>,
backend_names: Vec<String>,
submit_threshold: usize,
is_nop_backend: bool,
background_submit_interval: u64,
}

impl ServerBackends {
const DEFAULT_SUBMIT_THRESHOLD: usize = 1;
const DEFAULT_IS_NOP_BACKEND: bool = false;
const DEFAULT_BACKGROUND_INTERVAL: u64 = DEFAULT_BACKGROUND_SUBMIT_INTERVAL;

async fn process_backend_configs(
backend_configs: &[DABackendConfigType],
genesis_namespace: String,
backends: &mut Vec<Arc<dyn DABackend>>,
backend_names: &mut Vec<String>,
) -> anyhow::Result<usize> {
let mut available_backends = 0;
for backend_type in backend_configs {
#[allow(irrefutable_let_patterns)]
if let DABackendConfigType::OpenDa(openda_config) = backend_type {
let backend = OpenDABackend::new(openda_config, genesis_namespace.clone()).await?;
backends.push(Arc::new(backend));
backend_names.push(format!("openda-{}", openda_config.scheme));
available_backends += 1;
}
}
Ok(available_backends)
}

async fn build(da_config: DAConfig, genesis_namespace: String) -> anyhow::Result<Self> {
let mut backends: Vec<Arc<dyn DABackend>> = Vec::new();
let mut backend_names: Vec<String> = Vec::new();
let mut submit_threshold = Self::DEFAULT_SUBMIT_THRESHOLD;
let mut is_nop_backend = Self::DEFAULT_IS_NOP_BACKEND;
let background_submit_interval = da_config
.da_backend
.as_ref()
.and_then(|backend_config| backend_config.background_submit_interval)
.unwrap_or(Self::DEFAULT_BACKGROUND_INTERVAL);

let mut available_backends_count = 1; // Nop is always available
if let Some(mut backend_config) = da_config.da_backend {
submit_threshold = backend_config.calculate_submit_threshold();
available_backends_count = Self::process_backend_configs(
&backend_config.backends,
genesis_namespace,
&mut backends,
&mut backend_names,
)
.await?;
} else {
is_nop_backend = true;
backends.push(Arc::new(crate::backend::DABackendNopProxy {}));
backend_names.push("nop".to_string());
}

if available_backends_count < submit_threshold {
return Err(anyhow!(
"failed to start da: not enough backends for future submissions. exp>= {} act: {}",
submit_threshold,
available_backends_count
));
}

Ok(Self {
backends,
backend_names,
submit_threshold,
is_nop_backend,
background_submit_interval,
})
}
}

impl DAServerActor {
pub async fn new(
da_config: DAConfig,
Expand All @@ -131,27 +44,33 @@ impl DAServerActor {
shutdown_rx: broadcast::Receiver<()>,
) -> anyhow::Result<Self> {
let min_block_to_submit = da_config.da_min_block_to_submit;
let ServerBackends {
let background_submit_interval = da_config
.background_submit_interval
.unwrap_or(DEFAULT_DA_BACKGROUND_SUBMIT_INTERVAL);

let DABackends {
backends,
backend_names,
submit_threshold,
is_nop_backend,
background_submit_interval,
} = ServerBackends::build(da_config, genesis_namespace).await?;
} = DABackends::initialize(da_config.da_backend, genesis_namespace).await?;

let backend_identifiers: Vec<String> = backends
.iter()
.map(|backend| backend.get_identifier())
.collect();

let last_block_number = rooch_store.get_last_block_number()?;
let background_last_block_update_time = Arc::new(AtomicU64::new(0));
let server = DAServerActor {
rooch_store: rooch_store.clone(),
backend_names,
backend_identifiers,
last_block_number,
last_block_update_time: 0,
background_last_block_update_time: background_last_block_update_time.clone(),
batch_maker: BatchMaker::new(),
};

if !is_nop_backend {
Self::create_background_submitter(
if submit_threshold != 0 {
Self::run_background_submitter(
rooch_store,
sequencer_key,
backends,
Expand Down Expand Up @@ -195,14 +114,20 @@ impl DAServerActor {
None
};

let avail_backends = if self.backend_identifiers.is_empty() {
vec!["nop".to_string()]
} else {
self.backend_identifiers.clone()
};

Ok(DAServerStatus {
last_block_number: self.last_block_number,
last_tx_order,
last_block_update_time,
last_avail_block_number,
last_avail_tx_order,
last_avail_block_update_time,
avail_backends: self.backend_names.clone(),
avail_backends,
})
}

Expand All @@ -227,7 +152,7 @@ impl DAServerActor {

// Spawns a background submitter to handle unsubmitted blocks off the main thread.
// This prevents blocking other actor handlers and maintains the actor's responsiveness.
fn create_background_submitter(
fn run_background_submitter(
rooch_store: RoochStore,
sequencer_key: RoochKeyPair,
backends: Vec<Arc<dyn DABackend>>,
Expand All @@ -243,7 +168,6 @@ impl DAServerActor {
submitter: Submitter {
sequencer_key: sequencer_key.copy(),
rooch_store: rooch_store.clone(),
nop_backend: false, // background submitter should not be nop-backend
backends: backends.clone(),
submit_threshold,
},
Expand Down Expand Up @@ -322,14 +246,11 @@ pub(crate) struct Submitter {
sequencer_key: RoochKeyPair,
rooch_store: RoochStore,

nop_backend: bool,
backends: Vec<Arc<dyn DABackend>>,
submit_threshold: usize,
}

impl Submitter {
// TODO check all backends are idempotent or not, if not, we need to add a check to avoid duplicated submission
// assume it's idempotent for now
async fn submit_batch_raw(
&self,
block_range: BlockRange,
Expand All @@ -354,21 +275,18 @@ impl Submitter {
// submit batch
self.submit_batch_to_backends(batch).await?;

// update block submitting state if it's not nop-backend
// if it's nop-backend, we don't need to update submitting state, we may need to submit batch to other backends later by fetch unsubmitted blocks
if !self.nop_backend {
match self.rooch_store.set_submitting_block_done(
block_number,
tx_order_start,
tx_order_end,
batch_hash,
) {
Ok(_) => {}
Err(e) => {
tracing::warn!("{:?}, fail to set submitting block done.", e);
}
};
match self.rooch_store.set_submitting_block_done(
block_number,
tx_order_start,
tx_order_end,
batch_hash,
) {
Ok(_) => {}
Err(e) => {
tracing::warn!("{:?}, fail to set submitting block done.", e);
}
};

Ok(SignedDABatchMeta {
meta: batch_meta,
signature: meta_signature,
Expand All @@ -378,16 +296,20 @@ impl Submitter {
async fn submit_batch_to_backends(&self, batch: DABatch) -> anyhow::Result<()> {
let backends = self.backends.clone();
let submit_threshold = self.submit_threshold;

let batch = Arc::new(batch);

// submit to backend in order until meet submit_threshold
let mut success_count = 0;
for backend in &backends {
let submit_fut = backend.submit_batch(batch.clone());
match submit_fut.await {
Ok(_) => {
success_count += 1;
if success_count >= submit_threshold {
break;
}
// TODO parallel submit
// if success_count >= submit_threshold {
// break;
// }
}
Err(e) => {
tracing::warn!("{:?}, fail to submit batch to backend.", e);
Expand Down
60 changes: 34 additions & 26 deletions crates/rooch-da/src/backend/README.md
Original file line number Diff line number Diff line change
@@ -1,29 +1,37 @@
# Backend

Implementations of DA backend.
## Overview

```
+-------------------------+
| DAServerActor |
| (Manages Backends) |
+-------------------------+
|
v
+-------------------------+
| DABackend | <- Trait for all backends
+-------------------------+
^
|
v
+---------------------------------------------------+
| OpenDABackendManager | <- Manages OpenDA-specific backends
| - Common OpenDA logic (batches, configs, etc.) |
| - Reduces redundancy among OpenDA backends |
+---------------------------------------------------+
|
v
+-------------------------------------+
| OpenDAAdapter | <- Trait for OpenDA-specific backend operations
| - submit_segment(), ... |
| - Backend-specific operations |
+-------------------------------------+
^
|
v
+-------------------+ +-------------------+
| CelestiaAdapter | | AvailAdapter | <- Actual backend-specific adapter implementations
+-------------------+ +-------------------+
```

## Open-DA

> - fs: local/remote file system
>- avail: Avail project DA
>- celestia: Celestia DA

## New Backend

For new added backend:

If it could satisfy open-da config, it should be added to `open-da` folder as a module. If not, it should be added to
`backend` folder directly.

## Backend Implementations & Verification

| Name | Description | Category | Implementation | Local | Testnet | Mainnet |
|----------|--------------------------------------------|----------|------------------------------|-------|---------|---------|
| fs | file I/O based on local/remote file system | open-da | [fs](open-da/fs) | ✅ | ✅ | ✅ |
| avail | Avail project DA | open-da | [avail](open-da/avail) | 🔲 | 🔲 | 🔲 |
| celestia | Celestia DA | open-da | [celestia](open-da/celestia) | 🔲 | 🔲 | 🔲 |
| gcs | file I/O based on Google Cloud Storage | open-da | [gcs](open-da/fs) | ✅ | ✅ | ✅ |

- [x] ✅ done
- [ ] 🔲 unfinished
- [ ] ❌ has issues
Loading
Loading