Skip to content

Commit

Permalink
refactor(conductor): make firm, soft readers subtasks (#1926)
Browse files Browse the repository at this point in the history
## Summary

Makes the firm and soft readers subtasks of the main exection loop.

## Background

Making the celestia (firm) and astria (soft)
readers subtasks of the executor loop is a more
faithful representation of their dependencies:

1. the execution loop can run with either or both present.
2. but the readers cannot run without the
  execution loop present (the tasks do not run at the
  same level of privilege).
3. to fully initialize they the readers depend on
  data provided via the execution loop (rollup genesis
  info and commitment state).

This patch then spins up the readers during initialization
of the execution event loop, which allows removing a lot
of complexity from the conductor codebase:

1. the readers need not explicitly wait for the state to be
  initialized but receive an already initialized state.
2. there is no need for a bespoke channel to dynamically
  set permits (this was needed for dynamic backpressure
  handling by setting the channel capacity after reading the
  celestia variance from the rollup): a standard mpsc channel
  can be instantiated and then passed to the (soft/sequencer)
  reader.

executor::Initialized::run delegates to
executor::Initialized::run_event_loop to separate
the shutdown token from the other arms of the
select macro - this way, an else => {} arm can be
introduced that shuts down executor as a fallback

## Changes

- make the firm (celestia) and soft (sequencer) readers subtasks of the
executor task
- remove the `JoinMap` from the `conductor::Inner` business logic as
this is
now tracked by the executor task
- initialize the reader tasks only after the initial rollup node state
is received
- in the executor task, separate the test for shutdown token
cancellation
from its main event loop so that the `tokio::select!` macro's `else =>
{}` arm
  can be used as a fallback.
- remove the bespoke `executor::channel` for dynamically setting channel
capacity at runtime (no longer necessary; the channel is initialized
later
  after we know its capacity)
- simplify the `State` watch channel notifying the readers of the latest
rollup
state by removing the `StateIsInit` and `StateNotInit` markers (the
readers
  now get a fully initialized watch channel).

## Testing
These changes should be transparent and not noticeable. The blackbox
tests
still pass which implies that the (tested) conductor behavior works.

## Changelogs

**NOTE**: Update before merge

Ensure all relevant changelog files are updated as necessary. See
[keepachangelog](https://keepachangelog.com/en/1.1.0/#how) for change
categories. Replace this text with e.g. "Changelogs updated." or "No
updates
required." to acknowledge changelogs have been considered.


## Related Issues

**NOTE**: Link the forma issue?

Link any issues that are related, prefer full GitHub links.

closes <!-- list any issues closed here -->
  • Loading branch information
SuperFluffy authored Feb 3, 2025
1 parent 141988f commit 77d0217
Show file tree
Hide file tree
Showing 14 changed files with 481 additions and 1,014 deletions.
16 changes: 11 additions & 5 deletions crates/astria-conductor/src/celestia/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,21 @@ use jsonrpsee::http_client::HttpClient as CelestiaClient;
use tendermint_rpc::HttpClient as SequencerClient;
use tokio_util::sync::CancellationToken;

use super::Reader;
use super::{
Reader,
ReconstructedBlock,
};
use crate::{
executor,
executor::StateReceiver,
metrics::Metrics,
};

pub(crate) struct Builder {
pub(crate) celestia_block_time: Duration,
pub(crate) celestia_http_endpoint: String,
pub(crate) celestia_token: Option<String>,
pub(crate) executor: executor::Handle,
pub(crate) firm_blocks: tokio::sync::mpsc::Sender<Box<ReconstructedBlock>>,
pub(crate) rollup_state: StateReceiver,
pub(crate) sequencer_cometbft_client: SequencerClient,
pub(crate) sequencer_requests_per_second: u32,
pub(crate) expected_celestia_chain_id: String,
Expand All @@ -36,13 +40,14 @@ impl Builder {
celestia_block_time,
celestia_http_endpoint,
celestia_token,
executor,
sequencer_cometbft_client,
sequencer_requests_per_second,
expected_celestia_chain_id,
expected_sequencer_chain_id,
shutdown,
metrics,
firm_blocks,
rollup_state,
} = self;

let celestia_client = create_celestia_client(celestia_http_endpoint, celestia_token)
Expand All @@ -51,7 +56,8 @@ impl Builder {
Ok(Reader {
celestia_block_time,
celestia_client,
executor,
firm_blocks,
rollup_state,
sequencer_cometbft_client,
sequencer_requests_per_second,
expected_celestia_chain_id,
Expand Down
120 changes: 53 additions & 67 deletions crates/astria-conductor/src/celestia/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,11 @@ use tracing::{
trace,
trace_span,
warn,
Instrument as _,
};

use crate::{
block_cache::GetSequencerHeight,
executor::{
FirmSendError,
FirmTrySendError,
StateIsInit,
},
metrics::Metrics,
utils::flatten,
};
Expand Down Expand Up @@ -95,10 +91,7 @@ use self::{
BlobVerifier,
},
};
use crate::{
block_cache::BlockCache,
executor,
};
use crate::block_cache::BlockCache;

/// Sequencer Block information reconstructed from Celestia blobs.
///
Expand Down Expand Up @@ -138,8 +131,11 @@ pub(crate) struct Reader {
/// Client to fetch heights and blocks from Celestia.
celestia_client: CelestiaClient,

/// The channel used to send messages to the executor task.
executor: executor::Handle,
/// The channel to forward firm blocks to the executor.
firm_blocks: mpsc::Sender<Box<ReconstructedBlock>>,

/// The channel to read updates of the rollup state from.
rollup_state: crate::executor::StateReceiver,

/// The client to get the sequencer namespace and verify blocks.
sequencer_cometbft_client: SequencerClient,
Expand All @@ -162,7 +158,7 @@ pub(crate) struct Reader {

impl Reader {
pub(crate) async fn run_until_stopped(mut self) -> eyre::Result<()> {
let (executor, sequencer_chain_id) = select!(
let sequencer_chain_id = select!(
() = self.shutdown.clone().cancelled_owned() => {
info_span!("conductor::celestia::Reader::run_until_stopped").in_scope(||
info!("received shutdown signal while waiting for Celestia reader task to initialize")
Expand All @@ -175,16 +171,14 @@ impl Reader {
}
);

RunningReader::from_parts(self, executor, sequencer_chain_id)
RunningReader::from_parts(self, sequencer_chain_id)
.wrap_err("failed entering run loop")?
.run_until_stopped()
.await
}

#[instrument(skip_all, err)]
async fn initialize(
&mut self,
) -> eyre::Result<(executor::Handle<StateIsInit>, tendermint::chain::Id)> {
async fn initialize(&mut self) -> eyre::Result<tendermint::chain::Id> {
let validate_celestia_chain_id = async {
let actual_celestia_chain_id = get_celestia_chain_id(&self.celestia_client)
.await
Expand All @@ -196,14 +190,8 @@ impl Reader {
`{actual_celestia_chain_id}`"
);
Ok(())
};

let wait_for_init_executor = async {
self.executor
.wait_for_init()
.await
.wrap_err("handle to executor failed while waiting for it being initialized")
};
}
.in_current_span();

let get_and_validate_sequencer_chain_id = async {
let actual_sequencer_chain_id =
Expand All @@ -217,18 +205,18 @@ impl Reader {
actual: `{actual_sequencer_chain_id}`"
);
Ok(actual_sequencer_chain_id)
};
}
.in_current_span();

try_join!(
validate_celestia_chain_id,
wait_for_init_executor,
get_and_validate_sequencer_chain_id
)
.map(|((), executor_init, sequencer_chain_id)| (executor_init, sequencer_chain_id))
.map(|((), sequencer_chain_id)| sequencer_chain_id)
}
}

#[instrument(skip_all, err)]
#[instrument(skip_all, err, ret(Display))]
async fn get_celestia_chain_id(
celestia_client: &CelestiaClient,
) -> eyre::Result<celestia_tendermint::chain::Id> {
Expand Down Expand Up @@ -263,8 +251,11 @@ struct RunningReader {
// Client to fetch heights and blocks from Celestia.
celestia_client: CelestiaClient,

/// The channel used to send messages to the executor task.
executor: executor::Handle<StateIsInit>,
/// The channel to forward firm blocks to the executor.
firm_blocks: mpsc::Sender<Box<ReconstructedBlock>>,

/// The channel to read updates of the rollup state from.
rollup_state: crate::executor::StateReceiver,

/// Token to listen for Conductor being shut down.
shutdown: CancellationToken,
Expand All @@ -280,7 +271,8 @@ struct RunningReader {
/// capacity again. Used as a back pressure mechanism so that this task does not fetch more
/// blobs if there is no capacity in the executor to execute them against the rollup in
/// time.
enqueued_block: Fuse<BoxFuture<'static, Result<u64, FirmSendError>>>,
enqueued_block:
Fuse<BoxFuture<'static, Result<u64, mpsc::error::SendError<Box<ReconstructedBlock>>>>>,

/// The latest observed head height of the Celestia network. Set by values read from
/// the `latest_height` stream.
Expand Down Expand Up @@ -323,7 +315,6 @@ struct RunningReader {
impl RunningReader {
fn from_parts(
exposed_reader: Reader,
mut executor: executor::Handle<StateIsInit>,
sequencer_chain_id: tendermint::chain::Id,
) -> eyre::Result<Self> {
let Reader {
Expand All @@ -333,21 +324,23 @@ impl RunningReader {
shutdown,
sequencer_requests_per_second,
metrics,
firm_blocks,
rollup_state,
..
} = exposed_reader;
let block_cache =
BlockCache::with_next_height(executor.next_expected_firm_sequencer_height())
BlockCache::with_next_height(rollup_state.next_expected_firm_sequencer_height())
.wrap_err("failed constructing sequential block cache")?;

let latest_heights = stream_latest_heights(celestia_client.clone(), celestia_block_time);
let rollup_id = executor.rollup_id();
let rollup_id = rollup_state.rollup_id();
let rollup_namespace = astria_core::celestia::namespace_v0_from_rollup_id(rollup_id);
let sequencer_namespace =
astria_core::celestia::namespace_v0_from_sha256_of_bytes(sequencer_chain_id.as_bytes());

let celestia_next_height = executor.celestia_base_block_height();
let celestia_reference_height = executor.celestia_base_block_height();
let celestia_variance = executor.celestia_block_variance();
let celestia_next_height = rollup_state.celestia_base_block_height();
let celestia_reference_height = rollup_state.celestia_base_block_height();
let celestia_variance = rollup_state.celestia_block_variance();

Ok(Self {
block_cache,
Expand All @@ -357,7 +350,8 @@ impl RunningReader {
),
celestia_client,
enqueued_block: Fuse::terminated(),
executor,
firm_blocks,
rollup_state,
latest_heights,
shutdown,
reconstruction_tasks: JoinMap::new(),
Expand Down Expand Up @@ -498,7 +492,7 @@ impl RunningReader {
rollup_id: self.rollup_id,
rollup_namespace: self.rollup_namespace,
sequencer_namespace: self.sequencer_namespace,
executor: self.executor.clone(),
rollup_state: self.rollup_state.clone(),
metrics: self.metrics,
};
self.reconstruction_tasks.spawn(height, task.execute());
Expand All @@ -520,28 +514,20 @@ impl RunningReader {
#[instrument(skip_all)]
fn forward_block_to_executor(&mut self, block: ReconstructedBlock) -> eyre::Result<()> {
let celestia_height = block.celestia_height;
match self.executor.try_send_firm_block(block) {
match self.firm_blocks.try_send(block.into()) {
Ok(()) => self.advance_reference_celestia_height(celestia_height),
Err(FirmTrySendError::Channel {
source,
}) => match source {
mpsc::error::TrySendError::Full(block) => {
trace!(
"executor channel is full; rescheduling block fetch until the channel \
opens up"
);
self.enqueued_block =
enqueue_block(self.executor.clone(), block).boxed().fuse();
}
mpsc::error::TrySendError::Closed(_) => {
bail!("exiting because executor channel is closed");
}
},
Err(FirmTrySendError::NotSet) => bail!(
"exiting because executor was configured without firm commitments; this Celestia \
reader should have never been started"
),
}
Err(mpsc::error::TrySendError::Full(block)) => {
trace!(
"executor channel is full; rescheduling block fetch until the channel opens up"
);
self.enqueued_block = enqueue_block(self.firm_blocks.clone(), block)
.boxed()
.fuse();
}
Err(mpsc::error::TrySendError::Closed(_)) => {
bail!("exiting because executor channel is closed");
}
};
Ok(())
}

Expand Down Expand Up @@ -574,7 +560,7 @@ struct FetchConvertVerifyAndReconstruct {
rollup_id: RollupId,
rollup_namespace: Namespace,
sequencer_namespace: Namespace,
executor: executor::Handle<StateIsInit>,
rollup_state: crate::executor::StateReceiver,
metrics: &'static Metrics,
}

Expand All @@ -593,7 +579,7 @@ impl FetchConvertVerifyAndReconstruct {
rollup_id,
rollup_namespace,
sequencer_namespace,
executor,
rollup_state,
metrics,
} = self;

Expand Down Expand Up @@ -633,7 +619,7 @@ impl FetchConvertVerifyAndReconstruct {
"decoded Sequencer header and rollup info from raw Celestia blobs",
);

let verified_blobs = verify_metadata(blob_verifier, decoded_blobs, executor).await;
let verified_blobs = verify_metadata(blob_verifier, decoded_blobs, rollup_state).await;

metrics.record_sequencer_blocks_metadata_verified_per_celestia_fetch(
verified_blobs.len_header_blobs(),
Expand Down Expand Up @@ -671,15 +657,15 @@ impl FetchConvertVerifyAndReconstruct {

#[instrument(skip_all, err)]
async fn enqueue_block(
executor: executor::Handle<StateIsInit>,
firm_blocks_tx: mpsc::Sender<Box<ReconstructedBlock>>,
block: Box<ReconstructedBlock>,
) -> Result<u64, FirmSendError> {
) -> Result<u64, mpsc::error::SendError<Box<ReconstructedBlock>>> {
let celestia_height = block.celestia_height;
executor.send_firm_block(block).await?;
firm_blocks_tx.send(block).await?;
Ok(celestia_height)
}

#[instrument(skip_all, err)]
#[instrument(skip_all, err, ret(Display))]
async fn get_sequencer_chain_id(client: SequencerClient) -> eyre::Result<tendermint::chain::Id> {
use sequencer_client::Client as _;

Expand Down
8 changes: 2 additions & 6 deletions crates/astria-conductor/src/celestia/verify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,6 @@ use super::{
block_verifier,
convert::ConvertedBlobs,
};
use crate::executor::{
self,
StateIsInit,
};

pub(super) struct VerifiedBlobs {
celestia_height: u64,
Expand Down Expand Up @@ -99,15 +95,15 @@ struct VerificationTaskKey {
pub(super) async fn verify_metadata(
blob_verifier: Arc<BlobVerifier>,
converted_blobs: ConvertedBlobs,
mut executor: executor::Handle<StateIsInit>,
rollup_state: crate::executor::StateReceiver,
) -> VerifiedBlobs {
let (celestia_height, header_blobs, rollup_blobs) = converted_blobs.into_parts();

let mut verification_tasks = JoinMap::new();
let mut verified_header_blobs = HashMap::with_capacity(header_blobs.len());

let next_expected_firm_sequencer_height =
executor.next_expected_firm_sequencer_height().value();
rollup_state.next_expected_firm_sequencer_height().value();

for (index, blob) in header_blobs.into_iter().enumerate() {
if blob.height().value() < next_expected_firm_sequencer_height {
Expand Down
Loading

0 comments on commit 77d0217

Please sign in to comment.