diff --git a/Cargo.lock b/Cargo.lock index 6092b3ff35..f20e88eefe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -560,6 +560,7 @@ dependencies = [ "humantime", "hyper", "insta", + "itertools 0.11.0", "once_cell", "pin-project-lite", "prost", diff --git a/crates/astria-composer/Cargo.toml b/crates/astria-composer/Cargo.toml index 0358d84c92..f04d69e367 100644 --- a/crates/astria-composer/Cargo.toml +++ b/crates/astria-composer/Cargo.toml @@ -46,6 +46,7 @@ tokio = { workspace = true, features = [ "rt-multi-thread", "sync", "time", + "signal", ] } tokio-util = { workspace = true, features = ["rt"] } tracing = { workspace = true, features = ["attributes"] } @@ -53,6 +54,7 @@ tryhard = { workspace = true } tonic = { workspace = true } tokio-stream = { workspace = true, features = ["net"] } tonic-health = "0.10.2" +itertools = "0.11.0" [dependencies.sequencer-client] package = "astria-sequencer-client" diff --git a/crates/astria-composer/src/collectors/geth.rs b/crates/astria-composer/src/collectors/geth.rs index 273321e2ae..66727b9602 100644 --- a/crates/astria-composer/src/collectors/geth.rs +++ b/crates/astria-composer/src/collectors/geth.rs @@ -27,10 +27,14 @@ use ethers::providers::{ ProviderError, Ws, }; -use tokio::sync::{ - mpsc::error::SendTimeoutError, - watch, +use tokio::{ + select, + sync::{ + mpsc::error::SendTimeoutError, + watch, + }, }; +use tokio_util::sync::CancellationToken; use tracing::{ debug, info, @@ -59,6 +63,7 @@ pub(crate) struct Geth { status: watch::Sender, /// Rollup URL url: String, + shutdown_token: CancellationToken, } #[derive(Debug)] @@ -80,7 +85,12 @@ impl Status { impl Geth { /// Initializes a new collector instance - pub(crate) fn new(chain_name: String, url: String, executor_handle: executor::Handle) -> Self { + pub(crate) fn new( + chain_name: String, + url: String, + executor_handle: executor::Handle, + shutdown_token: CancellationToken, + ) -> Self { let (status, _) = watch::channel(Status::new()); let rollup_id = RollupId::from_unhashed_bytes(&chain_name); info!( @@ -94,6 +104,7 @@ impl Geth { executor_handle, status, url, + shutdown_token, } } @@ -116,6 +127,7 @@ impl Geth { executor_handle, status, url, + shutdown_token, .. } = self; @@ -155,37 +167,53 @@ impl Geth { status.send_modify(|status| status.is_connected = true); - while let Some(tx) = tx_stream.next().await { - let tx_hash = tx.hash; - debug!(transaction.hash = %tx_hash, "collected transaction from rollup"); - let data = tx.rlp().to_vec(); - let seq_action = SequenceAction { - rollup_id, - data, - fee_asset_id: default_native_asset_id(), - }; - - match executor_handle - .send_timeout(seq_action, Duration::from_millis(500)) - .await - { - Ok(()) => {} - Err(SendTimeoutError::Timeout(_seq_action)) => { - warn!( - transaction.hash = %tx_hash, - "timed out sending new transaction to executor after 500ms; dropping tx" - ); - } - Err(SendTimeoutError::Closed(_seq_action)) => { - warn!( - transaction.hash = %tx_hash, - "executor channel closed while sending transaction; dropping transaction \ - and exiting event loop" - ); + loop { + select! { + biased; + () = shutdown_token.cancelled() => { + tx_stream.unsubscribe().await?; + status.send_modify(|status| status.is_connected = false); break; + }, + tx_res = tx_stream.next() => { + if let Some(tx) = tx_res { + let tx_hash = tx.hash; + debug!(transaction.hash = %tx_hash, "collected transaction from rollup"); + let data = tx.rlp().to_vec(); + let seq_action = SequenceAction { + rollup_id, + data, + fee_asset_id: default_native_asset_id(), + }; + + match executor_handle + .send_timeout(seq_action, Duration::from_millis(500)) + .await + { + Ok(()) => {} + Err(SendTimeoutError::Timeout(_seq_action)) => { + warn!( + transaction.hash = %tx_hash, + "timed out sending new transaction to executor after 500ms; dropping tx" + ); + } + Err(SendTimeoutError::Closed(_seq_action)) => { + warn!( + transaction.hash = %tx_hash, + "executor channel closed while sending transaction; dropping transaction \ + and exiting event loop" + ); + break; + } + } + } else { + status.send_modify(|status| status.is_connected = false); + break; + } } } } + Ok(()) } } diff --git a/crates/astria-composer/src/composer.rs b/crates/astria-composer/src/composer.rs index edcfaa24dd..5f1d65395c 100644 --- a/crates/astria-composer/src/composer.rs +++ b/crates/astria-composer/src/composer.rs @@ -1,19 +1,37 @@ use std::{ collections::HashMap, net::SocketAddr, + time::Duration, }; + use astria_eyre::eyre::{ self, WrapErr as _, }; -use tokio::{io, task::JoinError}; -use tokio::net::TcpListener; -use tokio::sync::watch; -use tokio_util::task::JoinMap; +use itertools::Itertools; +use tokio::{ + io, + signal::unix::{ + signal, + SignalKind, + }, + sync::watch, + task::{ + JoinError, + JoinHandle, + }, + time::timeout, +}; +use tokio_util::{ + sync::CancellationToken, + task::JoinMap, +}; use tracing::{ error, info, + warn, }; + use crate::{ api::{ self, @@ -57,6 +75,8 @@ pub struct Composer { /// The gRPC server that listens for incoming requests from the collectors via the /// GrpcCollector service. It also exposes a health service. grpc_server: GrpcServer, + /// Used to signal the Composer to shut down. + shutdown_token: CancellationToken, } /// Announces the current status of the Composer for other modules in the crate to use @@ -89,17 +109,24 @@ impl Composer { /// See `[from_config]` for its error scenarios. pub async fn from_config(cfg: &Config) -> eyre::Result { let (composer_status_sender, _) = watch::channel(Status::default()); + let shutdown_token = CancellationToken::new(); + let (executor, executor_handle) = Executor::new( &cfg.sequencer_url, &cfg.private_key, cfg.block_time_ms, cfg.max_bytes_per_bundle, + shutdown_token.clone(), ) .wrap_err("executor construction from config failed")?; - let grpc_server = GrpcServer::new(cfg.grpc_addr, executor_handle.clone()) - .await - .wrap_err("failed to create grpc server")?; + let grpc_server = GrpcServer::new( + cfg.grpc_addr, + executor_handle.clone(), + shutdown_token.clone(), + ) + .await + .wrap_err("grpc collector construction from config failed")?; info!( listen_addr = %grpc_server.local_addr().wrap_err("grpc server listener not bound")?, @@ -124,8 +151,12 @@ impl Composer { let geth_collectors = rollups .iter() .map(|(rollup_name, url)| { - let collector = - Geth::new(rollup_name.clone(), url.clone(), executor_handle.clone()); + let collector = Geth::new( + rollup_name.clone(), + url.clone(), + executor_handle.clone(), + shutdown_token.clone(), + ); (rollup_name.clone(), collector) }) .collect::>(); @@ -145,6 +176,7 @@ impl Composer { geth_collector_statuses, geth_collector_tasks: JoinMap::new(), grpc_server, + shutdown_token, }) } @@ -164,6 +196,9 @@ impl Composer { /// /// # Errors /// It errors out if the API Server, Executor or any of the Geth Collectors fail to start. + /// + /// # Panics + /// It panics if the Composer cannot set the SIGTERM listener. pub async fn run_until_stopped(self) -> eyre::Result<()> { let Self { api_server, @@ -175,6 +210,7 @@ impl Composer { rollups, mut geth_collector_statuses, grpc_server, + shutdown_token, } = self; // run the api server @@ -206,19 +242,29 @@ impl Composer { .wrap_err("grpc server failed") }); + let mut sigterm = signal(SignalKind::terminate()).expect( + "setting a SIGTERM listener should always work on unix; is this running on unix?", + ); + loop { tokio::select!( + biased; + _ = sigterm.recv() => { + info!("received SIGTERM; shutting down"); + shutdown_token.cancel(); + break; + }, o = &mut api_task => { report_exit("api server unexpectedly ended", o); - return Ok(()); + break; }, o = &mut executor_task => { report_exit("executor unexpectedly ended", o); - return Ok(()); + break; }, o = &mut grpc_server_handle => { report_exit("grpc server unexpectedly ended", o); - return Ok(()); + break; }, Some((rollup, collector_exit)) = geth_collector_tasks.join_next() => { reconnect_exited_collector( @@ -228,10 +274,81 @@ impl Composer { &rollups, rollup, collector_exit, + shutdown_token.clone() ); }); } + + shutdown( + api_task, + executor_task, + grpc_server_handle, + geth_collector_tasks, + ) + .await + } +} + +async fn shutdown( + api_server_task_handle: JoinHandle>, + executor_task_handle: JoinHandle>, + grpc_server_task_handle: JoinHandle>, + mut geth_collector_tasks: JoinMap>, +) -> eyre::Result<()> { + // wait 5s for each handle as k8s issues SIGKILL in 30s + match tokio::time::timeout(std::time::Duration::from_secs(5), api_server_task_handle) + .await + .map(flatten) + { + Ok(Ok(())) => info!("api server shut down"), + Ok(Err(error)) => error!(%error, "api server failed to shut down"), + Err(error) => error!(%error, "api server panicked"), + } + // We give executor 5 more seconds to shutdown. 5s + 5s = 10s. + match tokio::time::timeout(std::time::Duration::from_secs(5), executor_task_handle) + .await + .map(flatten) + { + Ok(Ok(())) => info!("executor shut down"), + Ok(Err(error)) => error!(%error, "executor failed to shut down"), + Err(error) => error!(%error, "executor panciked"), + } + // We give the grpc server 5 seconds to shutdown. 10s + 5s = 15s + match tokio::time::timeout(std::time::Duration::from_secs(5), grpc_server_task_handle) + .await + .map(flatten) + { + Ok(Ok(())) => info!("grpc server shut down"), + Ok(Err(error)) => error!(%error, "grpc server failed to shut down"), + Err(error) => error!(%error, "grpc server failed to shut down"), + } + + let shutdown_loop = async { + while let Some((name, res)) = geth_collector_tasks.join_next().await { + let message = "task shut down"; + match flatten(res) { + Ok(()) => info!(name, message), + Err(error) => error!(name, %error, message), + } + } + }; + + // we give 14s to shutdown all the other geth collectors. 15s + 14s = 29s + if timeout(Duration::from_secs(14), shutdown_loop) + .await + .is_err() + { + let tasks = geth_collector_tasks.keys().join(", "); + warn!( + tasks = format_args!("[{tasks}]"), + "aborting all tasks that have not yet shut down", + ); + geth_collector_tasks.abort_all(); + } else { + info!("all tasks shut down regularly"); } + + Ok(()) } async fn wait_for_executor( @@ -297,6 +414,7 @@ pub(super) fn reconnect_exited_collector( rollups: &HashMap, rollup: String, exit_result: Result, JoinError>, + shutdown_token: CancellationToken, ) { report_exit("collector", exit_result); let Some(url) = rollups.get(&rollup) else { @@ -307,7 +425,7 @@ pub(super) fn reconnect_exited_collector( return; }; - let collector = Geth::new(rollup.clone(), url.clone(), executor_handle); + let collector = Geth::new(rollup.clone(), url.clone(), executor_handle, shutdown_token); collector_statuses.insert(rollup.clone(), collector.subscribe()); collector_tasks.spawn(rollup, collector.run_until_stopped()); } @@ -323,3 +441,11 @@ fn report_exit(task_name: &str, outcome: Result, JoinError>) { } } } + +pub(crate) fn flatten(res: Result, JoinError>) -> eyre::Result { + match res { + Ok(Ok(val)) => Ok(val), + Ok(Err(err)) => Err(err).wrap_err("task returned with error"), + Err(err) => Err(err).wrap_err("task panicked"), + } +} diff --git a/crates/astria-composer/src/executor/bundle_factory/mod.rs b/crates/astria-composer/src/executor/bundle_factory/mod.rs index a94abaebe2..4b9c23886f 100644 --- a/crates/astria-composer/src/executor/bundle_factory/mod.rs +++ b/crates/astria-composer/src/executor/bundle_factory/mod.rs @@ -194,6 +194,16 @@ impl BundleFactory { .or_else(|| Some(self.curr_bundle.flush())) .unwrap_or(SizedBundle::new(self.curr_bundle.max_size)) } + + /// Returns the number of bundles that have been finished but not yet sent to the sequencer. + pub(super) fn remaining_finished_bundles(&self) -> usize { + self.finished.len() + } + + /// Returns the number of sequence actions in the current bundle. + pub(super) fn curr_bundle_sequence_actions_size(&self) -> usize { + self.curr_bundle.buffer.len() + } } pub(super) struct NextFinishedBundle<'a> { diff --git a/crates/astria-composer/src/executor/mod.rs b/crates/astria-composer/src/executor/mod.rs index cb35eccf49..4fdfa5c113 100644 --- a/crates/astria-composer/src/executor/mod.rs +++ b/crates/astria-composer/src/executor/mod.rs @@ -56,6 +56,7 @@ use tokio::{ Instant, }, }; +use tokio_util::sync::CancellationToken; use tracing::{ debug, error, @@ -103,6 +104,7 @@ pub(super) struct Executor { block_time: tokio::time::Duration, // Max bytes in a sequencer action bundle max_bytes_per_bundle: usize, + shutdown_token: CancellationToken, } #[derive(Clone)] @@ -157,6 +159,7 @@ impl Executor { private_key: &SecretString, block_time: u64, max_bytes_per_bundle: usize, + shutdown_token: CancellationToken, ) -> eyre::Result<(Self, Handle)> { let sequencer_client = sequencer_client::HttpClient::new(sequencer_url) .wrap_err("failed constructing sequencer client")?; @@ -182,6 +185,7 @@ impl Executor { address: sequencer_address, block_time: Duration::from_millis(block_time), max_bytes_per_bundle, + shutdown_token, }, Handle::new(serialized_rollup_transaction_tx), )) @@ -230,6 +234,20 @@ impl Executor { select! { biased; + () = self.shutdown_token.cancelled() => { + info!("executor shutting down."); + // do not accept any new txs during shutdown. + self.serialized_rollup_transactions.close(); + + self.status.send_modify(|status| status.is_connected = false); + + // report that if there are any existing bundles to be submitted they will be lost + let remaining_finished_bundles = bundle_factory.remaining_finished_bundles(); + let curr_bundle_sequence_actions_size = bundle_factory.curr_bundle_sequence_actions_size(); + report_remaining_bundles(remaining_finished_bundles, curr_bundle_sequence_actions_size); + + break Ok(()); + } // process submission result and update nonce rsp = &mut submission_fut, if !submission_fut.is_terminated() => { match rsp { @@ -318,6 +336,26 @@ async fn get_latest_nonce( .wrap_err("failed getting latest nonce from sequencer after 1024 attempts") } +fn report_remaining_bundles( + remaining_finished_bundles: usize, + curr_bundle_sequence_actions_size: usize, +) { + if remaining_finished_bundles > 0 { + warn!( + "There are still {:?} bundles remaining to be submitted which will be lost upon \ + shutdown.", + remaining_finished_bundles + ); + } + if curr_bundle_sequence_actions_size > 0 { + warn!( + "There are still {:?} sequence actions in the current bundle which will be lost upon \ + shutdown.", + curr_bundle_sequence_actions_size + ); + } +} + /// Queries the sequencer for the latest nonce with an exponential backoff #[instrument( name = "submit signed transaction", diff --git a/crates/astria-composer/src/executor/tests.rs b/crates/astria-composer/src/executor/tests.rs index 59e13d3deb..f1ea0c582b 100644 --- a/crates/astria-composer/src/executor/tests.rs +++ b/crates/astria-composer/src/executor/tests.rs @@ -21,6 +21,7 @@ use tokio::{ sync::watch, time, }; +use tokio_util::sync::CancellationToken; use tracing::debug; use wiremock::{ matchers::{ @@ -195,11 +196,13 @@ async fn wait_for_startup( async fn full_bundle() { // set up the executor, channel for writing seq actions, and the sequencer mock let (sequencer, nonce_guard, cfg) = setup().await; + let shutdown_token = CancellationToken::new(); let (executor, executor_handle) = Executor::new( &cfg.sequencer_url, &cfg.private_key, cfg.block_time_ms, cfg.max_bytes_per_bundle, + shutdown_token, ) .unwrap(); @@ -281,11 +284,13 @@ async fn full_bundle() { async fn bundle_triggered_by_block_timer() { // set up the executor, channel for writing seq actions, and the sequencer mock let (sequencer, nonce_guard, cfg) = setup().await; + let shutdown_token = CancellationToken::new(); let (executor, executor_handle) = Executor::new( &cfg.sequencer_url, &cfg.private_key, cfg.block_time_ms, cfg.max_bytes_per_bundle, + shutdown_token, ) .unwrap(); @@ -360,11 +365,13 @@ async fn bundle_triggered_by_block_timer() { async fn two_seq_actions_single_bundle() { // set up the executor, channel for writing seq actions, and the sequencer mock let (sequencer, nonce_guard, cfg) = setup().await; + let shutdown_token = CancellationToken::new(); let (executor, executor_handle) = Executor::new( &cfg.sequencer_url, &cfg.private_key, cfg.block_time_ms, cfg.max_bytes_per_bundle, + shutdown_token.clone(), ) .unwrap(); diff --git a/crates/astria-composer/src/grpc.rs b/crates/astria-composer/src/grpc.rs index 30fc02e642..9448cafc90 100644 --- a/crates/astria-composer/src/grpc.rs +++ b/crates/astria-composer/src/grpc.rs @@ -17,6 +17,7 @@ use tokio::{ io, net::TcpListener, }; +use tokio_util::sync::CancellationToken; use crate::{ collectors, @@ -30,12 +31,14 @@ use crate::{ pub(crate) struct GrpcServer { listener: TcpListener, grpc_collector: collectors::Grpc, + shutdown_token: CancellationToken, } impl GrpcServer { pub(crate) async fn new( grpc_addr: SocketAddr, executor: executor::Handle, + shutdown_token: CancellationToken, ) -> eyre::Result { let listener = TcpListener::bind(grpc_addr) .await @@ -45,6 +48,7 @@ impl GrpcServer { Ok(Self { listener, grpc_collector, + shutdown_token, }) } @@ -68,9 +72,10 @@ impl GrpcServer { .await; grpc_server - .serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new( - self.listener, - )) + .serve_with_incoming_shutdown( + tokio_stream::wrappers::TcpListenerStream::new(self.listener), + self.shutdown_token.cancelled(), + ) .await .wrap_err("failed to run grpc server") } diff --git a/crates/astria-composer/tests/blackbox/composer.rs b/crates/astria-composer/tests/blackbox/composer.rs deleted file mode 100644 index 895544c271..0000000000 --- a/crates/astria-composer/tests/blackbox/composer.rs +++ /dev/null @@ -1,491 +0,0 @@ -use std::time::Duration; - -use astria_core::{ - generated::{ - composer::v1alpha1::{ - grpc_collector_service_client::GrpcCollectorServiceClient, - SubmitRollupTransactionRequest, - }, - sequencer::v1::NonceResponse, - }, - sequencer::v1::{ - AbciErrorCode, - RollupId, - SignedTransaction, - }, -}; -use ethers::types::Transaction; -use tendermint_rpc::{ - endpoint::broadcast::tx_sync, - request, - response, - Id, -}; -use tracing::debug; -use wiremock::{ - Mock, - MockGuard, - MockServer, - Request, - ResponseTemplate, -}; - -use crate::helper::spawn_composer; - -#[tokio::test] -async fn tx_from_one_rollup_is_received_by_sequencer_from_geth_collector() { - // Spawn a composer with a mock sequencer and a mock rollup node - // Initial nonce is 0 - let test_composer = spawn_composer(&["test1"]).await; - tokio::time::timeout( - Duration::from_millis(100), - test_composer.setup_guard.wait_until_satisfied(), - ) - .await - .expect("setup guard failed"); - - let expected_rollup_ids = vec![RollupId::from_unhashed_bytes("test1")]; - let mock_guard = - mount_broadcast_tx_sync_mock(&test_composer.sequencer, expected_rollup_ids, vec![0]).await; - test_composer.rollup_nodes["test1"] - .push_tx(Transaction::default()) - .unwrap(); - - // wait for 1 sequencer block time to make sure the bundle is preempted - tokio::time::timeout( - Duration::from_millis(test_composer.cfg.block_time_ms), - mock_guard.wait_until_satisfied(), - ) - .await - .expect("mocked sequencer should have received a broadcast message from composer"); -} - -#[tokio::test] -async fn tx_from_one_rollup_is_received_by_sequencer_from_grpc_collector() { - let test_composer = spawn_composer(&[]).await; - tokio::time::timeout( - Duration::from_millis(100), - test_composer.setup_guard.wait_until_satisfied(), - ) - .await - .expect("setup guard failed"); - - let rollup_id = RollupId::from_unhashed_bytes("test1"); - let expected_chain_ids = vec![rollup_id]; - let mock_guard = - mount_broadcast_tx_sync_mock(&test_composer.sequencer, expected_chain_ids, vec![0]).await; - - let tx = Transaction::default(); - // send sequence action request to the grpc generic collector - let mut composer_client = GrpcCollectorServiceClient::connect(format!( - "http://{}", - test_composer.grpc_collector_addr - )) - .await - .unwrap(); - composer_client - .submit_rollup_transaction(SubmitRollupTransactionRequest { - rollup_id: rollup_id.as_ref().to_vec(), - data: tx.rlp().to_vec(), - }) - .await - .expect("error submitting sequence actions to generic collector"); - - // wait for 1 sequencer block time to make sure the bundle is preempted - tokio::time::timeout( - Duration::from_millis(test_composer.cfg.block_time_ms), - mock_guard.wait_until_satisfied(), - ) - .await - .expect("mocked sequencer should have received a broadcast message from composer"); -} - -#[tokio::test] -async fn collector_restarts_after_exit() { - // Spawn a composer with a mock sequencer and a mock rollup node - // Initial nonce is 0 - let test_composer = spawn_composer(&["test1"]).await; - tokio::time::timeout( - Duration::from_millis(100), - test_composer.setup_guard.wait_until_satisfied(), - ) - .await - .expect("setup guard failed"); - - // get rollup node - let rollup_node = test_composer.rollup_nodes.get("test1").unwrap(); - // abort the rollup node. The collector should restart after this abort - rollup_node.cancel_subscriptions().unwrap(); - - // FIXME: There is a race condition in the mock geth server between when the tx is pushed - // and when the `eth_subscribe` task reads it. - tokio::time::sleep(Duration::from_millis(100)).await; - - // the collector will be restarted now, we should be able to send a tx normally - let expected_rollup_ids = vec![RollupId::from_unhashed_bytes("test1")]; - let mock_guard = - mount_broadcast_tx_sync_mock(&test_composer.sequencer, expected_rollup_ids, vec![0]).await; - test_composer.rollup_nodes["test1"] - .push_tx(Transaction::default()) - .unwrap(); - - // wait for 1 sequencer block time to make sure the bundle is preempted - // we added an extra 1000ms to the block time to make sure the collector has restarted - // as the collector has to establish a new subscription on start up. - tokio::time::timeout( - Duration::from_millis(test_composer.cfg.block_time_ms + 1000), - mock_guard.wait_until_satisfied(), - ) - .await - .expect("mocked sequencer should have received a broadcast message from composer"); -} - -#[tokio::test] -async fn invalid_nonce_failure_causes_tx_resubmission_under_different_nonce_geth_collector() { - use crate::helper::mock_sequencer::mount_abci_query_mock; - - // Spawn a composer with a mock sequencer and a mock rollup node - // Initial nonce is 0 - let test_composer = spawn_composer(&["test1"]).await; - tokio::time::timeout( - Duration::from_millis(100), - test_composer.setup_guard.wait_until_satisfied(), - ) - .await - .expect("setup guard failed"); - - // Reject the first transaction for invalid nonce - let invalid_nonce_guard = mount_broadcast_tx_sync_invalid_nonce_mock( - &test_composer.sequencer, - RollupId::from_unhashed_bytes("test1"), - ) - .await; - - // Mount a response of 0 to a nonce query - let nonce_refetch_guard = mount_abci_query_mock( - &test_composer.sequencer, - "accounts/nonce", - NonceResponse { - height: 0, - nonce: 1, - }, - ) - .await; - - let expected_rollup_ids = vec![RollupId::from_unhashed_bytes("test1")]; - // Expect nonce 1 again so that the resubmitted tx is accepted - let valid_nonce_guard = - mount_broadcast_tx_sync_mock(&test_composer.sequencer, expected_rollup_ids, vec![1]).await; - - // Push a tx to the rollup node so that it is picked up by the composer and submitted with the - // stored nonce of 0, triggering the nonce refetch process - test_composer.rollup_nodes["test1"] - .push_tx(Transaction::default()) - .unwrap(); - - // wait for 1 sequencer block time to make sure the bundle is preempted - tokio::time::timeout( - Duration::from_millis(test_composer.cfg.block_time_ms), - invalid_nonce_guard.wait_until_satisfied(), - ) - .await - .expect("invalid nonce guard failed"); - - tokio::time::timeout( - Duration::from_millis(100), - nonce_refetch_guard.wait_until_satisfied(), - ) - .await - .expect("nonce refetch guard failed"); - - tokio::time::timeout( - Duration::from_millis(100), - valid_nonce_guard.wait_until_satisfied(), - ) - .await - .expect("valid nonce guard failed"); -} - -#[tokio::test] -async fn invalid_nonce_failure_causes_tx_resubmission_under_different_nonce_grpc_collector() { - use crate::helper::mock_sequencer::mount_abci_query_mock; - - // Spawn a composer with a mock sequencer and a mock rollup node - // Initial nonce is 0 - let rollup_id = RollupId::from_unhashed_bytes("test1"); - let test_composer = spawn_composer(&[rollup_id.to_string().as_str()]).await; - tokio::time::timeout( - Duration::from_millis(100), - test_composer.setup_guard.wait_until_satisfied(), - ) - .await - .expect("setup guard failed"); - - // Reject the first transaction for invalid nonce - let invalid_nonce_guard = - mount_broadcast_tx_sync_invalid_nonce_mock(&test_composer.sequencer, rollup_id).await; - - // Mount a response of 0 to a nonce query - let nonce_refetch_guard = mount_abci_query_mock( - &test_composer.sequencer, - "accounts/nonce", - NonceResponse { - height: 0, - nonce: 1, - }, - ) - .await; - - let expected_chain_ids = vec![rollup_id]; - // Expect nonce 1 again so that the resubmitted tx is accepted - let valid_nonce_guard = - mount_broadcast_tx_sync_mock(&test_composer.sequencer, expected_chain_ids, vec![1]).await; - - // Send a tx to the composer so that it is picked up by the generic collector and submitted with - // the stored nonce of 0, triggering the nonce refetch process - let tx = Transaction::default(); - // send sequence action request to the grpc generic collector - let mut composer_client = GrpcCollectorServiceClient::connect(format!( - "http://{}", - test_composer.grpc_collector_addr - )) - .await - .unwrap(); - composer_client - .submit_rollup_transaction(SubmitRollupTransactionRequest { - rollup_id: rollup_id.as_ref().to_vec(), - data: tx.rlp().to_vec(), - }) - .await - .expect("error submitting sequence actions to generic collector"); - - // wait for 1 sequencer block time to make sure the bundle is preempted - tokio::time::timeout( - Duration::from_millis(test_composer.cfg.block_time_ms), - invalid_nonce_guard.wait_until_satisfied(), - ) - .await - .expect("invalid nonce guard failed"); - - tokio::time::timeout( - Duration::from_millis(100), - nonce_refetch_guard.wait_until_satisfied(), - ) - .await - .expect("nonce refetch guard failed"); - - tokio::time::timeout( - Duration::from_millis(100), - valid_nonce_guard.wait_until_satisfied(), - ) - .await - .expect("valid nonce guard failed"); -} - -#[tokio::test] -async fn single_rollup_tx_payload_integrity_geth_collector() { - // Spawn a composer with a mock sequencer and a mock rollup node - // Initial nonce is 0 - let test_composer = spawn_composer(&["test1"]).await; - tokio::time::timeout( - Duration::from_millis(100), - test_composer.setup_guard.wait_until_satisfied(), - ) - .await - .expect("setup guard failed"); - - let tx: Transaction = serde_json::from_str(TEST_ETH_TX_JSON).unwrap(); - let mock_guard = - mount_matcher_verifying_tx_integrity(&test_composer.sequencer, tx.clone()).await; - - test_composer.rollup_nodes["test1"].push_tx(tx).unwrap(); - - // wait for 1 sequencer block time to make sure the bundle is preempted - tokio::time::timeout( - Duration::from_millis(test_composer.cfg.block_time_ms), - mock_guard.wait_until_satisfied(), - ) - .await - .expect("mock failed to verify transaction integrity"); -} - -#[tokio::test] -async fn single_rollup_tx_payload_integrity_grpc_collector() { - // Spawn a composer with a mock sequencer and a mock rollup node - // Initial nonce is 0 - let rollup_id = RollupId::from_unhashed_bytes("test1"); - let test_composer = spawn_composer(&[]).await; - tokio::time::timeout( - Duration::from_millis(100), - test_composer.setup_guard.wait_until_satisfied(), - ) - .await - .expect("setup guard failed"); - - let tx: Transaction = serde_json::from_str(TEST_ETH_TX_JSON).unwrap(); - let mock_guard = - mount_matcher_verifying_tx_integrity(&test_composer.sequencer, tx.clone()).await; - - // send sequence action request to the grpc generic collector - let mut composer_client = GrpcCollectorServiceClient::connect(format!( - "http://{}", - test_composer.grpc_collector_addr - )) - .await - .unwrap(); - composer_client - .submit_rollup_transaction(SubmitRollupTransactionRequest { - rollup_id: rollup_id.as_ref().to_vec(), - data: tx.rlp().to_vec(), - }) - .await - .expect("error submitting sequence actions to generic collector"); - - // wait for 1 sequencer block time to make sure the bundle is preempted - tokio::time::timeout( - Duration::from_millis(test_composer.cfg.block_time_ms), - mock_guard.wait_until_satisfied(), - ) - .await - .expect("mock failed to verify transaction integrity"); -} - -/// Deserizalizes the bytes contained in a `tx_sync::Request` to a signed sequencer transaction and -/// verifies that the contained sequence action is in the given `expected_rollup_ids` and -/// `expected_nonces`. -async fn mount_broadcast_tx_sync_mock( - server: &MockServer, - expected_rollup_ids: Vec, - expected_nonces: Vec, -) -> MockGuard { - let expected_calls = expected_nonces.len().try_into().unwrap(); - let matcher = move |request: &Request| { - let (rollup_id, nonce) = rollup_id_nonce_from_request(request); - - let valid_rollup_id = expected_rollup_ids.contains(&rollup_id); - let valid_nonce = expected_nonces.contains(&nonce); - - valid_rollup_id && valid_nonce - }; - let jsonrpc_rsp = response::Wrapper::new_with_id( - Id::Num(1), - Some(tx_sync::Response { - code: 0.into(), - data: vec![].into(), - log: String::new(), - hash: tendermint::Hash::Sha256([0; 32]), - }), - None, - ); - - Mock::given(matcher) - .respond_with(ResponseTemplate::new(200).set_body_json(&jsonrpc_rsp)) - .up_to_n_times(expected_calls) - .expect(expected_calls) - .mount_as_scoped(server) - .await -} - -/// Deserizalizes the bytes contained in a `tx_sync::Request` to a signed sequencer transaction and -/// verifies that the contained sequence action is for the given `expected_rollup_id`. It then -/// rejects the transaction for an invalid nonce. -async fn mount_broadcast_tx_sync_invalid_nonce_mock( - server: &MockServer, - expected_rollup_id: RollupId, -) -> MockGuard { - let matcher = move |request: &Request| { - let (rollup_id, _) = rollup_id_nonce_from_request(request); - rollup_id == expected_rollup_id - }; - let jsonrpc_rsp = response::Wrapper::new_with_id( - Id::Num(1), - Some(tx_sync::Response { - code: AbciErrorCode::INVALID_NONCE.into(), - data: vec![].into(), - log: String::new(), - hash: tendermint::Hash::Sha256([0; 32]), - }), - None, - ); - Mock::given(matcher) - .respond_with(ResponseTemplate::new(200).set_body_json(&jsonrpc_rsp)) - .up_to_n_times(1) - .expect(1) - .mount_as_scoped(server) - .await -} - -/// Deserizalizes the bytes contained in a `tx_sync::Request` to a signed sequencer transaction and -/// verifies that it contains a sequence action with `expected_payload` as its contents. -async fn mount_matcher_verifying_tx_integrity( - server: &MockServer, - expected_rlp: Transaction, -) -> MockGuard { - let matcher = move |request: &Request| { - let sequencer_tx = signed_tx_from_request(request); - let sequence_action = sequencer_tx - .actions() - .first() - .unwrap() - .as_sequence() - .unwrap(); - - let expected_rlp = expected_rlp.rlp().to_vec(); - - expected_rlp == sequence_action.data - }; - let jsonrpc_rsp = response::Wrapper::new_with_id( - Id::Num(1), - Some(tx_sync::Response { - code: 0.into(), - data: vec![].into(), - log: String::new(), - hash: tendermint::Hash::Sha256([0; 32]), - }), - None, - ); - - Mock::given(matcher) - .respond_with(ResponseTemplate::new(200).set_body_json(&jsonrpc_rsp)) - .up_to_n_times(1) - .expect(1) - .mount_as_scoped(server) - .await -} - -fn signed_tx_from_request(request: &Request) -> SignedTransaction { - use astria_core::generated::sequencer::v1::SignedTransaction as RawSignedTransaction; - use prost::Message as _; - - let wrapped_tx_sync_req: request::Wrapper = - serde_json::from_slice(&request.body) - .expect("can't deserialize to JSONRPC wrapped tx_sync::Request"); - let raw_signed_tx = RawSignedTransaction::decode(&*wrapped_tx_sync_req.params().tx) - .expect("can't deserialize signed sequencer tx from broadcast jsonrpc request"); - let signed_tx = SignedTransaction::try_from_raw(raw_signed_tx) - .expect("can't convert raw signed tx to checked signed tx"); - debug!(?signed_tx, "sequencer mock received signed transaction"); - - signed_tx -} - -fn rollup_id_nonce_from_request(request: &Request) -> (RollupId, u32) { - let signed_tx = signed_tx_from_request(request); - - // validate that the transaction's first action is a sequence action - let Some(sent_action) = signed_tx.actions().first() else { - panic!("received transaction contained no actions"); - }; - let Some(sequence_action) = sent_action.as_sequence() else { - panic!("mocked sequencer expected a sequence action"); - }; - - ( - sequence_action.rollup_id, - signed_tx.unsigned_transaction().nonce, - ) -} - -// A Uniswap V2 DAI-ETH swap transaction from mainnet -// Etherscan link: https://etherscan.io/tx/0x99850dd1cf325c8ede9ba62b9d8a11aa199794450b581ce3a7bb8c1e5bb7562f -const TEST_ETH_TX_JSON: &str = r#"{"blockHash":"0xe365f2163edb844b617ebe3d2af183b31d6c7ffa794f21d0b2d111d63e979a02","blockNumber":"0x1157959","from":"0xdc975a9bb00f4c030e4eb3268f68e4b8d0fa0362","gas":"0xcdf49","gasPrice":"0x374128344","maxFeePerGas":"0x374128344","maxPriorityFeePerGas":"0x0","hash":"0x99850dd1cf325c8ede9ba62b9d8a11aa199794450b581ce3a7bb8c1e5bb7562f","input":"0x022c0d9f0000000000000000000000000000000000000000000000c88a1ad5e15105525500000000000000000000000000000000000000000000000000000000000000000000000000000000000000001a2d11cb90d1de13bb81ee7b772a08ac234a8058000000000000000000000000000000000000000000000000000000000000008000000000000000000000000000000000000000000000000000000000000001208974000000000000000000000000000000000000000000000000000000004de4000000000000000000000000000000000000000000000000017038152c223cb100000000000000000000000000000000000000000000000000000000000000600000000000000000000000000000000000000000000000000000000000000005200000000000000000000000000000000000000000000000000000000000000000000000000000000000000087870bca3f3fd6335c3f4ce8392d69350b4fa4e2000000000000000000000000ab12275f2d91f87b301a4f01c9af4e83b3f45baa0000000000000000000000006b175474e89094c44da98b954eedeac495271d0f000000000000000000000000c02aaa39b223fe8d0a0e5c4f27ead9083c756cc2","nonce":"0x28","to":"0xa478c2975ab1ea89e8196811f51a7b7ade33eb11","transactionIndex":"0x2","value":"0x0","type":"0x2","accessList":[{"address":"0x5f4ec3df9cbd43714fe2740f5e3616155c5b8419","storageKeys":["0x0000000000000000000000000000000000000000000000000000000000000005","0x0000000000000000000000000000000000000000000000000000000000000002"]},{"address":"0x7effd7b47bfd17e52fb7559d3f924201b9dbff3d","storageKeys":[]},{"address":"0x018008bfb33d285247a21d44e50697654f754e63","storageKeys":["0x360894a13ba1a3210667c828492db98dca3e2076cc3735a920a3ca505d382bbc"]},{"address":"0x1a2d11cb90d1de13bb81ee7b772a08ac234a8058","storageKeys":[]},{"address":"0xe62b71cf983019bff55bc83b48601ce8419650cc","storageKeys":["0x9a09f352b299559621084d9b8d2625e8d5a97f382735872dd3bb1bdbdccc3fee","0x000000000000000000000000000000000000000000000000000000000000002b","0xfee3a99380070b792e111dd9a6a15e929983e2d0b7e170a5520e51b99be0c359"]},{"address":"0x87870bca3f3fd6335c3f4ce8392d69350b4fa4e2","storageKeys":["0x360894a13ba1a3210667c828492db98dca3e2076cc3735a920a3ca505d382bbc","0x070a95ec3546cae47592e0bcea195bf8f96287077fbb7a23785cc2887152941c","0x070a95ec3546cae47592e0bcea195bf8f96287077fbb7a23785cc28871529420","0xf81d8d79f42adb4c73cc3aa0c78e25d3343882d0313c0b80ece3d3a103ef1ec6","0x5e14560e314427eb9d0c466a6058089f672317c8e26719a770a709c3f2481e4b","0xf81d8d79f42adb4c73cc3aa0c78e25d3343882d0313c0b80ece3d3a103ef1ebf","0xf81d8d79f42adb4c73cc3aa0c78e25d3343882d0313c0b80ece3d3a103ef1ec0","0x4c0bd942d17410ca1f6d3278a62feef7078602605466e37de958808f1454efbd","0x5e14560e314427eb9d0c466a6058089f672317c8e26719a770a709c3f2481e48","0xf81d8d79f42adb4c73cc3aa0c78e25d3343882d0313c0b80ece3d3a103ef1ec3","0x5e14560e314427eb9d0c466a6058089f672317c8e26719a770a709c3f2481e4f","0x5e14560e314427eb9d0c466a6058089f672317c8e26719a770a709c3f2481e4a","0x5e14560e314427eb9d0c466a6058089f672317c8e26719a770a709c3f2481e50","0x5e14560e314427eb9d0c466a6058089f672317c8e26719a770a709c3f2481e4d","0x4cb2b152c1b54ce671907a93c300fd5aa72383a9d4ec19a81e3333632ae92e00","0xf81d8d79f42adb4c73cc3aa0c78e25d3343882d0313c0b80ece3d3a103ef1ec4","0xf81d8d79f42adb4c73cc3aa0c78e25d3343882d0313c0b80ece3d3a103ef1ec7","0x4bea7244bd9088ac961c659a818b4f060de9712d20dc006c24f0985f19cf62d1","0x5e14560e314427eb9d0c466a6058089f672317c8e26719a770a709c3f2481e49","0xf81d8d79f42adb4c73cc3aa0c78e25d3343882d0313c0b80ece3d3a103ef1ec2","0x070a95ec3546cae47592e0bcea195bf8f96287077fbb7a23785cc2887152941d","0x5e14560e314427eb9d0c466a6058089f672317c8e26719a770a709c3f2481e4c","0x5e14560e314427eb9d0c466a6058089f672317c8e26719a770a709c3f2481e4e","0x4480713a5820391a4815a640728dab70c3847e45854ef9e8117382da26ce9105","0x070a95ec3546cae47592e0bcea195bf8f96287077fbb7a23785cc2887152941f","0x000000000000000000000000000000000000000000000000000000000000003b","0x108718ddd11d4cf696a068770009c44aef387eb858097a37824291f99278d5e3","0xf81d8d79f42adb4c73cc3aa0c78e25d3343882d0313c0b80ece3d3a103ef1ec1","0xf81d8d79f42adb4c73cc3aa0c78e25d3343882d0313c0b80ece3d3a103ef1ec5"]},{"address":"0x2f39d218133afab8f2b819b1066c7e434ad94e9e","storageKeys":["0x740f710666bd7a12af42df98311e541e47f7fd33d382d11602457a6d540cbd63","0x0d2c1bcee56447b4f46248272f34207a580a5c40f666a31f4e2fbb470ea53ab8"]},{"address":"0xe7b67f44ea304dd7f6d215b13686637ff64cd2b2","storageKeys":[]},{"address":"0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2","storageKeys":["0x7f6377583d24615ddfe989626525aeed0d158f924ee8c91664ab0dffd7863d00","0x3afb575d989d656a39ee0690da12b019915f3bd8709cc522e681b8dd04237970","0xa535fbd0ab3e0ad4ee444570368f3d474545b71fcc49228fe96a6406676fc126","0xb064600732a82908427d092d333e607598a6238a59aeb45e1288cb0bac7161cf"]},{"address":"0x4d5f47fa6a74757f35c14fd3a6ef8e3c9bc514e8","storageKeys":["0x000000000000000000000000000000000000000000000000000000000000003c","0x14a553e31736f19e3e380cf55bfb2f82dfd6d880cd07235affb68d8d3e0cac4d","0x360894a13ba1a3210667c828492db98dca3e2076cc3735a920a3ca505d382bbc","0x5e8cc6ee686108b7fd15638e2dbb32555b30d0bd1a191628bb70b5459b86cedc","0x000000000000000000000000000000000000000000000000000000000000003d","0x0000000000000000000000000000000000000000000000000000000000000036","0x0000000000000000000000000000000000000000000000000000000000000039"]},{"address":"0x6b175474e89094c44da98b954eedeac495271d0f","storageKeys":["0xd86cc1e239204d48eb0055f151744c4bb3d2337612287be803ae8247e95a67d2","0xe7ab5c3b3c86286a122f1937d4c70a3170dba7ef4f7603d830e8bcf7c9af583b","0x87c358b8e65d7446f52ffce25e44c9673d2bf461b3d3e4748afcf1238e9224a3","0xad740bfd58072c0bd719418966c52da18e837afec1b47e07bba370568cc87fbb"]},{"address":"0xe175de51f29d822b86e46a9a61246ec90631210d","storageKeys":[]},{"address":"0xcf8d0c70c850859266f5c338b38f9d663181c314","storageKeys":["0x0000000000000000000000000000000000000000000000000000000000000037","0x000000000000000000000000000000000000000000000000000000000000003d","0x360894a13ba1a3210667c828492db98dca3e2076cc3735a920a3ca505d382bbc","0x000000000000000000000000000000000000000000000000000000000000003a","0x4bea7244bd9088ac961c659a818b4f060de9712d20dc006c24f0985f19cf62d1"]},{"address":"0x413adac9e2ef8683adf5ddaece8f19613d60d1bb","storageKeys":["0x360894a13ba1a3210667c828492db98dca3e2076cc3735a920a3ca505d382bbc","0x000000000000000000000000000000000000000000000000000000000000003f","0x000000000000000000000000000000000000000000000000000000000000003a","0x4bea7244bd9088ac961c659a818b4f060de9712d20dc006c24f0985f19cf62d1"]},{"address":"0xaed0c38402a5d19df6e4c03f4e2dced6e29c1ee9","storageKeys":["0x0000000000000000000000000000000000000000000000000000000000000005","0x0000000000000000000000000000000000000000000000000000000000000002"]},{"address":"0xea51d7853eefb32b6ee06b1c12e6dcca88be0ffe","storageKeys":["0x360894a13ba1a3210667c828492db98dca3e2076cc3735a920a3ca505d382bbc","0x000000000000000000000000000000000000000000000000000000000000003a"]},{"address":"0x54586be62e3c3580375ae3723c145253060ca0c2","storageKeys":["0x7145bb02480b505fc02ccfdba07d3ba3a9d821606f0688263abedd0ac6e5bec5","0x2a11cb67ca5c7e99dba99b50e02c11472d0f19c22ed5af42a1599a7f57e1c7a4","0x5306b8fbe80b30a74098357ee8e26fad8dc069da9011cca5f0870a0a5982e541"]},{"address":"0x478238a1c8b862498c74d0647329aef9ea6819ed","storageKeys":["0x9ef04667c5a1bd8192837ceac2ad5f2c41549d4db3406185e8c6aa95ea557bc5","0x000000000000000000000000000000000000000000000000000000000000002b","0x0020b304a2489d03d215fadd3bb6d3de2dda5a6a1235e76d693c30263e3cd054"]},{"address":"0xa700b4eb416be35b2911fd5dee80678ff64ff6c9","storageKeys":["0x360894a13ba1a3210667c828492db98dca3e2076cc3735a920a3ca505d382bbc","0x5e8cc6ee686108b7fd15638e2dbb32555b30d0bd1a191628bb70b5459b86cedc"]},{"address":"0x8164cc65827dcfe994ab23944cbc90e0aa80bfcb","storageKeys":["0x76f8b43dabb591eb6681562420f7f6aa393e6903d4e02e6f59e2957d94ceab20","0x360894a13ba1a3210667c828492db98dca3e2076cc3735a920a3ca505d382bbc","0x176062dac4e737f036c34baf4b07185f9c9fd3c1337ca36eb7c1f7a74aedb8ea"]},{"address":"0x9a158802cd924747ef336ca3f9de3bdb60cf43d3","storageKeys":[]},{"address":"0xac725cb59d16c81061bdea61041a8a5e73da9ec6","storageKeys":[]},{"address":"0x15c5620dffac7c7366eed66c20ad222ddbb1ed57","storageKeys":[]},{"address":"0x547a514d5e3769680ce22b2361c10ea13619e8a9","storageKeys":["0x0000000000000000000000000000000000000000000000000000000000000005","0x0000000000000000000000000000000000000000000000000000000000000002"]},{"address":"0x8116b273cd75d79c382afacc706659ded5e0a59d","storageKeys":["0x0fb35ae12d348b84dc0910bcce7d3b0a3f6d23a3e1d0b53bbe5f135078b97b13","0x000000000000000000000000000000000000000000000000000000000000002b","0x1d90d8e683e6736ac0564a19732a642e4be100e7ee8c225feba909bbdaf1522b"]},{"address":"0x9f8ccdafcc39f3c7d6ebf637c9151673cbc36b88","storageKeys":[]},{"address":"0xa478c2975ab1ea89e8196811f51a7b7ade33eb11","storageKeys":["0x0000000000000000000000000000000000000000000000000000000000000007","0x0000000000000000000000000000000000000000000000000000000000000009","0x000000000000000000000000000000000000000000000000000000000000000a","0x000000000000000000000000000000000000000000000000000000000000000c","0x0000000000000000000000000000000000000000000000000000000000000008","0x0000000000000000000000000000000000000000000000000000000000000006"]},{"address":"0xf1cd4193bbc1ad4a23e833170f49d60f3d35a621","storageKeys":[]},{"address":"0x102633152313c81cd80419b6ecf66d14ad68949a","storageKeys":["0x360894a13ba1a3210667c828492db98dca3e2076cc3735a920a3ca505d382bbc","0x000000000000000000000000000000000000000000000000000000000000003f","0x000000000000000000000000000000000000000000000000000000000000003a"]},{"address":"0xb02381b1d27aa9845e5012083ca288c1818884f0","storageKeys":[]}],"chainId":"0x1","v":"0x0","r":"0xcb4eccf09e298388220c5560a6539322bde17581cee6908d56a92a19575e28e2","s":"0x2b4e34adad48aee14b6600c6366ad683c00c63c9da88fc2a232308421cf69a21"}"#; diff --git a/crates/astria-composer/tests/blackbox/helper/mod.rs b/crates/astria-composer/tests/blackbox/helper/mod.rs index ecdbf99c6e..d447e42295 100644 --- a/crates/astria-composer/tests/blackbox/helper/mod.rs +++ b/crates/astria-composer/tests/blackbox/helper/mod.rs @@ -24,9 +24,6 @@ use tendermint_rpc::{ }; use test_utils::mock::Geth; use tokio::task::JoinHandle; -<<<<<<< HEAD -<<<<<<< HEAD -<<<<<<< HEAD use tracing::debug; use wiremock::{ Mock, @@ -35,14 +32,6 @@ use wiremock::{ Request, ResponseTemplate, }; -======= -======= -use tracing::debug; ->>>>>>> 2029427 (move executor and geth collectors to the composer) -======= ->>>>>>> 715a738 (add gRPC collector) -use wiremock::MockGuard; ->>>>>>> ce2b9a8 (add gRPC collector) pub mod mock_sequencer; @@ -106,38 +95,12 @@ pub async fn spawn_composer(rollup_ids: &[&str]) -> TestComposer { no_metrics: true, metrics_http_listener_addr: String::new(), pretty_print: true, -<<<<<<< HEAD -<<<<<<< HEAD -<<<<<<< HEAD -<<<<<<< HEAD - grpc_addr: "127.0.0.1:0".parse().unwrap(), -======= - grpc_collector_addr: "127.0.0.1:0".parse().unwrap(), ->>>>>>> ce2b9a8 (add gRPC collector) -======= ->>>>>>> 2029427 (move executor and geth collectors to the composer) -======= - grpc_collector_addr: "127.0.0.1:0".parse().unwrap(), ->>>>>>> 715a738 (add gRPC collector) -======= grpc_addr: "127.0.0.1:0".parse().unwrap(), ->>>>>>> c23f6ce (add GRPC composer addr to composer configmap in smoke tests) }; let (composer_addr, grpc_collector_addr, composer_handle) = { let composer = Composer::from_config(&config).await.unwrap(); let composer_addr = composer.local_addr(); -<<<<<<< HEAD -<<<<<<< HEAD -<<<<<<< HEAD let grpc_collector_addr = composer.grpc_local_addr().unwrap(); -======= - let grpc_collector_addr = composer.grpc_collector_local_addr().unwrap(); ->>>>>>> ce2b9a8 (add gRPC collector) -======= ->>>>>>> 2029427 (move executor and geth collectors to the composer) -======= - let grpc_collector_addr = composer.grpc_collector_local_addr().unwrap(); ->>>>>>> 715a738 (add gRPC collector) let task = tokio::spawn(composer.run_until_stopped()); (composer_addr, grpc_collector_addr, task) };