Skip to content

Commit

Permalink
refactor(composer): interact with executor through handle (#834)
Browse files Browse the repository at this point in the history
## Summary
This is a minor refactor PR which moves the creation of the channel
through which executor receives SequenceActions to submit to the Shared
Sequence N/w to the executor.

## Background
Previously, the channels were created by the Composer which creates the
Executor. It is better practice to allow the Executor to create the
channel which it owns and just return the sending end of it back to the
Composer.

## Changes
- Create a struct `Handle` in Executor which is referenced as
`executor::Handle`.
- Handle creation of the `executor::Handle` in Executor and allow
Executor to pass it back.

## Testing
Since this is not a functional PR change, making sure that the code
compiles and the tests run is enough to ensure that this works.

## Note
The tests were initially using just `send` on the channel but the
executor uses `send_with_timeout`. We could add just a `send` method to
the `executor::Handle`. But to avoid clippy errors w.r.t unused method
`send` and avoiding a `#[cfg(test)]` in the method, I updated the tests
to use `send_with_timeout` instead of `send`.

closes <!-- list any issues closed here -->
  • Loading branch information
bharath-123 authored Mar 20, 2024
1 parent f3d2c18 commit 87c49dc
Show file tree
Hide file tree
Showing 8 changed files with 116 additions and 163 deletions.
22 changes: 8 additions & 14 deletions crates/astria-composer/src/collectors/geth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,7 @@ use ethers::providers::{
Ws,
};
use tokio::sync::{
mpsc::{
error::SendTimeoutError,
Sender,
},
mpsc::error::SendTimeoutError,
watch,
};
use tracing::{
Expand All @@ -40,6 +37,8 @@ use tracing::{
warn,
};

use crate::executor;

type StdError = dyn std::error::Error;

/// `GethCollector` Collects transactions submitted to a Geth rollup node and passes
Expand All @@ -48,15 +47,14 @@ type StdError = dyn std::error::Error;
/// It is responsible for fetching pending transactions submitted to the rollup Geth nodes and then
/// passing them downstream for the executor to process. Thus, a composer can have multiple
/// collectors running at the same time funneling data from multiple rollup nodes.
#[derive(Debug)]
pub(crate) struct Geth {
// Chain ID to identify in the astria sequencer block which rollup a serialized sequencer
// action belongs to. Created from `chain_name`.
rollup_id: RollupId,
// Name of the chain the transactions are read from.
chain_name: String,
// The channel on which the collector sends new txs to the executor.
new_bundles: Sender<SequenceAction>,
executor_handle: executor::Handle,
// The status of this collector instance.
status: watch::Sender<Status>,
/// Rollup URL
Expand All @@ -82,16 +80,12 @@ impl Status {

impl Geth {
/// Initializes a new collector instance
pub(crate) fn new(
chain_name: String,
url: String,
new_bundles: Sender<SequenceAction>,
) -> Self {
pub(crate) fn new(chain_name: String, url: String, executor_handle: executor::Handle) -> Self {
let (status, _) = watch::channel(Status::new());
Self {
rollup_id: RollupId::from_unhashed_bytes(&chain_name),
chain_name,
new_bundles,
executor_handle,
status,
url,
}
Expand All @@ -113,7 +107,7 @@ impl Geth {

let Self {
rollup_id,
new_bundles,
executor_handle,
status,
url,
..
Expand Down Expand Up @@ -165,7 +159,7 @@ impl Geth {
fee_asset_id: default_native_asset_id(),
};

match new_bundles
match executor_handle
.send_timeout(seq_action, Duration::from_millis(500))
.await
{
Expand Down
37 changes: 11 additions & 26 deletions crates/astria-composer/src/composer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,12 @@ use std::{
net::SocketAddr,
};

use astria_core::sequencer::v1::transaction::action::SequenceAction;
use astria_eyre::eyre::{
self,
WrapErr as _,
};
use tokio::{
sync::{
mpsc::Sender,
watch,
},
sync::watch,
task::JoinError,
};
use tokio_util::task::JoinMap;
Expand Down Expand Up @@ -44,9 +40,9 @@ pub struct Composer {
/// `ComposerStatusSender` is used to announce the current status of the Composer for other
/// modules in the crate to use.
composer_status_sender: watch::Sender<Status>,
/// `SerializedRollupTransactionsTx` is used to communicate SequenceActions to the Executor
/// `ExecutorHandle` contains a channel to communicate SequenceActions to the Executor
/// This is at the Composer level to allow its sharing to various different collectors.
serialized_rollup_transactions_tx: tokio::sync::mpsc::Sender<SequenceAction>,
executor_handle: executor::Handle,
/// `Executor` is responsible for signing and submitting sequencer transactions
/// The sequencer transactions are received from various collectors.
executor: Executor,
Expand Down Expand Up @@ -91,15 +87,11 @@ impl Composer {
pub fn from_config(cfg: &Config) -> eyre::Result<Self> {
let (composer_status_sender, _) = watch::channel(Status::default());

let (serialized_rollup_transactions_tx, serialized_rollup_transactions_rx) =
tokio::sync::mpsc::channel::<SequenceAction>(256);

let executor = Executor::new(
let (executor, executor_handle) = Executor::new(
&cfg.sequencer_url,
&cfg.private_key,
cfg.block_time_ms,
cfg.max_bytes_per_bundle,
serialized_rollup_transactions_rx,
)
.wrap_err("executor construction from config failed")?;

Expand All @@ -120,11 +112,8 @@ impl Composer {
let geth_collectors = rollups
.iter()
.map(|(rollup_name, url)| {
let collector = Geth::new(
rollup_name.clone(),
url.clone(),
serialized_rollup_transactions_tx.clone(),
);
let collector =
Geth::new(rollup_name.clone(), url.clone(), executor_handle.clone());
(rollup_name.clone(), collector)
})
.collect::<HashMap<_, _>>();
Expand All @@ -137,7 +126,7 @@ impl Composer {
Ok(Self {
api_server,
composer_status_sender,
serialized_rollup_transactions_tx,
executor_handle,
executor,
rollups,
geth_collectors,
Expand All @@ -160,7 +149,7 @@ impl Composer {
api_server,
composer_status_sender,
executor,
serialized_rollup_transactions_tx,
executor_handle,
mut geth_collector_tasks,
mut geth_collectors,
rollups,
Expand Down Expand Up @@ -202,7 +191,7 @@ impl Composer {
reconnect_exited_collector(
&mut geth_collector_statuses,
&mut geth_collector_tasks,
serialized_rollup_transactions_tx.clone(),
executor_handle.clone(),
&rollups,
rollup,
collector_exit,
Expand Down Expand Up @@ -271,7 +260,7 @@ async fn wait_for_collectors(
pub(super) fn reconnect_exited_collector(
collector_statuses: &mut HashMap<String, watch::Receiver<collectors::geth::Status>>,
collector_tasks: &mut JoinMap<String, eyre::Result<()>>,
serialized_rollup_transactions_tx: Sender<SequenceAction>,
executor_handle: executor::Handle,
rollups: &HashMap<String, String>,
rollup: String,
exit_result: Result<eyre::Result<()>, JoinError>,
Expand All @@ -285,11 +274,7 @@ pub(super) fn reconnect_exited_collector(
return;
};

let collector = Geth::new(
rollup.clone(),
url.clone(),
serialized_rollup_transactions_tx,
);
let collector = Geth::new(rollup.clone(), url.clone(), executor_handle);
collector_statuses.insert(rollup.clone(), collector.subscribe());
collector_tasks.spawn(rollup, collector.run_until_stopped());
}
Expand Down
55 changes: 42 additions & 13 deletions crates/astria-composer/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ use tokio::{
select,
sync::{
mpsc,
mpsc::error::SendTimeoutError,
watch,
},
time::{
Expand Down Expand Up @@ -89,7 +90,7 @@ pub(super) struct Executor {
// The status of this executor
status: watch::Sender<Status>,
// Channel for receiving `SequenceAction`s to be bundled.
serialized_rollup_transactions_rx: mpsc::Receiver<SequenceAction>,
serialized_rollup_transactions: mpsc::Receiver<SequenceAction>,
// The client for submitting wrapped and signed pending eth transactions to the astria
// sequencer.
sequencer_client: sequencer_client::HttpClient,
Expand All @@ -103,6 +104,29 @@ pub(super) struct Executor {
max_bytes_per_bundle: usize,
}

#[derive(Clone)]
pub(super) struct Handle {
serialized_rollup_transactions_tx: mpsc::Sender<SequenceAction>,
}

impl Handle {
fn new(serialized_rollup_transactions_tx: mpsc::Sender<SequenceAction>) -> Self {
Self {
serialized_rollup_transactions_tx,
}
}

pub(super) async fn send_timeout(
&self,
sequence_action: SequenceAction,
timeout: Duration,
) -> Result<(), SendTimeoutError<SequenceAction>> {
self.serialized_rollup_transactions_tx
.send_timeout(sequence_action, timeout)
.await
}
}

impl Drop for Executor {
fn drop(&mut self) {
self.sequencer_key.zeroize();
Expand Down Expand Up @@ -132,8 +156,7 @@ impl Executor {
private_key: &SecretString,
block_time: u64,
max_bytes_per_bundle: usize,
serialized_rollup_transactions_rx: mpsc::Receiver<SequenceAction>,
) -> eyre::Result<Self> {
) -> eyre::Result<(Self, Handle)> {
let sequencer_client = sequencer_client::HttpClient::new(sequencer_url)
.wrap_err("failed constructing sequencer client")?;
let (status, _) = watch::channel(Status::new());
Expand All @@ -146,15 +169,21 @@ impl Executor {

let sequencer_address = Address::from_verification_key(sequencer_key.verification_key());

Ok(Self {
status,
serialized_rollup_transactions_rx,
sequencer_client,
sequencer_key,
address: sequencer_address,
block_time: Duration::from_millis(block_time),
max_bytes_per_bundle,
})
let (serialized_rollup_transaction_tx, serialized_rollup_transaction_rx) =
tokio::sync::mpsc::channel::<SequenceAction>(256);

Ok((
Self {
status,
serialized_rollup_transactions: serialized_rollup_transaction_rx,
sequencer_client,
sequencer_key,
address: sequencer_address,
block_time: Duration::from_millis(block_time),
max_bytes_per_bundle,
},
Handle::new(serialized_rollup_transaction_tx),
))
}

/// Return a reader to the status reporting channel
Expand Down Expand Up @@ -221,7 +250,7 @@ impl Executor {
}

// receive new seq_action and bundle it
Some(seq_action) = self.serialized_rollup_transactions_rx.recv() => {
Some(seq_action) = self.serialized_rollup_transactions.recv() => {
let rollup_id = seq_action.rollup_id;
if let Err(e) = bundle_factory.try_push(seq_action) {
warn!(
Expand Down
35 changes: 13 additions & 22 deletions crates/astria-composer/src/executor/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,14 +194,11 @@ async fn wait_for_startup(
async fn full_bundle() {
// set up the executor, channel for writing seq actions, and the sequencer mock
let (sequencer, nonce_guard, cfg) = setup().await;
let (serialized_rollup_transactions_tx, serialized_rollup_transactions_rx) =
tokio::sync::mpsc::channel::<SequenceAction>(256);
let executor = Executor::new(
let (executor, executor_handle) = Executor::new(
&cfg.sequencer_url,
&cfg.private_key,
cfg.block_time_ms,
cfg.max_bytes_per_bundle,
serialized_rollup_transactions_rx,
)
.unwrap();

Expand Down Expand Up @@ -229,12 +226,12 @@ async fn full_bundle() {
};

// push both sequence actions to the executor in order to force the full bundle to be sent
serialized_rollup_transactions_tx
.send(seq0.clone())
executor_handle
.send_timeout(seq0.clone(), Duration::from_millis(1000))
.await
.unwrap();
serialized_rollup_transactions_tx
.send(seq1.clone())
executor_handle
.send_timeout(seq1.clone(), Duration::from_millis(1000))
.await
.unwrap();

Expand Down Expand Up @@ -283,14 +280,11 @@ async fn full_bundle() {
async fn bundle_triggered_by_block_timer() {
// set up the executor, channel for writing seq actions, and the sequencer mock
let (sequencer, nonce_guard, cfg) = setup().await;
let (serialized_rollup_transactions_tx, serialized_rollup_transactions_rx) =
tokio::sync::mpsc::channel::<SequenceAction>(256);
let executor = Executor::new(
let (executor, executor_handle) = Executor::new(
&cfg.sequencer_url,
&cfg.private_key,
cfg.block_time_ms,
cfg.max_bytes_per_bundle,
serialized_rollup_transactions_rx,
)
.unwrap();
let status = executor.subscribe();
Expand All @@ -313,8 +307,8 @@ async fn bundle_triggered_by_block_timer() {
// make sure at least one block has passed so that the executor will submit the bundle
// despite it not being full
time::pause();
serialized_rollup_transactions_tx
.send(seq0.clone())
executor_handle
.send_timeout(seq0.clone(), Duration::from_millis(1000))
.await
.unwrap();
time::advance(Duration::from_millis(cfg.block_time_ms)).await;
Expand Down Expand Up @@ -364,14 +358,11 @@ async fn bundle_triggered_by_block_timer() {
async fn two_seq_actions_single_bundle() {
// set up the executor, channel for writing seq actions, and the sequencer mock
let (sequencer, nonce_guard, cfg) = setup().await;
let (serialized_rollup_transactions_tx, serialized_rollup_transactions_rx) =
tokio::sync::mpsc::channel::<SequenceAction>(256);
let executor = Executor::new(
let (executor, executor_handle) = Executor::new(
&cfg.sequencer_url,
&cfg.private_key,
cfg.block_time_ms,
cfg.max_bytes_per_bundle,
serialized_rollup_transactions_rx,
)
.unwrap();

Expand Down Expand Up @@ -401,12 +392,12 @@ async fn two_seq_actions_single_bundle() {
// make sure at least one block has passed so that the executor will submit the bundle
// despite it not being full
time::pause();
serialized_rollup_transactions_tx
.send(seq0.clone())
executor_handle
.send_timeout(seq0.clone(), Duration::from_millis(1000))
.await
.unwrap();
serialized_rollup_transactions_tx
.send(seq1.clone())
executor_handle
.send_timeout(seq1.clone(), Duration::from_millis(1000))
.await
.unwrap();
time::advance(Duration::from_millis(cfg.block_time_ms)).await;
Expand Down
2 changes: 0 additions & 2 deletions crates/astria-composer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ mod composer;
pub mod config;
mod executor;
mod rollup;
#[cfg(test)]
mod tests;

pub use build_info::BUILD_INFO;
pub use composer::Composer;
Expand Down
Loading

0 comments on commit 87c49dc

Please sign in to comment.