Skip to content

Commit

Permalink
move executor handle creation to executor
Browse files Browse the repository at this point in the history
  • Loading branch information
bharath-123 committed Mar 19, 2024
1 parent 3516a7e commit ec91a39
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 78 deletions.
24 changes: 9 additions & 15 deletions crates/astria-composer/src/collectors/geth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,7 @@ use ethers::providers::{
Ws,
};
use tokio::sync::{
mpsc::{
error::SendTimeoutError,
Sender,
},
mpsc::error::SendTimeoutError,
watch,
};
use tracing::{
Expand All @@ -40,6 +37,8 @@ use tracing::{
warn,
};

use crate::executor;

type StdError = dyn std::error::Error;

/// `GethCollector` Collects transactions submitted to a Geth rollup node and passes
Expand All @@ -48,15 +47,14 @@ type StdError = dyn std::error::Error;
/// It is responsible for fetching pending transactions submitted to the rollup Geth nodes and then
/// passing them downstream for the executor to process. Thus, a composer can have multiple
/// collectors running at the same time funneling data from multiple rollup nodes.
#[derive(Debug)]
pub(crate) struct Geth {
// Chain ID to identify in the astria sequencer block which rollup a serialized sequencer
// action belongs to. Created from `chain_name`.
rollup_id: RollupId,
// Name of the chain the transactions are read from.
chain_name: String,
// The channel on which the collector sends new txs to the executor.
new_bundles: Sender<SequenceAction>,
executor_handle: executor::Handle,
// The status of this collector instance.
status: watch::Sender<Status>,
/// Rollup URL
Expand All @@ -82,16 +80,12 @@ impl Status {

impl Geth {
/// Initializes a new collector instance
pub(crate) fn new(
chain_name: String,
url: String,
new_bundles: Sender<SequenceAction>,
) -> Self {
pub(crate) fn new(chain_name: String, url: String, executor_handle: executor::Handle) -> Self {
let (status, _) = watch::channel(Status::new());
Self {
rollup_id: RollupId::from_unhashed_bytes(&chain_name),
chain_name,
new_bundles,
executor_handle,
status,
url,
}
Expand All @@ -113,7 +107,7 @@ impl Geth {

let Self {
rollup_id,
new_bundles,
executor_handle,
status,
url,
..
Expand Down Expand Up @@ -165,8 +159,8 @@ impl Geth {
fee_asset_id: default_native_asset_id(),
};

match new_bundles
.send_timeout(seq_action, Duration::from_millis(500))
match executor_handle
.send_with_timeout(seq_action, Duration::from_millis(500))
.await
{
Ok(()) => {}
Expand Down
37 changes: 11 additions & 26 deletions crates/astria-composer/src/composer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,12 @@ use std::{
net::SocketAddr,
};

use astria_core::sequencer::v1::transaction::action::SequenceAction;
use astria_eyre::eyre::{
self,
WrapErr as _,
};
use tokio::{
sync::{
mpsc::Sender,
watch,
},
sync::watch,
task::JoinError,
};
use tokio_util::task::JoinMap;
Expand Down Expand Up @@ -44,9 +40,9 @@ pub struct Composer {
/// `ComposerStatusSender` is used to announce the current status of the Composer for other
/// modules in the crate to use.
composer_status_sender: watch::Sender<Status>,
/// `SerializedRollupTransactionsTx` is used to communicate SequenceActions to the Executor
/// `ExecutorHandle` contains a channel to communicate SequenceActions to the Executor
/// This is at the Composer level to allow its sharing to various different collectors.
serialized_rollup_transactions_tx: tokio::sync::mpsc::Sender<SequenceAction>,
executor_handle: executor::Handle,
/// `Executor` is responsible for signing and submitting sequencer transactions
/// The sequencer transactions are received from various collectors.
executor: Executor,
Expand Down Expand Up @@ -91,15 +87,11 @@ impl Composer {
pub fn from_config(cfg: &Config) -> eyre::Result<Self> {
let (composer_status_sender, _) = watch::channel(Status::default());

let (serialized_rollup_transactions_tx, serialized_rollup_transactions_rx) =
tokio::sync::mpsc::channel::<SequenceAction>(256);

let executor = Executor::new(
let (executor, executor_handle) = Executor::new(
&cfg.sequencer_url,
&cfg.private_key,
cfg.block_time_ms,
cfg.max_bytes_per_bundle,
serialized_rollup_transactions_rx,
)
.wrap_err("executor construction from config failed")?;

Expand All @@ -120,11 +112,8 @@ impl Composer {
let geth_collectors = rollups
.iter()
.map(|(rollup_name, url)| {
let collector = Geth::new(
rollup_name.clone(),
url.clone(),
serialized_rollup_transactions_tx.clone(),
);
let collector =
Geth::new(rollup_name.clone(), url.clone(), executor_handle.clone());
(rollup_name.clone(), collector)
})
.collect::<HashMap<_, _>>();
Expand All @@ -137,7 +126,7 @@ impl Composer {
Ok(Self {
api_server,
composer_status_sender,
serialized_rollup_transactions_tx,
executor_handle,
executor,
rollups,
geth_collectors,
Expand All @@ -160,7 +149,7 @@ impl Composer {
api_server,
composer_status_sender,
executor,
serialized_rollup_transactions_tx,
executor_handle,
mut geth_collector_tasks,
mut geth_collectors,
rollups,
Expand Down Expand Up @@ -202,7 +191,7 @@ impl Composer {
reconnect_exited_collector(
&mut geth_collector_statuses,
&mut geth_collector_tasks,
serialized_rollup_transactions_tx.clone(),
executor_handle.clone(),
&rollups,
rollup,
collector_exit,
Expand Down Expand Up @@ -271,7 +260,7 @@ async fn wait_for_collectors(
pub(super) fn reconnect_exited_collector(
collector_statuses: &mut HashMap<String, watch::Receiver<collectors::geth::Status>>,
collector_tasks: &mut JoinMap<String, eyre::Result<()>>,
serialized_rollup_transactions_tx: Sender<SequenceAction>,
executor_handle: executor::Handle,
rollups: &HashMap<String, String>,
rollup: String,
exit_result: Result<eyre::Result<()>, JoinError>,
Expand All @@ -285,11 +274,7 @@ pub(super) fn reconnect_exited_collector(
return;
};

let collector = Geth::new(
rollup.clone(),
url.clone(),
serialized_rollup_transactions_tx,
);
let collector = Geth::new(rollup.clone(), url.clone(), executor_handle);
collector_statuses.insert(rollup.clone(), collector.subscribe());
collector_tasks.spawn(rollup, collector.run_until_stopped());
}
Expand Down
55 changes: 42 additions & 13 deletions crates/astria-composer/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ use tokio::{
select,
sync::{
mpsc,
mpsc::error::SendTimeoutError,
watch,
},
time::{
Expand Down Expand Up @@ -89,7 +90,7 @@ pub(super) struct Executor {
// The status of this executor
status: watch::Sender<Status>,
// Channel for receiving `SequenceAction`s to be bundled.
serialized_rollup_transactions_rx: mpsc::Receiver<SequenceAction>,
sequence_action_rx: mpsc::Receiver<SequenceAction>,
// The client for submitting wrapped and signed pending eth transactions to the astria
// sequencer.
sequencer_client: sequencer_client::HttpClient,
Expand All @@ -103,6 +104,29 @@ pub(super) struct Executor {
max_bytes_per_bundle: usize,
}

#[derive(Clone)]
pub(super) struct Handle {
serialized_rollup_transactions_tx: mpsc::Sender<SequenceAction>,
}

impl Handle {
pub(super) fn new(sequence_action_tx: mpsc::Sender<SequenceAction>) -> Self {
Self {
serialized_rollup_transactions_tx: sequence_action_tx,
}
}

pub(super) async fn send_with_timeout(
&self,
sequence_action: SequenceAction,
timeout: Duration,
) -> Result<(), SendTimeoutError<SequenceAction>> {
self.serialized_rollup_transactions_tx
.send_timeout(sequence_action, timeout)
.await
}
}

impl Drop for Executor {
fn drop(&mut self) {
self.sequencer_key.zeroize();
Expand Down Expand Up @@ -132,8 +156,7 @@ impl Executor {
private_key: &SecretString,
block_time: u64,
max_bytes_per_bundle: usize,
serialized_rollup_transactions_rx: mpsc::Receiver<SequenceAction>,
) -> eyre::Result<Self> {
) -> eyre::Result<(Self, Handle)> {
let sequencer_client = sequencer_client::HttpClient::new(sequencer_url)
.wrap_err("failed constructing sequencer client")?;
let (status, _) = watch::channel(Status::new());
Expand All @@ -146,15 +169,21 @@ impl Executor {

let sequencer_address = Address::from_verification_key(sequencer_key.verification_key());

Ok(Self {
status,
serialized_rollup_transactions_rx,
sequencer_client,
sequencer_key,
address: sequencer_address,
block_time: Duration::from_millis(block_time),
max_bytes_per_bundle,
})
let (sequence_action_tx, sequence_action_rx) =
tokio::sync::mpsc::channel::<SequenceAction>(256);

Ok((
Self {
status,
sequence_action_rx,
sequencer_client,
sequencer_key,
address: sequencer_address,
block_time: Duration::from_millis(block_time),
max_bytes_per_bundle,
},
Handle::new(sequence_action_tx),
))
}

/// Return a reader to the status reporting channel
Expand Down Expand Up @@ -221,7 +250,7 @@ impl Executor {
}

// receive new seq_action and bundle it
Some(seq_action) = self.serialized_rollup_transactions_rx.recv() => {
Some(seq_action) = self.sequence_action_rx.recv() => {
let rollup_id = seq_action.rollup_id;
if let Err(e) = bundle_factory.try_push(seq_action) {
warn!(
Expand Down
35 changes: 13 additions & 22 deletions crates/astria-composer/src/executor/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,14 +194,11 @@ async fn wait_for_startup(
async fn full_bundle() {
// set up the executor, channel for writing seq actions, and the sequencer mock
let (sequencer, nonce_guard, cfg) = setup().await;
let (serialized_rollup_transactions_tx, serialized_rollup_transactions_rx) =
tokio::sync::mpsc::channel::<SequenceAction>(256);
let executor = Executor::new(
let (executor, executor_handle) = Executor::new(
&cfg.sequencer_url,
&cfg.private_key,
cfg.block_time_ms,
cfg.max_bytes_per_bundle,
serialized_rollup_transactions_rx,
)
.unwrap();

Expand Down Expand Up @@ -229,12 +226,12 @@ async fn full_bundle() {
};

// push both sequence actions to the executor in order to force the full bundle to be sent
serialized_rollup_transactions_tx
.send(seq0.clone())
executor_handle
.send_with_timeout(seq0.clone(), Duration::from_millis(1000))
.await
.unwrap();
serialized_rollup_transactions_tx
.send(seq1.clone())
executor_handle
.send_with_timeout(seq1.clone(), Duration::from_millis(1000))
.await
.unwrap();

Expand Down Expand Up @@ -283,14 +280,11 @@ async fn full_bundle() {
async fn bundle_triggered_by_block_timer() {
// set up the executor, channel for writing seq actions, and the sequencer mock
let (sequencer, nonce_guard, cfg) = setup().await;
let (serialized_rollup_transactions_tx, serialized_rollup_transactions_rx) =
tokio::sync::mpsc::channel::<SequenceAction>(256);
let executor = Executor::new(
let (executor, executor_handle) = Executor::new(
&cfg.sequencer_url,
&cfg.private_key,
cfg.block_time_ms,
cfg.max_bytes_per_bundle,
serialized_rollup_transactions_rx,
)
.unwrap();
let status = executor.subscribe();
Expand All @@ -313,8 +307,8 @@ async fn bundle_triggered_by_block_timer() {
// make sure at least one block has passed so that the executor will submit the bundle
// despite it not being full
time::pause();
serialized_rollup_transactions_tx
.send(seq0.clone())
executor_handle
.send_with_timeout(seq0.clone(), Duration::from_millis(1000))
.await
.unwrap();
time::advance(Duration::from_millis(cfg.block_time_ms)).await;
Expand Down Expand Up @@ -364,14 +358,11 @@ async fn bundle_triggered_by_block_timer() {
async fn two_seq_actions_single_bundle() {
// set up the executor, channel for writing seq actions, and the sequencer mock
let (sequencer, nonce_guard, cfg) = setup().await;
let (serialized_rollup_transactions_tx, serialized_rollup_transactions_rx) =
tokio::sync::mpsc::channel::<SequenceAction>(256);
let executor = Executor::new(
let (executor, executor_handle) = Executor::new(
&cfg.sequencer_url,
&cfg.private_key,
cfg.block_time_ms,
cfg.max_bytes_per_bundle,
serialized_rollup_transactions_rx,
)
.unwrap();

Expand Down Expand Up @@ -401,12 +392,12 @@ async fn two_seq_actions_single_bundle() {
// make sure at least one block has passed so that the executor will submit the bundle
// despite it not being full
time::pause();
serialized_rollup_transactions_tx
.send(seq0.clone())
executor_handle
.send_with_timeout(seq0.clone(), Duration::from_millis(1000))
.await
.unwrap();
serialized_rollup_transactions_tx
.send(seq1.clone())
executor_handle
.send_with_timeout(seq1.clone(), Duration::from_millis(1000))
.await
.unwrap();
time::advance(Duration::from_millis(cfg.block_time_ms)).await;
Expand Down
Loading

0 comments on commit ec91a39

Please sign in to comment.