diff --git a/crates/astria-composer/src/collectors/geth.rs b/crates/astria-composer/src/collectors/geth.rs index 89f6b4cb0d..1bb5b0a957 100644 --- a/crates/astria-composer/src/collectors/geth.rs +++ b/crates/astria-composer/src/collectors/geth.rs @@ -28,10 +28,7 @@ use ethers::providers::{ Ws, }; use tokio::sync::{ - mpsc::{ - error::SendTimeoutError, - Sender, - }, + mpsc::error::SendTimeoutError, watch, }; use tracing::{ @@ -40,6 +37,8 @@ use tracing::{ warn, }; +use crate::executor; + type StdError = dyn std::error::Error; /// `GethCollector` Collects transactions submitted to a Geth rollup node and passes @@ -48,7 +47,6 @@ type StdError = dyn std::error::Error; /// It is responsible for fetching pending transactions submitted to the rollup Geth nodes and then /// passing them downstream for the executor to process. Thus, a composer can have multiple /// collectors running at the same time funneling data from multiple rollup nodes. -#[derive(Debug)] pub(crate) struct Geth { // Chain ID to identify in the astria sequencer block which rollup a serialized sequencer // action belongs to. Created from `chain_name`. @@ -56,7 +54,7 @@ pub(crate) struct Geth { // Name of the chain the transactions are read from. chain_name: String, // The channel on which the collector sends new txs to the executor. - new_bundles: Sender, + executor_handle: executor::Handle, // The status of this collector instance. status: watch::Sender, /// Rollup URL @@ -82,16 +80,12 @@ impl Status { impl Geth { /// Initializes a new collector instance - pub(crate) fn new( - chain_name: String, - url: String, - new_bundles: Sender, - ) -> Self { + pub(crate) fn new(chain_name: String, url: String, executor_handle: executor::Handle) -> Self { let (status, _) = watch::channel(Status::new()); Self { rollup_id: RollupId::from_unhashed_bytes(&chain_name), chain_name, - new_bundles, + executor_handle, status, url, } @@ -113,7 +107,7 @@ impl Geth { let Self { rollup_id, - new_bundles, + executor_handle, status, url, .. @@ -165,7 +159,7 @@ impl Geth { fee_asset_id: default_native_asset_id(), }; - match new_bundles + match executor_handle .send_timeout(seq_action, Duration::from_millis(500)) .await { diff --git a/crates/astria-composer/src/composer.rs b/crates/astria-composer/src/composer.rs index 43db5573d3..33a299256f 100644 --- a/crates/astria-composer/src/composer.rs +++ b/crates/astria-composer/src/composer.rs @@ -3,16 +3,12 @@ use std::{ net::SocketAddr, }; -use astria_core::sequencer::v1::transaction::action::SequenceAction; use astria_eyre::eyre::{ self, WrapErr as _, }; use tokio::{ - sync::{ - mpsc::Sender, - watch, - }, + sync::watch, task::JoinError, }; use tokio_util::task::JoinMap; @@ -44,9 +40,9 @@ pub struct Composer { /// `ComposerStatusSender` is used to announce the current status of the Composer for other /// modules in the crate to use. composer_status_sender: watch::Sender, - /// `SerializedRollupTransactionsTx` is used to communicate SequenceActions to the Executor + /// `ExecutorHandle` contains a channel to communicate SequenceActions to the Executor /// This is at the Composer level to allow its sharing to various different collectors. - serialized_rollup_transactions_tx: tokio::sync::mpsc::Sender, + executor_handle: executor::Handle, /// `Executor` is responsible for signing and submitting sequencer transactions /// The sequencer transactions are received from various collectors. executor: Executor, @@ -91,15 +87,11 @@ impl Composer { pub fn from_config(cfg: &Config) -> eyre::Result { let (composer_status_sender, _) = watch::channel(Status::default()); - let (serialized_rollup_transactions_tx, serialized_rollup_transactions_rx) = - tokio::sync::mpsc::channel::(256); - - let executor = Executor::new( + let (executor, executor_handle) = Executor::new( &cfg.sequencer_url, &cfg.private_key, cfg.block_time_ms, cfg.max_bytes_per_bundle, - serialized_rollup_transactions_rx, ) .wrap_err("executor construction from config failed")?; @@ -120,11 +112,8 @@ impl Composer { let geth_collectors = rollups .iter() .map(|(rollup_name, url)| { - let collector = Geth::new( - rollup_name.clone(), - url.clone(), - serialized_rollup_transactions_tx.clone(), - ); + let collector = + Geth::new(rollup_name.clone(), url.clone(), executor_handle.clone()); (rollup_name.clone(), collector) }) .collect::>(); @@ -137,7 +126,7 @@ impl Composer { Ok(Self { api_server, composer_status_sender, - serialized_rollup_transactions_tx, + executor_handle, executor, rollups, geth_collectors, @@ -160,7 +149,7 @@ impl Composer { api_server, composer_status_sender, executor, - serialized_rollup_transactions_tx, + executor_handle, mut geth_collector_tasks, mut geth_collectors, rollups, @@ -202,7 +191,7 @@ impl Composer { reconnect_exited_collector( &mut geth_collector_statuses, &mut geth_collector_tasks, - serialized_rollup_transactions_tx.clone(), + executor_handle.clone(), &rollups, rollup, collector_exit, @@ -271,7 +260,7 @@ async fn wait_for_collectors( pub(super) fn reconnect_exited_collector( collector_statuses: &mut HashMap>, collector_tasks: &mut JoinMap>, - serialized_rollup_transactions_tx: Sender, + executor_handle: executor::Handle, rollups: &HashMap, rollup: String, exit_result: Result, JoinError>, @@ -285,11 +274,7 @@ pub(super) fn reconnect_exited_collector( return; }; - let collector = Geth::new( - rollup.clone(), - url.clone(), - serialized_rollup_transactions_tx, - ); + let collector = Geth::new(rollup.clone(), url.clone(), executor_handle); collector_statuses.insert(rollup.clone(), collector.subscribe()); collector_tasks.spawn(rollup, collector.run_until_stopped()); } diff --git a/crates/astria-composer/src/executor/mod.rs b/crates/astria-composer/src/executor/mod.rs index 2ea4b81053..bb9fa6d764 100644 --- a/crates/astria-composer/src/executor/mod.rs +++ b/crates/astria-composer/src/executor/mod.rs @@ -51,6 +51,7 @@ use tokio::{ select, sync::{ mpsc, + mpsc::error::SendTimeoutError, watch, }, time::{ @@ -89,7 +90,7 @@ pub(super) struct Executor { // The status of this executor status: watch::Sender, // Channel for receiving `SequenceAction`s to be bundled. - serialized_rollup_transactions_rx: mpsc::Receiver, + serialized_rollup_transactions: mpsc::Receiver, // The client for submitting wrapped and signed pending eth transactions to the astria // sequencer. sequencer_client: sequencer_client::HttpClient, @@ -103,6 +104,29 @@ pub(super) struct Executor { max_bytes_per_bundle: usize, } +#[derive(Clone)] +pub(super) struct Handle { + serialized_rollup_transactions_tx: mpsc::Sender, +} + +impl Handle { + fn new(serialized_rollup_transactions_tx: mpsc::Sender) -> Self { + Self { + serialized_rollup_transactions_tx, + } + } + + pub(super) async fn send_timeout( + &self, + sequence_action: SequenceAction, + timeout: Duration, + ) -> Result<(), SendTimeoutError> { + self.serialized_rollup_transactions_tx + .send_timeout(sequence_action, timeout) + .await + } +} + impl Drop for Executor { fn drop(&mut self) { self.sequencer_key.zeroize(); @@ -132,8 +156,7 @@ impl Executor { private_key: &SecretString, block_time: u64, max_bytes_per_bundle: usize, - serialized_rollup_transactions_rx: mpsc::Receiver, - ) -> eyre::Result { + ) -> eyre::Result<(Self, Handle)> { let sequencer_client = sequencer_client::HttpClient::new(sequencer_url) .wrap_err("failed constructing sequencer client")?; let (status, _) = watch::channel(Status::new()); @@ -146,15 +169,21 @@ impl Executor { let sequencer_address = Address::from_verification_key(sequencer_key.verification_key()); - Ok(Self { - status, - serialized_rollup_transactions_rx, - sequencer_client, - sequencer_key, - address: sequencer_address, - block_time: Duration::from_millis(block_time), - max_bytes_per_bundle, - }) + let (serialized_rollup_transaction_tx, serialized_rollup_transaction_rx) = + tokio::sync::mpsc::channel::(256); + + Ok(( + Self { + status, + serialized_rollup_transactions: serialized_rollup_transaction_rx, + sequencer_client, + sequencer_key, + address: sequencer_address, + block_time: Duration::from_millis(block_time), + max_bytes_per_bundle, + }, + Handle::new(serialized_rollup_transaction_tx), + )) } /// Return a reader to the status reporting channel @@ -221,7 +250,7 @@ impl Executor { } // receive new seq_action and bundle it - Some(seq_action) = self.serialized_rollup_transactions_rx.recv() => { + Some(seq_action) = self.serialized_rollup_transactions.recv() => { let rollup_id = seq_action.rollup_id; if let Err(e) = bundle_factory.try_push(seq_action) { warn!( diff --git a/crates/astria-composer/src/executor/tests.rs b/crates/astria-composer/src/executor/tests.rs index 1252521b98..67b3c0dd67 100644 --- a/crates/astria-composer/src/executor/tests.rs +++ b/crates/astria-composer/src/executor/tests.rs @@ -194,14 +194,11 @@ async fn wait_for_startup( async fn full_bundle() { // set up the executor, channel for writing seq actions, and the sequencer mock let (sequencer, nonce_guard, cfg) = setup().await; - let (serialized_rollup_transactions_tx, serialized_rollup_transactions_rx) = - tokio::sync::mpsc::channel::(256); - let executor = Executor::new( + let (executor, executor_handle) = Executor::new( &cfg.sequencer_url, &cfg.private_key, cfg.block_time_ms, cfg.max_bytes_per_bundle, - serialized_rollup_transactions_rx, ) .unwrap(); @@ -229,12 +226,12 @@ async fn full_bundle() { }; // push both sequence actions to the executor in order to force the full bundle to be sent - serialized_rollup_transactions_tx - .send(seq0.clone()) + executor_handle + .send_timeout(seq0.clone(), Duration::from_millis(1000)) .await .unwrap(); - serialized_rollup_transactions_tx - .send(seq1.clone()) + executor_handle + .send_timeout(seq1.clone(), Duration::from_millis(1000)) .await .unwrap(); @@ -283,14 +280,11 @@ async fn full_bundle() { async fn bundle_triggered_by_block_timer() { // set up the executor, channel for writing seq actions, and the sequencer mock let (sequencer, nonce_guard, cfg) = setup().await; - let (serialized_rollup_transactions_tx, serialized_rollup_transactions_rx) = - tokio::sync::mpsc::channel::(256); - let executor = Executor::new( + let (executor, executor_handle) = Executor::new( &cfg.sequencer_url, &cfg.private_key, cfg.block_time_ms, cfg.max_bytes_per_bundle, - serialized_rollup_transactions_rx, ) .unwrap(); let status = executor.subscribe(); @@ -313,8 +307,8 @@ async fn bundle_triggered_by_block_timer() { // make sure at least one block has passed so that the executor will submit the bundle // despite it not being full time::pause(); - serialized_rollup_transactions_tx - .send(seq0.clone()) + executor_handle + .send_timeout(seq0.clone(), Duration::from_millis(1000)) .await .unwrap(); time::advance(Duration::from_millis(cfg.block_time_ms)).await; @@ -364,14 +358,11 @@ async fn bundle_triggered_by_block_timer() { async fn two_seq_actions_single_bundle() { // set up the executor, channel for writing seq actions, and the sequencer mock let (sequencer, nonce_guard, cfg) = setup().await; - let (serialized_rollup_transactions_tx, serialized_rollup_transactions_rx) = - tokio::sync::mpsc::channel::(256); - let executor = Executor::new( + let (executor, executor_handle) = Executor::new( &cfg.sequencer_url, &cfg.private_key, cfg.block_time_ms, cfg.max_bytes_per_bundle, - serialized_rollup_transactions_rx, ) .unwrap(); @@ -401,12 +392,12 @@ async fn two_seq_actions_single_bundle() { // make sure at least one block has passed so that the executor will submit the bundle // despite it not being full time::pause(); - serialized_rollup_transactions_tx - .send(seq0.clone()) + executor_handle + .send_timeout(seq0.clone(), Duration::from_millis(1000)) .await .unwrap(); - serialized_rollup_transactions_tx - .send(seq1.clone()) + executor_handle + .send_timeout(seq1.clone(), Duration::from_millis(1000)) .await .unwrap(); time::advance(Duration::from_millis(cfg.block_time_ms)).await; diff --git a/crates/astria-composer/src/lib.rs b/crates/astria-composer/src/lib.rs index 22cfec1d7f..eb71bd5a21 100644 --- a/crates/astria-composer/src/lib.rs +++ b/crates/astria-composer/src/lib.rs @@ -44,8 +44,6 @@ mod composer; pub mod config; mod executor; mod rollup; -#[cfg(test)] -mod tests; pub use build_info::BUILD_INFO; pub use composer::Composer; diff --git a/crates/astria-composer/src/tests.rs b/crates/astria-composer/src/tests.rs deleted file mode 100644 index fa85a81dba..0000000000 --- a/crates/astria-composer/src/tests.rs +++ /dev/null @@ -1,84 +0,0 @@ -use std::collections::HashMap; - -use astria_core::sequencer::v1::{ - asset::default_native_asset_id, - transaction::action::SequenceAction, - RollupId, -}; -use ethers::types::Transaction; -use tokio_util::task::JoinMap; - -use crate::{ - collectors::{ - geth::Status, - Geth, - }, - composer::reconnect_exited_collector, -}; - -/// This tests the `reconnect_exited_collector` handler. -#[tokio::test] -async fn collector_is_reconnected_after_exit() { - let mock_geth = test_utils::mock::Geth::spawn().await; - let rollup_name = "test".to_string(); - let rollup_url = format!("ws://{}", mock_geth.local_addr()); - let rollups = HashMap::from([(rollup_name.clone(), rollup_url.clone())]); - - let (tx, mut rx) = tokio::sync::mpsc::channel(16); - - let mut collector_tasks = JoinMap::new(); - let collector = Geth::new(rollup_name.clone(), rollup_url.clone(), tx.clone()); - let mut status = collector.subscribe(); - collector_tasks.spawn(rollup_name.clone(), collector.run_until_stopped()); - status.wait_for(Status::is_connected).await.unwrap(); - let rollup_tx = Transaction::default(); - let expected_seq_action = SequenceAction { - rollup_id: RollupId::from_unhashed_bytes(&rollup_name), - data: Transaction::default().rlp().to_vec(), - fee_asset_id: default_native_asset_id(), - }; - let _ = mock_geth.push_tx(rollup_tx.clone()).unwrap(); - let collector_tx = rx.recv().await.unwrap(); - - assert_eq!( - RollupId::from_unhashed_bytes(&rollup_name), - collector_tx.rollup_id, - ); - assert_eq!(expected_seq_action.data, collector_tx.data); - - let _ = mock_geth.abort().unwrap(); - - let (exited_rollup_name, exit_result) = collector_tasks.join_next().await.unwrap(); - assert_eq!(exited_rollup_name, rollup_name); - assert!(collector_tasks.is_empty()); - - // after aborting pushing a new tx to subscribers should fail as there are no broadcast - // receivers - assert!(mock_geth.push_tx(Transaction::default()).is_err()); - - let mut statuses = HashMap::new(); - reconnect_exited_collector( - &mut statuses, - &mut collector_tasks, - tx.clone(), - &rollups, - rollup_name.clone(), - exit_result, - ); - - assert!(collector_tasks.contains_key(&rollup_name)); - statuses - .get_mut(&rollup_name) - .unwrap() - .wait_for(Status::is_connected) - .await - .unwrap(); - let _ = mock_geth.push_tx(rollup_tx).unwrap(); - let collector_tx = rx.recv().await.unwrap(); - - assert_eq!( - RollupId::from_unhashed_bytes(&rollup_name), - collector_tx.rollup_id, - ); - assert_eq!(expected_seq_action.data, collector_tx.data); -} diff --git a/crates/astria-composer/tests/blackbox/composer.rs b/crates/astria-composer/tests/blackbox/composer.rs index 7c93bdd5e2..03458f15e6 100644 --- a/crates/astria-composer/tests/blackbox/composer.rs +++ b/crates/astria-composer/tests/blackbox/composer.rs @@ -54,6 +54,46 @@ async fn tx_from_one_rollup_is_received_by_sequencer() { .expect("mocked sequencer should have received a broadcast message from composer"); } +#[tokio::test] +async fn collector_restarts_after_exit() { + // Spawn a composer with a mock sequencer and a mock rollup node + // Initial nonce is 0 + let test_composer = spawn_composer(&["test1"]).await; + tokio::time::timeout( + Duration::from_millis(100), + test_composer.setup_guard.wait_until_satisfied(), + ) + .await + .expect("setup guard failed"); + + // get rollup node + let rollup_node = test_composer.rollup_nodes.get("test1").unwrap(); + // abort the rollup node. The collector should restart after this abort + rollup_node.cancel_subscriptions().unwrap(); + + // FIXME: There is a race condition in the mock geth server between when the tx is pushed + // and when the `eth_subscribe` task reads it. + tokio::time::sleep(Duration::from_millis(100)).await; + + // the collector will be restarted now, we should be able to send a tx normally + let expected_rollup_ids = vec![RollupId::from_unhashed_bytes("test1")]; + let mock_guard = + mount_broadcast_tx_sync_mock(&test_composer.sequencer, expected_rollup_ids, vec![0]).await; + test_composer.rollup_nodes["test1"] + .push_tx(Transaction::default()) + .unwrap(); + + // wait for 1 sequencer block time to make sure the bundle is preempted + // we added an extra 1000ms to the block time to make sure the collector has restarted + // as the collector has to establish a new subscription on start up. + tokio::time::timeout( + Duration::from_millis(test_composer.cfg.block_time_ms + 1000), + mock_guard.wait_until_satisfied(), + ) + .await + .expect("mocked sequencer should have received a broadcast message from composer"); +} + #[tokio::test] async fn invalid_nonce_failure_causes_tx_resubmission_under_different_nonce() { use crate::helper::mock_sequencer::mount_abci_query_mock; diff --git a/crates/astria-test-utils/src/mock/geth.rs b/crates/astria-test-utils/src/mock/geth.rs index c8e82aa114..7a6944203e 100644 --- a/crates/astria-test-utils/src/mock/geth.rs +++ b/crates/astria-test-utils/src/mock/geth.rs @@ -238,11 +238,11 @@ impl Geth { /// # Errors /// /// Returns the same error as tokio's [`Sender::send`]. - pub fn abort(&self) -> Result> { + pub fn cancel_subscriptions(&self) -> Result> { self.command.send(SubscriptionCommand::Abort) } - /// Push a new transaction into the mocket geth server. + /// Push a new transaction into the mocked geth server. /// /// If composer is subscribed to the mocked geth server using its /// `eth_subscribe` JSONRPC, the transaction will be immediately