Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(composer): interact with executor through handle #834

Merged
merged 3 commits into from
Mar 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 8 additions & 14 deletions crates/astria-composer/src/collectors/geth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,7 @@ use ethers::providers::{
Ws,
};
use tokio::sync::{
mpsc::{
error::SendTimeoutError,
Sender,
},
mpsc::error::SendTimeoutError,
watch,
};
use tracing::{
Expand All @@ -40,6 +37,8 @@ use tracing::{
warn,
};

use crate::executor;

type StdError = dyn std::error::Error;

/// `GethCollector` Collects transactions submitted to a Geth rollup node and passes
Expand All @@ -48,15 +47,14 @@ type StdError = dyn std::error::Error;
/// It is responsible for fetching pending transactions submitted to the rollup Geth nodes and then
/// passing them downstream for the executor to process. Thus, a composer can have multiple
/// collectors running at the same time funneling data from multiple rollup nodes.
#[derive(Debug)]
pub(crate) struct Geth {
// Chain ID to identify in the astria sequencer block which rollup a serialized sequencer
// action belongs to. Created from `chain_name`.
rollup_id: RollupId,
// Name of the chain the transactions are read from.
chain_name: String,
// The channel on which the collector sends new txs to the executor.
new_bundles: Sender<SequenceAction>,
executor_handle: executor::Handle,
// The status of this collector instance.
status: watch::Sender<Status>,
/// Rollup URL
Expand All @@ -82,16 +80,12 @@ impl Status {

impl Geth {
/// Initializes a new collector instance
pub(crate) fn new(
chain_name: String,
url: String,
new_bundles: Sender<SequenceAction>,
) -> Self {
pub(crate) fn new(chain_name: String, url: String, executor_handle: executor::Handle) -> Self {
let (status, _) = watch::channel(Status::new());
Self {
rollup_id: RollupId::from_unhashed_bytes(&chain_name),
chain_name,
new_bundles,
executor_handle,
status,
url,
}
Expand All @@ -113,7 +107,7 @@ impl Geth {

let Self {
rollup_id,
new_bundles,
executor_handle,
status,
url,
..
Expand Down Expand Up @@ -165,7 +159,7 @@ impl Geth {
fee_asset_id: default_native_asset_id(),
};

match new_bundles
match executor_handle
.send_timeout(seq_action, Duration::from_millis(500))
.await
{
Expand Down
37 changes: 11 additions & 26 deletions crates/astria-composer/src/composer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,12 @@ use std::{
net::SocketAddr,
};

use astria_core::sequencer::v1::transaction::action::SequenceAction;
use astria_eyre::eyre::{
self,
WrapErr as _,
};
use tokio::{
sync::{
mpsc::Sender,
watch,
},
sync::watch,
task::JoinError,
};
use tokio_util::task::JoinMap;
Expand Down Expand Up @@ -44,9 +40,9 @@ pub struct Composer {
/// `ComposerStatusSender` is used to announce the current status of the Composer for other
/// modules in the crate to use.
composer_status_sender: watch::Sender<Status>,
/// `SerializedRollupTransactionsTx` is used to communicate SequenceActions to the Executor
/// `ExecutorHandle` contains a channel to communicate SequenceActions to the Executor
/// This is at the Composer level to allow its sharing to various different collectors.
serialized_rollup_transactions_tx: tokio::sync::mpsc::Sender<SequenceAction>,
executor_handle: executor::Handle,
/// `Executor` is responsible for signing and submitting sequencer transactions
/// The sequencer transactions are received from various collectors.
executor: Executor,
Expand Down Expand Up @@ -91,15 +87,11 @@ impl Composer {
pub fn from_config(cfg: &Config) -> eyre::Result<Self> {
let (composer_status_sender, _) = watch::channel(Status::default());

let (serialized_rollup_transactions_tx, serialized_rollup_transactions_rx) =
tokio::sync::mpsc::channel::<SequenceAction>(256);

let executor = Executor::new(
let (executor, executor_handle) = Executor::new(
&cfg.sequencer_url,
&cfg.private_key,
cfg.block_time_ms,
cfg.max_bytes_per_bundle,
serialized_rollup_transactions_rx,
)
.wrap_err("executor construction from config failed")?;

Expand All @@ -120,11 +112,8 @@ impl Composer {
let geth_collectors = rollups
.iter()
.map(|(rollup_name, url)| {
let collector = Geth::new(
rollup_name.clone(),
url.clone(),
serialized_rollup_transactions_tx.clone(),
);
let collector =
Geth::new(rollup_name.clone(), url.clone(), executor_handle.clone());
(rollup_name.clone(), collector)
})
.collect::<HashMap<_, _>>();
Expand All @@ -137,7 +126,7 @@ impl Composer {
Ok(Self {
api_server,
composer_status_sender,
serialized_rollup_transactions_tx,
executor_handle,
executor,
rollups,
geth_collectors,
Expand All @@ -160,7 +149,7 @@ impl Composer {
api_server,
composer_status_sender,
executor,
serialized_rollup_transactions_tx,
executor_handle,
mut geth_collector_tasks,
mut geth_collectors,
rollups,
Expand Down Expand Up @@ -202,7 +191,7 @@ impl Composer {
reconnect_exited_collector(
&mut geth_collector_statuses,
&mut geth_collector_tasks,
serialized_rollup_transactions_tx.clone(),
executor_handle.clone(),
&rollups,
rollup,
collector_exit,
Expand Down Expand Up @@ -271,7 +260,7 @@ async fn wait_for_collectors(
pub(super) fn reconnect_exited_collector(
collector_statuses: &mut HashMap<String, watch::Receiver<collectors::geth::Status>>,
collector_tasks: &mut JoinMap<String, eyre::Result<()>>,
serialized_rollup_transactions_tx: Sender<SequenceAction>,
executor_handle: executor::Handle,
rollups: &HashMap<String, String>,
rollup: String,
exit_result: Result<eyre::Result<()>, JoinError>,
Expand All @@ -285,11 +274,7 @@ pub(super) fn reconnect_exited_collector(
return;
};

let collector = Geth::new(
rollup.clone(),
url.clone(),
serialized_rollup_transactions_tx,
);
let collector = Geth::new(rollup.clone(), url.clone(), executor_handle);
collector_statuses.insert(rollup.clone(), collector.subscribe());
collector_tasks.spawn(rollup, collector.run_until_stopped());
}
Expand Down
55 changes: 42 additions & 13 deletions crates/astria-composer/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ use tokio::{
select,
sync::{
mpsc,
mpsc::error::SendTimeoutError,
watch,
},
time::{
Expand Down Expand Up @@ -89,7 +90,7 @@ pub(super) struct Executor {
// The status of this executor
status: watch::Sender<Status>,
// Channel for receiving `SequenceAction`s to be bundled.
serialized_rollup_transactions_rx: mpsc::Receiver<SequenceAction>,
serialized_rollup_transactions: mpsc::Receiver<SequenceAction>,
// The client for submitting wrapped and signed pending eth transactions to the astria
// sequencer.
sequencer_client: sequencer_client::HttpClient,
Expand All @@ -103,6 +104,29 @@ pub(super) struct Executor {
max_bytes_per_bundle: usize,
}

#[derive(Clone)]
pub(super) struct Handle {
serialized_rollup_transactions_tx: mpsc::Sender<SequenceAction>,
}

impl Handle {
fn new(serialized_rollup_transactions_tx: mpsc::Sender<SequenceAction>) -> Self {
Self {
serialized_rollup_transactions_tx,
}
}

pub(super) async fn send_timeout(
&self,
sequence_action: SequenceAction,
timeout: Duration,
) -> Result<(), SendTimeoutError<SequenceAction>> {
self.serialized_rollup_transactions_tx
.send_timeout(sequence_action, timeout)
.await
}
}

impl Drop for Executor {
fn drop(&mut self) {
self.sequencer_key.zeroize();
Expand Down Expand Up @@ -132,8 +156,7 @@ impl Executor {
private_key: &SecretString,
block_time: u64,
max_bytes_per_bundle: usize,
serialized_rollup_transactions_rx: mpsc::Receiver<SequenceAction>,
) -> eyre::Result<Self> {
) -> eyre::Result<(Self, Handle)> {
let sequencer_client = sequencer_client::HttpClient::new(sequencer_url)
.wrap_err("failed constructing sequencer client")?;
let (status, _) = watch::channel(Status::new());
Expand All @@ -146,15 +169,21 @@ impl Executor {

let sequencer_address = Address::from_verification_key(sequencer_key.verification_key());

Ok(Self {
status,
serialized_rollup_transactions_rx,
sequencer_client,
sequencer_key,
address: sequencer_address,
block_time: Duration::from_millis(block_time),
max_bytes_per_bundle,
})
let (serialized_rollup_transaction_tx, serialized_rollup_transaction_rx) =
tokio::sync::mpsc::channel::<SequenceAction>(256);

Ok((
Self {
status,
serialized_rollup_transactions: serialized_rollup_transaction_rx,
sequencer_client,
sequencer_key,
address: sequencer_address,
block_time: Duration::from_millis(block_time),
max_bytes_per_bundle,
},
Handle::new(serialized_rollup_transaction_tx),
))
}

/// Return a reader to the status reporting channel
Expand Down Expand Up @@ -221,7 +250,7 @@ impl Executor {
}

// receive new seq_action and bundle it
Some(seq_action) = self.serialized_rollup_transactions_rx.recv() => {
Some(seq_action) = self.serialized_rollup_transactions.recv() => {
let rollup_id = seq_action.rollup_id;
if let Err(e) = bundle_factory.try_push(seq_action) {
warn!(
Expand Down
35 changes: 13 additions & 22 deletions crates/astria-composer/src/executor/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,14 +194,11 @@ async fn wait_for_startup(
async fn full_bundle() {
// set up the executor, channel for writing seq actions, and the sequencer mock
let (sequencer, nonce_guard, cfg) = setup().await;
let (serialized_rollup_transactions_tx, serialized_rollup_transactions_rx) =
tokio::sync::mpsc::channel::<SequenceAction>(256);
let executor = Executor::new(
let (executor, executor_handle) = Executor::new(
&cfg.sequencer_url,
&cfg.private_key,
cfg.block_time_ms,
cfg.max_bytes_per_bundle,
serialized_rollup_transactions_rx,
)
.unwrap();

Expand Down Expand Up @@ -229,12 +226,12 @@ async fn full_bundle() {
};

// push both sequence actions to the executor in order to force the full bundle to be sent
serialized_rollup_transactions_tx
.send(seq0.clone())
executor_handle
.send_timeout(seq0.clone(), Duration::from_millis(1000))
.await
.unwrap();
serialized_rollup_transactions_tx
.send(seq1.clone())
executor_handle
.send_timeout(seq1.clone(), Duration::from_millis(1000))
.await
.unwrap();

Expand Down Expand Up @@ -283,14 +280,11 @@ async fn full_bundle() {
async fn bundle_triggered_by_block_timer() {
// set up the executor, channel for writing seq actions, and the sequencer mock
let (sequencer, nonce_guard, cfg) = setup().await;
let (serialized_rollup_transactions_tx, serialized_rollup_transactions_rx) =
tokio::sync::mpsc::channel::<SequenceAction>(256);
let executor = Executor::new(
let (executor, executor_handle) = Executor::new(
&cfg.sequencer_url,
&cfg.private_key,
cfg.block_time_ms,
cfg.max_bytes_per_bundle,
serialized_rollup_transactions_rx,
)
.unwrap();
let status = executor.subscribe();
Expand All @@ -313,8 +307,8 @@ async fn bundle_triggered_by_block_timer() {
// make sure at least one block has passed so that the executor will submit the bundle
// despite it not being full
time::pause();
serialized_rollup_transactions_tx
.send(seq0.clone())
executor_handle
.send_timeout(seq0.clone(), Duration::from_millis(1000))
.await
.unwrap();
time::advance(Duration::from_millis(cfg.block_time_ms)).await;
Expand Down Expand Up @@ -364,14 +358,11 @@ async fn bundle_triggered_by_block_timer() {
async fn two_seq_actions_single_bundle() {
// set up the executor, channel for writing seq actions, and the sequencer mock
let (sequencer, nonce_guard, cfg) = setup().await;
let (serialized_rollup_transactions_tx, serialized_rollup_transactions_rx) =
tokio::sync::mpsc::channel::<SequenceAction>(256);
let executor = Executor::new(
let (executor, executor_handle) = Executor::new(
&cfg.sequencer_url,
&cfg.private_key,
cfg.block_time_ms,
cfg.max_bytes_per_bundle,
serialized_rollup_transactions_rx,
)
.unwrap();

Expand Down Expand Up @@ -401,12 +392,12 @@ async fn two_seq_actions_single_bundle() {
// make sure at least one block has passed so that the executor will submit the bundle
// despite it not being full
time::pause();
serialized_rollup_transactions_tx
.send(seq0.clone())
executor_handle
.send_timeout(seq0.clone(), Duration::from_millis(1000))
.await
.unwrap();
serialized_rollup_transactions_tx
.send(seq1.clone())
executor_handle
.send_timeout(seq1.clone(), Duration::from_millis(1000))
.await
.unwrap();
time::advance(Duration::from_millis(cfg.block_time_ms)).await;
Expand Down
2 changes: 0 additions & 2 deletions crates/astria-composer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ mod composer;
pub mod config;
mod executor;
mod rollup;
#[cfg(test)]
mod tests;

pub use build_info::BUILD_INFO;
pub use composer::Composer;
Expand Down
Loading
Loading