diff --git a/crates/astria-sequencer-relayer/Cargo.toml b/crates/astria-sequencer-relayer/Cargo.toml index 84abef63b7..0178197ffe 100644 --- a/crates/astria-sequencer-relayer/Cargo.toml +++ b/crates/astria-sequencer-relayer/Cargo.toml @@ -69,7 +69,7 @@ jsonrpsee = { workspace = true, features = ["server"] } once_cell = { workspace = true } tempfile = { workspace = true } tokio = { workspace = true, features = ["test-util"] } -tokio-stream = { workspace = true } +tokio-stream = { workspace = true, features = ["net"] } wiremock = { workspace = true } assert-json-diff = "2.0.2" tower-http = { version = "0.4", features = ["auth"] } diff --git a/crates/astria-sequencer-relayer/src/lib.rs b/crates/astria-sequencer-relayer/src/lib.rs index 5e86ffa1cf..5f5433ff66 100644 --- a/crates/astria-sequencer-relayer/src/lib.rs +++ b/crates/astria-sequencer-relayer/src/lib.rs @@ -9,4 +9,7 @@ pub(crate) mod validator; pub use build_info::BUILD_INFO; pub use config::Config; -pub use sequencer_relayer::SequencerRelayer; +pub use sequencer_relayer::{ + SequencerRelayer, + ShutdownHandle, +}; diff --git a/crates/astria-sequencer-relayer/src/main.rs b/crates/astria-sequencer-relayer/src/main.rs index 45720ab794..fde982f4dc 100644 --- a/crates/astria-sequencer-relayer/src/main.rs +++ b/crates/astria-sequencer-relayer/src/main.rs @@ -7,7 +7,15 @@ use astria_sequencer_relayer::{ SequencerRelayer, BUILD_INFO, }; -use tracing::info; +use tokio::signal::unix::{ + signal, + SignalKind, +}; +use tracing::{ + error, + info, + warn, +}; #[tokio::main] async fn main() -> ExitCode { @@ -44,10 +52,28 @@ async fn main() -> ExitCode { "initializing sequencer relayer" ); - SequencerRelayer::new(cfg) - .expect("could not initialize sequencer relayer") - .run() - .await; + let mut sigterm = signal(SignalKind::terminate()) + .expect("setting a SIGTERM listener should always work on Unix"); + let (sequencer_relayer, shutdown_handle) = + SequencerRelayer::new(cfg).expect("could not initialize sequencer relayer"); + let sequencer_relayer_handle = tokio::spawn(sequencer_relayer.run()); + + let shutdown_token = shutdown_handle.token(); + tokio::select!( + _ = sigterm.recv() => { + // We don't care about the result (i.e. whether there could be more SIGTERM signals + // incoming); we just want to shut down as soon as we receive the first `SIGTERM`. + info!("received SIGTERM, issuing shutdown to all services"); + shutdown_handle.shutdown(); + } + () = shutdown_token.cancelled() => { + warn!("stopped waiting for SIGTERM"); + } + ); + + if let Err(error) = sequencer_relayer_handle.await { + error!(%error, "failed to join main sequencer relayer task"); + } ExitCode::SUCCESS } diff --git a/crates/astria-sequencer-relayer/src/sequencer_relayer.rs b/crates/astria-sequencer-relayer/src/sequencer_relayer.rs index 270a7a78a8..8183640d85 100644 --- a/crates/astria-sequencer-relayer/src/sequencer_relayer.rs +++ b/crates/astria-sequencer-relayer/src/sequencer_relayer.rs @@ -9,10 +9,6 @@ use astria_eyre::eyre::{ }; use tokio::{ select, - signal::unix::{ - signal, - SignalKind, - }, sync::oneshot, task::{ JoinError, @@ -47,8 +43,8 @@ impl SequencerRelayer { /// # Errors /// /// Returns an error if constructing the inner relayer type failed. - pub fn new(cfg: Config) -> eyre::Result { - let shutdown_token = CancellationToken::new(); + pub fn new(cfg: Config) -> eyre::Result<(Self, ShutdownHandle)> { + let shutdown_handle = ShutdownHandle::new(); let Config { cometbft_endpoint, sequencer_grpc_endpoint, @@ -65,7 +61,7 @@ impl SequencerRelayer { let validator_key_path = relay_only_validator_key_blocks.then_some(validator_key_file); let relayer = relayer::Builder { - shutdown_token: shutdown_token.clone(), + shutdown_token: shutdown_handle.token(), celestia_endpoint, celestia_bearer_token, cometbft_endpoint, @@ -83,22 +79,19 @@ impl SequencerRelayer { format!("failed to parse provided `api_addr` string as socket address: `{api_addr}`",) })?; let api_server = api::start(api_socket_addr, state_rx); - Ok(Self { + let relayer = Self { api_server, relayer, - shutdown_token, - }) + shutdown_token: shutdown_handle.token(), + }; + Ok((relayer, shutdown_handle)) } pub fn local_addr(&self) -> SocketAddr { self.api_server.local_addr() } - /// Run Sequencer Relayer. - /// - /// # Panics - /// Panics if a signal listener could not be constructed (usually if this binary is not run on a - /// Unix). + /// Runs Sequencer Relayer. pub async fn run(self) { let Self { api_server, @@ -121,16 +114,7 @@ impl SequencerRelayer { let mut relayer_task = tokio::spawn(relayer.run()); info!("spawned relayer task"); - let mut sigterm = signal(SignalKind::terminate()).expect( - "setting a SIGTERM listener should always work on unix; is this running on Unix?", - ); - let shutdown = select!( - _ = sigterm.recv() => { - info!("received SIGTERM, issuing shutdown to all services"); - ShutDown { api_task: Some(api_task), relayer_task: Some(relayer_task), api_shutdown_signal, shutdown_token } - }, - o = &mut api_task => { report_exit("api server", o); ShutDown { api_task: None, relayer_task: Some(relayer_task), api_shutdown_signal, shutdown_token } @@ -145,6 +129,44 @@ impl SequencerRelayer { } } +/// A handle for instructing the [`SequencerRelayer`] to shut down. +/// +/// It is returned along with its related `SequencerRelayer` from [`SequencerRelayer::new`]. The +/// `SequencerRelayer` will begin to shut down as soon as [`ShutdownHandle::shutdown`] is called or +/// when the `ShutdownHandle` is dropped. +pub struct ShutdownHandle { + token: CancellationToken, +} + +impl ShutdownHandle { + #[must_use] + fn new() -> Self { + Self { + token: CancellationToken::new(), + } + } + + /// Returns a clone of the wrapped cancellation token. + #[must_use] + pub fn token(&self) -> CancellationToken { + self.token.clone() + } + + /// Consumes `self` and cancels the wrapped cancellation token. + pub fn shutdown(self) { + self.token.cancel(); + } +} + +impl Drop for ShutdownHandle { + fn drop(&mut self) { + if !self.token.is_cancelled() { + info!("shutdown handle dropped, issuing shutdown to all services"); + } + self.token.cancel(); + } +} + fn report_exit(task_name: &str, outcome: Result, JoinError>) { match outcome { Ok(Ok(())) => info!(task = task_name, "task has exited"), @@ -180,18 +202,21 @@ impl ShutDown { // Giving relayer 25 seconds to shutdown because Kubernetes issues a SIGKILL after 30. if let Some(mut relayer_task) = relayer_task { info!("waiting for relayer task to shut down"); - match timeout(Duration::from_secs(25), &mut relayer_task) + let limit = Duration::from_secs(25); + match timeout(limit, &mut relayer_task) .await .map(crate::utils::flatten) { Ok(Ok(())) => info!("relayer exited gracefully"), Ok(Err(error)) => error!(%error, "relayer exited with an error"), Err(_) => { - error!("relayer did not shut down after 25 seconds; killing it"); + error!( + timeout_secs = limit.as_secs(), + "relayer did not shut down within timeout; killing it" + ); relayer_task.abort(); } } - info!("sending shutdown signal to API server"); } else { info!("relayer task was already dead"); } @@ -200,14 +225,18 @@ impl ShutDown { if let Some(mut api_task) = api_task { info!("sending shutdown signal to API server"); let _ = api_shutdown_signal.send(()); - match timeout(Duration::from_secs(4), &mut api_task) + let limit = Duration::from_secs(4); + match timeout(limit, &mut api_task) .await .map(crate::utils::flatten) { - Ok(Ok(())) => info!("api server exited gracefully"), - Ok(Err(error)) => error!(%error, "api server exited with an error"), + Ok(Ok(())) => info!("API server exited gracefully"), + Ok(Err(error)) => error!(%error, "API server exited with an error"), Err(_) => { - error!("api server did not shut down after 25 seconds; killing it"); + error!( + timeout_secs = limit.as_secs(), + "API server did not shut down within timeout; killing it" + ); api_task.abort(); } } diff --git a/crates/astria-sequencer-relayer/tests/blackbox/helper.rs b/crates/astria-sequencer-relayer/tests/blackbox/helper.rs index 3880585176..6844a5307e 100644 --- a/crates/astria-sequencer-relayer/tests/blackbox/helper.rs +++ b/crates/astria-sequencer-relayer/tests/blackbox/helper.rs @@ -1,10 +1,12 @@ use std::{ collections::VecDeque, + mem, net::SocketAddr, sync::{ Arc, Mutex, }, + time::Duration, }; use assert_json_diff::assert_json_include; @@ -27,6 +29,7 @@ use astria_core::{ use astria_sequencer_relayer::{ config::Config, SequencerRelayer, + ShutdownHandle, }; use celestia_client::celestia_types::{ blob::SubmitOptions, @@ -43,11 +46,16 @@ use tendermint_rpc::{ }; use tokio::{ net::TcpListener, + runtime::{ + self, + RuntimeFlavor, + }, sync::{ mpsc, oneshot, }, task::JoinHandle, + time::timeout, }; use tonic::{ Request, @@ -164,6 +172,8 @@ pub struct TestSequencerRelayer { pub sequencer: JoinHandle<()>, + /// A handle which issues a shutdown to the sequencer relayer on being dropped. + pub relayer_shutdown_handle: Option, pub sequencer_relayer: JoinHandle<()>, pub config: Config, @@ -180,7 +190,18 @@ pub struct TestSequencerRelayer { impl Drop for TestSequencerRelayer { fn drop(&mut self) { - self.sequencer_relayer.abort(); + // We drop the shutdown handle here to cause the sequencer relayer to shut down. + let _ = self.relayer_shutdown_handle.take(); + + let sequencer_relayer = mem::replace(&mut self.sequencer_relayer, tokio::spawn(async {})); + let _ = futures::executor::block_on(async move { + timeout(Duration::from_secs(30), sequencer_relayer) + .await + .unwrap_or_else(|_| { + panic!("timed out waiting for sequencer relayer to shut down"); + }) + }); + self.sequencer.abort(); self.celestia.server_handle.stop().unwrap(); } @@ -305,6 +326,12 @@ pub struct TestSequencerRelayerConfig { impl TestSequencerRelayerConfig { pub async fn spawn_relayer(self) -> TestSequencerRelayer { + assert_ne!( + runtime::Handle::current().runtime_flavor(), + RuntimeFlavor::CurrentThread, + "the sequencer relayer must be run on a multi-threaded runtime, e.g. the test could \ + be configured using `#[tokio::test(flavor = \"multi_thread\", worker_threads = 1)]`" + ); Lazy::force(&TELEMETRY); let mut celestia = MockCelestia::start().await; @@ -379,7 +406,8 @@ impl TestSequencerRelayerConfig { }; info!(config = serde_json::to_string(&config).unwrap()); - let sequencer_relayer = SequencerRelayer::new(config.clone()).unwrap(); + let (sequencer_relayer, relayer_shutdown_handle) = + SequencerRelayer::new(config.clone()).unwrap(); let api_address = sequencer_relayer.local_addr(); let sequencer_relayer = tokio::task::spawn(sequencer_relayer.run()); @@ -390,6 +418,7 @@ impl TestSequencerRelayerConfig { sequencer, sequencer_server_blocks, cometbft, + relayer_shutdown_handle: Some(relayer_shutdown_handle), sequencer_relayer, signing_key, account: address, diff --git a/crates/astria-sequencer-relayer/tests/blackbox/main.rs b/crates/astria-sequencer-relayer/tests/blackbox/main.rs index 016c4c41f5..af6aa0de8e 100644 --- a/crates/astria-sequencer-relayer/tests/blackbox/main.rs +++ b/crates/astria-sequencer-relayer/tests/blackbox/main.rs @@ -16,7 +16,7 @@ use tokio::time::{ const RELAY_SELF: bool = true; const RELAY_ALL: bool = false; -#[tokio::test(flavor = "current_thread")] +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn report_degraded_if_block_fetch_fails() { let mut sequencer_relayer = TestSequencerRelayerConfig { relay_only_self: false, @@ -62,7 +62,7 @@ async fn report_degraded_if_block_fetch_fails() { ), ) .await - .expect("requesting abci info and block must have occured") + .expect("requesting abci info and block must have occurred") .1 .unwrap(); @@ -82,7 +82,7 @@ async fn report_degraded_if_block_fetch_fails() { ); } -#[tokio::test(flavor = "current_thread")] +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn one_block_is_relayed_to_celestia() { let mut sequencer_relayer = TestSequencerRelayerConfig { relay_only_self: false, @@ -119,7 +119,7 @@ async fn one_block_is_relayed_to_celestia() { assert_eq!(blobs_seen_by_celestia.len(), 2); } -#[tokio::test(flavor = "current_thread")] +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn later_height_in_state_leads_to_expected_relay() { let mut sequencer_relayer = TestSequencerRelayerConfig { relay_only_self: false, @@ -162,7 +162,7 @@ async fn later_height_in_state_leads_to_expected_relay() { sequencer_relayer.assert_state_files_are_as_expected(6, 6); } -#[tokio::test(flavor = "current_thread")] +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn three_blocks_are_relayed() { let mut sequencer_relayer = TestSequencerRelayerConfig { relay_only_self: false, @@ -213,7 +213,7 @@ async fn three_blocks_are_relayed() { ); } -#[tokio::test(flavor = "current_thread")] +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn block_from_other_proposer_is_skipped() { let mut sequencer_relayer = TestSequencerRelayerConfig { relay_only_self: true,