From 8e0da40fc92908ec63a63c2b59a5cbe7bc94a243 Mon Sep 17 00:00:00 2001 From: Bharath Date: Wed, 20 Mar 2024 18:47:54 +0530 Subject: [PATCH] address review comments --- crates/astria-composer/src/collectors/geth.rs | 2 +- crates/astria-composer/src/executor/mod.rs | 14 +-- crates/astria-composer/src/executor/tests.rs | 10 +-- crates/astria-composer/src/lib.rs | 2 - crates/astria-composer/src/tests.rs | 90 ------------------- .../tests/blackbox/composer.rs | 36 ++++++++ crates/astria-test-utils/src/mock/geth.rs | 4 +- 7 files changed, 51 insertions(+), 107 deletions(-) delete mode 100644 crates/astria-composer/src/tests.rs diff --git a/crates/astria-composer/src/collectors/geth.rs b/crates/astria-composer/src/collectors/geth.rs index b9d78016ae..1bb5b0a957 100644 --- a/crates/astria-composer/src/collectors/geth.rs +++ b/crates/astria-composer/src/collectors/geth.rs @@ -160,7 +160,7 @@ impl Geth { }; match executor_handle - .send_with_timeout(seq_action, Duration::from_millis(500)) + .send_timeout(seq_action, Duration::from_millis(500)) .await { Ok(()) => {} diff --git a/crates/astria-composer/src/executor/mod.rs b/crates/astria-composer/src/executor/mod.rs index 25f7894ac5..bb9fa6d764 100644 --- a/crates/astria-composer/src/executor/mod.rs +++ b/crates/astria-composer/src/executor/mod.rs @@ -90,7 +90,7 @@ pub(super) struct Executor { // The status of this executor status: watch::Sender, // Channel for receiving `SequenceAction`s to be bundled. - sequence_action_rx: mpsc::Receiver, + serialized_rollup_transactions: mpsc::Receiver, // The client for submitting wrapped and signed pending eth transactions to the astria // sequencer. sequencer_client: sequencer_client::HttpClient, @@ -110,13 +110,13 @@ pub(super) struct Handle { } impl Handle { - pub(super) fn new(serialized_rollup_transactions_tx: mpsc::Sender) -> Self { + fn new(serialized_rollup_transactions_tx: mpsc::Sender) -> Self { Self { serialized_rollup_transactions_tx, } } - pub(super) async fn send_with_timeout( + pub(super) async fn send_timeout( &self, sequence_action: SequenceAction, timeout: Duration, @@ -169,20 +169,20 @@ impl Executor { let sequencer_address = Address::from_verification_key(sequencer_key.verification_key()); - let (sequence_action_tx, sequence_action_rx) = + let (serialized_rollup_transaction_tx, serialized_rollup_transaction_rx) = tokio::sync::mpsc::channel::(256); Ok(( Self { status, - sequence_action_rx, + serialized_rollup_transactions: serialized_rollup_transaction_rx, sequencer_client, sequencer_key, address: sequencer_address, block_time: Duration::from_millis(block_time), max_bytes_per_bundle, }, - Handle::new(sequence_action_tx), + Handle::new(serialized_rollup_transaction_tx), )) } @@ -250,7 +250,7 @@ impl Executor { } // receive new seq_action and bundle it - Some(seq_action) = self.sequence_action_rx.recv() => { + Some(seq_action) = self.serialized_rollup_transactions.recv() => { let rollup_id = seq_action.rollup_id; if let Err(e) = bundle_factory.try_push(seq_action) { warn!( diff --git a/crates/astria-composer/src/executor/tests.rs b/crates/astria-composer/src/executor/tests.rs index fbd50f073f..67b3c0dd67 100644 --- a/crates/astria-composer/src/executor/tests.rs +++ b/crates/astria-composer/src/executor/tests.rs @@ -227,11 +227,11 @@ async fn full_bundle() { // push both sequence actions to the executor in order to force the full bundle to be sent executor_handle - .send_with_timeout(seq0.clone(), Duration::from_millis(1000)) + .send_timeout(seq0.clone(), Duration::from_millis(1000)) .await .unwrap(); executor_handle - .send_with_timeout(seq1.clone(), Duration::from_millis(1000)) + .send_timeout(seq1.clone(), Duration::from_millis(1000)) .await .unwrap(); @@ -308,7 +308,7 @@ async fn bundle_triggered_by_block_timer() { // despite it not being full time::pause(); executor_handle - .send_with_timeout(seq0.clone(), Duration::from_millis(1000)) + .send_timeout(seq0.clone(), Duration::from_millis(1000)) .await .unwrap(); time::advance(Duration::from_millis(cfg.block_time_ms)).await; @@ -393,11 +393,11 @@ async fn two_seq_actions_single_bundle() { // despite it not being full time::pause(); executor_handle - .send_with_timeout(seq0.clone(), Duration::from_millis(1000)) + .send_timeout(seq0.clone(), Duration::from_millis(1000)) .await .unwrap(); executor_handle - .send_with_timeout(seq1.clone(), Duration::from_millis(1000)) + .send_timeout(seq1.clone(), Duration::from_millis(1000)) .await .unwrap(); time::advance(Duration::from_millis(cfg.block_time_ms)).await; diff --git a/crates/astria-composer/src/lib.rs b/crates/astria-composer/src/lib.rs index 22cfec1d7f..eb71bd5a21 100644 --- a/crates/astria-composer/src/lib.rs +++ b/crates/astria-composer/src/lib.rs @@ -44,8 +44,6 @@ mod composer; pub mod config; mod executor; mod rollup; -#[cfg(test)] -mod tests; pub use build_info::BUILD_INFO; pub use composer::Composer; diff --git a/crates/astria-composer/src/tests.rs b/crates/astria-composer/src/tests.rs deleted file mode 100644 index f23fa65a3c..0000000000 --- a/crates/astria-composer/src/tests.rs +++ /dev/null @@ -1,90 +0,0 @@ -use std::collections::HashMap; - -use astria_core::sequencer::v1::{ - asset::default_native_asset_id, - transaction::action::SequenceAction, - RollupId, -}; -use ethers::types::Transaction; -use tokio_util::task::JoinMap; - -use crate::{ - collectors::{ - geth::Status, - Geth, - }, - composer::reconnect_exited_collector, - executor, -}; - -/// This tests the `reconnect_exited_collector` handler. -#[tokio::test] -async fn collector_is_reconnected_after_exit() { - let mock_geth = test_utils::mock::Geth::spawn().await; - let rollup_name = "test".to_string(); - let rollup_url = format!("ws://{}", mock_geth.local_addr()); - let rollups = HashMap::from([(rollup_name.clone(), rollup_url.clone())]); - - let (tx, mut rx) = tokio::sync::mpsc::channel(16); - - let mut collector_tasks = JoinMap::new(); - let executor_handle = executor::Handle::new(tx.clone()); - let collector = Geth::new( - rollup_name.clone(), - rollup_url.clone(), - executor_handle.clone(), - ); - let mut status = collector.subscribe(); - collector_tasks.spawn(rollup_name.clone(), collector.run_until_stopped()); - status.wait_for(Status::is_connected).await.unwrap(); - let rollup_tx = Transaction::default(); - let expected_seq_action = SequenceAction { - rollup_id: RollupId::from_unhashed_bytes(&rollup_name), - data: Transaction::default().rlp().to_vec(), - fee_asset_id: default_native_asset_id(), - }; - let _ = mock_geth.push_tx(rollup_tx.clone()).unwrap(); - let collector_tx = rx.recv().await.unwrap(); - - assert_eq!( - RollupId::from_unhashed_bytes(&rollup_name), - collector_tx.rollup_id, - ); - assert_eq!(expected_seq_action.data, collector_tx.data); - - let _ = mock_geth.abort().unwrap(); - - let (exited_rollup_name, exit_result) = collector_tasks.join_next().await.unwrap(); - assert_eq!(exited_rollup_name, rollup_name); - assert!(collector_tasks.is_empty()); - - // after aborting pushing a new tx to subscribers should fail as there are no broadcast - // receivers - assert!(mock_geth.push_tx(Transaction::default()).is_err()); - - let mut statuses = HashMap::new(); - reconnect_exited_collector( - &mut statuses, - &mut collector_tasks, - executor_handle.clone(), - &rollups, - rollup_name.clone(), - exit_result, - ); - - assert!(collector_tasks.contains_key(&rollup_name)); - statuses - .get_mut(&rollup_name) - .unwrap() - .wait_for(Status::is_connected) - .await - .unwrap(); - let _ = mock_geth.push_tx(rollup_tx).unwrap(); - let collector_tx = rx.recv().await.unwrap(); - - assert_eq!( - RollupId::from_unhashed_bytes(&rollup_name), - collector_tx.rollup_id, - ); - assert_eq!(expected_seq_action.data, collector_tx.data); -} diff --git a/crates/astria-composer/tests/blackbox/composer.rs b/crates/astria-composer/tests/blackbox/composer.rs index 7c93bdd5e2..9f034ab82d 100644 --- a/crates/astria-composer/tests/blackbox/composer.rs +++ b/crates/astria-composer/tests/blackbox/composer.rs @@ -54,6 +54,42 @@ async fn tx_from_one_rollup_is_received_by_sequencer() { .expect("mocked sequencer should have received a broadcast message from composer"); } +#[tokio::test] +async fn collector_restarts_after_exit() { + // Spawn a composer with a mock sequencer and a mock rollup node + // Initial nonce is 0 + let test_composer = spawn_composer(&["test1"]).await; + tokio::time::timeout( + Duration::from_millis(100), + test_composer.setup_guard.wait_until_satisfied(), + ) + .await + .expect("setup guard failed"); + + // get rollup node + let rollup_node = test_composer.rollup_nodes.get("test1").unwrap(); + // abort the rollup node. The collector should restart after this abort + rollup_node.cancel_subscriptions().unwrap(); + + // the collector will be restarted now, we should be able to send a tx normally + let expected_rollup_ids = vec![RollupId::from_unhashed_bytes("test1")]; + let mock_guard = + mount_broadcast_tx_sync_mock(&test_composer.sequencer, expected_rollup_ids, vec![0]).await; + test_composer.rollup_nodes["test1"] + .push_tx(Transaction::default()) + .unwrap(); + + // wait for 1 sequencer block time to make sure the bundle is preempted + // we added an extra 1000ms to the block time to make sure the collector has restarted + // as the collector has to establish a new subscription on start up. + tokio::time::timeout( + Duration::from_millis(test_composer.cfg.block_time_ms + 1000), + mock_guard.wait_until_satisfied(), + ) + .await + .expect("mocked sequencer should have received a broadcast message from composer"); +} + #[tokio::test] async fn invalid_nonce_failure_causes_tx_resubmission_under_different_nonce() { use crate::helper::mock_sequencer::mount_abci_query_mock; diff --git a/crates/astria-test-utils/src/mock/geth.rs b/crates/astria-test-utils/src/mock/geth.rs index c8e82aa114..7a6944203e 100644 --- a/crates/astria-test-utils/src/mock/geth.rs +++ b/crates/astria-test-utils/src/mock/geth.rs @@ -238,11 +238,11 @@ impl Geth { /// # Errors /// /// Returns the same error as tokio's [`Sender::send`]. - pub fn abort(&self) -> Result> { + pub fn cancel_subscriptions(&self) -> Result> { self.command.send(SubscriptionCommand::Abort) } - /// Push a new transaction into the mocket geth server. + /// Push a new transaction into the mocked geth server. /// /// If composer is subscribed to the mocked geth server using its /// `eth_subscribe` JSONRPC, the transaction will be immediately