Skip to content

Commit

Permalink
address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
bharath-123 committed Mar 20, 2024
1 parent 74a9416 commit aa22f69
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 107 deletions.
2 changes: 1 addition & 1 deletion crates/astria-composer/src/collectors/geth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ impl Geth {
};

match executor_handle
.send_with_timeout(seq_action, Duration::from_millis(500))
.send_timeout(seq_action, Duration::from_millis(500))
.await
{
Ok(()) => {}
Expand Down
14 changes: 7 additions & 7 deletions crates/astria-composer/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ pub(super) struct Executor {
// The status of this executor
status: watch::Sender<Status>,
// Channel for receiving `SequenceAction`s to be bundled.
sequence_action_rx: mpsc::Receiver<SequenceAction>,
serialized_rollup_transactions: mpsc::Receiver<SequenceAction>,
// The client for submitting wrapped and signed pending eth transactions to the astria
// sequencer.
sequencer_client: sequencer_client::HttpClient,
Expand All @@ -110,13 +110,13 @@ pub(super) struct Handle {
}

impl Handle {
pub(super) fn new(serialized_rollup_transactions_tx: mpsc::Sender<SequenceAction>) -> Self {
fn new(serialized_rollup_transactions_tx: mpsc::Sender<SequenceAction>) -> Self {
Self {
serialized_rollup_transactions_tx,
}
}

pub(super) async fn send_with_timeout(
pub(super) async fn send_timeout(
&self,
sequence_action: SequenceAction,
timeout: Duration,
Expand Down Expand Up @@ -169,20 +169,20 @@ impl Executor {

let sequencer_address = Address::from_verification_key(sequencer_key.verification_key());

let (sequence_action_tx, sequence_action_rx) =
let (serialized_rollup_transaction_tx, serialized_rollup_transaction_rx) =
tokio::sync::mpsc::channel::<SequenceAction>(256);

Ok((
Self {
status,
sequence_action_rx,
serialized_rollup_transactions: serialized_rollup_transaction_rx,
sequencer_client,
sequencer_key,
address: sequencer_address,
block_time: Duration::from_millis(block_time),
max_bytes_per_bundle,
},
Handle::new(sequence_action_tx),
Handle::new(serialized_rollup_transaction_tx),
))
}

Expand Down Expand Up @@ -250,7 +250,7 @@ impl Executor {
}

// receive new seq_action and bundle it
Some(seq_action) = self.sequence_action_rx.recv() => {
Some(seq_action) = self.serialized_rollup_transactions.recv() => {
let rollup_id = seq_action.rollup_id;
if let Err(e) = bundle_factory.try_push(seq_action) {
warn!(
Expand Down
10 changes: 5 additions & 5 deletions crates/astria-composer/src/executor/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,11 +227,11 @@ async fn full_bundle() {

// push both sequence actions to the executor in order to force the full bundle to be sent
executor_handle
.send_with_timeout(seq0.clone(), Duration::from_millis(1000))
.send_timeout(seq0.clone(), Duration::from_millis(1000))
.await
.unwrap();
executor_handle
.send_with_timeout(seq1.clone(), Duration::from_millis(1000))
.send_timeout(seq1.clone(), Duration::from_millis(1000))
.await
.unwrap();

Expand Down Expand Up @@ -308,7 +308,7 @@ async fn bundle_triggered_by_block_timer() {
// despite it not being full
time::pause();
executor_handle
.send_with_timeout(seq0.clone(), Duration::from_millis(1000))
.send_timeout(seq0.clone(), Duration::from_millis(1000))
.await
.unwrap();
time::advance(Duration::from_millis(cfg.block_time_ms)).await;
Expand Down Expand Up @@ -393,11 +393,11 @@ async fn two_seq_actions_single_bundle() {
// despite it not being full
time::pause();
executor_handle
.send_with_timeout(seq0.clone(), Duration::from_millis(1000))
.send_timeout(seq0.clone(), Duration::from_millis(1000))
.await
.unwrap();
executor_handle
.send_with_timeout(seq1.clone(), Duration::from_millis(1000))
.send_timeout(seq1.clone(), Duration::from_millis(1000))
.await
.unwrap();
time::advance(Duration::from_millis(cfg.block_time_ms)).await;
Expand Down
2 changes: 0 additions & 2 deletions crates/astria-composer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ mod composer;
pub mod config;
mod executor;
mod rollup;
#[cfg(test)]
mod tests;

pub use build_info::BUILD_INFO;
pub use composer::Composer;
Expand Down
90 changes: 0 additions & 90 deletions crates/astria-composer/src/tests.rs

This file was deleted.

37 changes: 37 additions & 0 deletions crates/astria-composer/tests/blackbox/composer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,43 @@ async fn tx_from_one_rollup_is_received_by_sequencer() {
.expect("mocked sequencer should have received a broadcast message from composer");
}

#[tokio::test]
async fn collector_restarts_after_exit() {
// Spawn a composer with a mock sequencer and a mock rollup node
// Initial nonce is 0
let test_composer = spawn_composer(&["test1"]).await;
tokio::time::timeout(
Duration::from_millis(100),
test_composer.setup_guard.wait_until_satisfied(),
)
.await
.expect("setup guard failed");

// get rollup node
let rollup_node = test_composer.rollup_nodes.get("test1").unwrap();
// abort the rollup node. The collector should restart after this abort
rollup_node.cancel_subscriptions().unwrap();

// wait for the collector to restart
tokio::time::sleep(Duration::from_millis(100)).await;

// the collector will be restarted now, we should be able to send a tx normally
let expected_rollup_ids = vec![RollupId::from_unhashed_bytes("test1")];
let mock_guard =
mount_broadcast_tx_sync_mock(&test_composer.sequencer, expected_rollup_ids, vec![0]).await;
test_composer.rollup_nodes["test1"]
.push_tx(Transaction::default())
.unwrap();

// wait for 1 sequencer block time to make sure the bundle is preempted
tokio::time::timeout(
Duration::from_millis(test_composer.cfg.block_time_ms + 1000),
mock_guard.wait_until_satisfied(),
)
.await
.expect("mocked sequencer should have received a broadcast message from composer");
}

#[tokio::test]
async fn invalid_nonce_failure_causes_tx_resubmission_under_different_nonce() {
use crate::helper::mock_sequencer::mount_abci_query_mock;
Expand Down
4 changes: 2 additions & 2 deletions crates/astria-test-utils/src/mock/geth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,11 +238,11 @@ impl Geth {
/// # Errors
///
/// Returns the same error as tokio's [`Sender::send`].
pub fn abort(&self) -> Result<usize, SendError<SubscriptionCommand>> {
pub fn cancel_subscriptions(&self) -> Result<usize, SendError<SubscriptionCommand>> {
self.command.send(SubscriptionCommand::Abort)
}

/// Push a new transaction into the mocket geth server.
/// Push a new transaction into the mocked geth server.
///
/// If composer is subscribed to the mocked geth server using its
/// `eth_subscribe` JSONRPC, the transaction will be immediately
Expand Down

0 comments on commit aa22f69

Please sign in to comment.