From 708e35f8a3786018e1b253647a3bc0eeed132c36 Mon Sep 17 00:00:00 2001 From: Bharath Date: Wed, 3 Apr 2024 14:57:34 +0530 Subject: [PATCH] implement graceful shutdown --- Cargo.lock | 1 + crates/astria-composer/Cargo.toml | 2 + crates/astria-composer/src/collectors/geth.rs | 122 +++++++--- crates/astria-composer/src/composer.rs | 219 ++++++++++++++++-- crates/astria-composer/src/executor/mod.rs | 98 +++++++- crates/astria-composer/src/executor/tests.rs | 7 + crates/astria-composer/src/grpc.rs | 11 +- 7 files changed, 409 insertions(+), 51 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6092b3ff35..f20e88eefe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -560,6 +560,7 @@ dependencies = [ "humantime", "hyper", "insta", + "itertools 0.11.0", "once_cell", "pin-project-lite", "prost", diff --git a/crates/astria-composer/Cargo.toml b/crates/astria-composer/Cargo.toml index 0358d84c92..f04d69e367 100644 --- a/crates/astria-composer/Cargo.toml +++ b/crates/astria-composer/Cargo.toml @@ -46,6 +46,7 @@ tokio = { workspace = true, features = [ "rt-multi-thread", "sync", "time", + "signal", ] } tokio-util = { workspace = true, features = ["rt"] } tracing = { workspace = true, features = ["attributes"] } @@ -53,6 +54,7 @@ tryhard = { workspace = true } tonic = { workspace = true } tokio-stream = { workspace = true, features = ["net"] } tonic-health = "0.10.2" +itertools = "0.11.0" [dependencies.sequencer-client] package = "astria-sequencer-client" diff --git a/crates/astria-composer/src/collectors/geth.rs b/crates/astria-composer/src/collectors/geth.rs index 273321e2ae..5674fe875f 100644 --- a/crates/astria-composer/src/collectors/geth.rs +++ b/crates/astria-composer/src/collectors/geth.rs @@ -20,6 +20,7 @@ use astria_core::sequencer::v1::{ }; use astria_eyre::eyre::{ self, + eyre, WrapErr as _, }; use ethers::providers::{ @@ -27,12 +28,17 @@ use ethers::providers::{ ProviderError, Ws, }; -use tokio::sync::{ - mpsc::error::SendTimeoutError, - watch, +use tokio::{ + select, + sync::{ + mpsc::error::SendTimeoutError, + watch, + }, }; +use tokio_util::sync::CancellationToken; use tracing::{ debug, + error, info, instrument, warn, @@ -59,6 +65,8 @@ pub(crate) struct Geth { status: watch::Sender, /// Rollup URL url: String, + // Token to signal the geth collector to stop upon shutdown. + shutdown_token: CancellationToken, } #[derive(Debug)] @@ -80,7 +88,12 @@ impl Status { impl Geth { /// Initializes a new collector instance - pub(crate) fn new(chain_name: String, url: String, executor_handle: executor::Handle) -> Self { + pub(crate) fn new( + chain_name: String, + url: String, + executor_handle: executor::Handle, + shutdown_token: CancellationToken, + ) -> Self { let (status, _) = watch::channel(Status::new()); let rollup_id = RollupId::from_unhashed_bytes(&chain_name); info!( @@ -94,6 +107,7 @@ impl Geth { executor_handle, status, url, + shutdown_token, } } @@ -116,6 +130,7 @@ impl Geth { executor_handle, status, url, + shutdown_token, .. 
} = self; @@ -153,39 +168,82 @@ impl Geth { .await .wrap_err("failed to subscribe eth client to full pending transactions")?; + let send_timeout_duration = Duration::from_millis(500); + status.send_modify(|status| status.is_connected = true); - while let Some(tx) = tx_stream.next().await { - let tx_hash = tx.hash; - debug!(transaction.hash = %tx_hash, "collected transaction from rollup"); - let data = tx.rlp().to_vec(); - let seq_action = SequenceAction { - rollup_id, - data, - fee_asset_id: default_native_asset_id(), - }; - - match executor_handle - .send_timeout(seq_action, Duration::from_millis(500)) - .await - { - Ok(()) => {} - Err(SendTimeoutError::Timeout(_seq_action)) => { - warn!( - transaction.hash = %tx_hash, - "timed out sending new transaction to executor after 500ms; dropping tx" - ); + let reason = loop { + select! { + biased; + () = shutdown_token.cancelled() => { + info!("shutdown signal received"); + break Ok("shutdown signal received"); + }, + tx_res = tx_stream.next() => { + if let Some(tx) = tx_res { + let tx_hash = tx.hash; + debug!(transaction.hash = %tx_hash, "collected transaction from rollup"); + let data = tx.rlp().to_vec(); + let seq_action = SequenceAction { + rollup_id, + data, + fee_asset_id: default_native_asset_id(), + }; + + match executor_handle + .send_timeout(seq_action, send_timeout_duration) + .await + { + Ok(()) => {} + Err(SendTimeoutError::Timeout(_seq_action)) => { + warn!( + transaction.hash = %tx_hash, + timeout_ms = send_timeout_duration.as_millis(), + "timed out sending new transaction to executor; dropping tx", + ); + } + Err(SendTimeoutError::Closed(_seq_action)) => { + warn!( + transaction.hash = %tx_hash, + "executor channel closed while sending transaction; dropping transaction \ + and exiting event loop" + ); + break Err(eyre!("executor channel closed while sending transaction")); + } + } + } else { + break Err(eyre!("geth tx stream ended")); + } } - Err(SendTimeoutError::Closed(_seq_action)) => { - warn!( - transaction.hash = %tx_hash, - "executor channel closed while sending transaction; dropping transaction \ - and exiting event loop" - ); - break; + } + }; + + match &reason { + Ok(reason) => { + info!(reason, "starting shutdown"); + } + Err(reason) => { + error!(%reason, "starting shutdown"); + } + }; + + debug!("sending connection status to false"); + status.send_modify(|status| status.is_connected = false); + + debug!("unsubscribing from geth tx stream"); + match tx_stream.unsubscribe().await { + Ok(res) => { + if res { + info!("unsubscribed from geth tx stream"); + } else { + warn!("failed to unsubscribe from geth tx stream"); } } + Err(err) => { + error!(%err, "errored out while unsubscribing from geth tx stream"); + } } - Ok(()) + + reason.map(|_| ()) } } diff --git a/crates/astria-composer/src/composer.rs b/crates/astria-composer/src/composer.rs index f23b1d2b8e..6101a3b691 100644 --- a/crates/astria-composer/src/composer.rs +++ b/crates/astria-composer/src/composer.rs @@ -1,21 +1,35 @@ use std::{ collections::HashMap, net::SocketAddr, + time::Duration, }; use astria_eyre::eyre::{ self, WrapErr as _, }; +use itertools::Itertools; use tokio::{ io, + signal::unix::{ + signal, + SignalKind, + }, sync::watch, - task::JoinError, + task::{ + JoinError, + JoinHandle, + }, + time::timeout, +}; +use tokio_util::{ + sync::CancellationToken, + task::JoinMap, }; -use tokio_util::task::JoinMap; use tracing::{ error, info, + warn, }; use crate::{ @@ -61,6 +75,8 @@ pub struct Composer { /// The gRPC server that listens for incoming requests 
from the collectors via the /// GrpcCollector service. It also exposes a health service. grpc_server: GrpcServer, + /// Used to signal the Composer to shut down. + shutdown_token: CancellationToken, } /// Announces the current status of the Composer for other modules in the crate to use @@ -93,17 +109,24 @@ impl Composer { /// See `[from_config]` for its error scenarios. pub async fn from_config(cfg: &Config) -> eyre::Result { let (composer_status_sender, _) = watch::channel(Status::default()); + let shutdown_token = CancellationToken::new(); + let (executor, executor_handle) = Executor::new( &cfg.sequencer_url, &cfg.private_key, cfg.block_time_ms, cfg.max_bytes_per_bundle, + shutdown_token.clone(), ) .wrap_err("executor construction from config failed")?; - let grpc_server = GrpcServer::new(cfg.grpc_addr, executor_handle.clone()) - .await - .wrap_err("failed to create grpc server")?; + let grpc_server = GrpcServer::new( + cfg.grpc_addr, + executor_handle.clone(), + shutdown_token.clone(), + ) + .await + .wrap_err("grpc collector construction from config failed")?; info!( listen_addr = %grpc_server.local_addr().wrap_err("grpc server listener not bound")?, @@ -128,8 +151,12 @@ impl Composer { let geth_collectors = rollups .iter() .map(|(rollup_name, url)| { - let collector = - Geth::new(rollup_name.clone(), url.clone(), executor_handle.clone()); + let collector = Geth::new( + rollup_name.clone(), + url.clone(), + executor_handle.clone(), + shutdown_token.clone(), + ); (rollup_name.clone(), collector) }) .collect::>(); @@ -149,6 +176,7 @@ impl Composer { geth_collector_statuses, geth_collector_tasks: JoinMap::new(), grpc_server, + shutdown_token, }) } @@ -168,6 +196,9 @@ impl Composer { /// /// # Errors /// It errors out if the API Server, Executor or any of the Geth Collectors fail to start. + /// + /// # Panics + /// It panics if the Composer cannot set the SIGTERM listener. 
pub async fn run_until_stopped(self) -> eyre::Result<()> { let Self { api_server, @@ -179,11 +210,21 @@ impl Composer { rollups, mut geth_collector_statuses, grpc_server, + shutdown_token, } = self; + // we need the API server to shutdown at the end, since it is used by k8s + // to report the liveness of the service + let api_server_shutdown_token = CancellationToken::new(); + + let api_server_task_shutdown_token = api_server_shutdown_token.clone(); // run the api server - let mut api_task = - tokio::spawn(async move { api_server.await.wrap_err("api server ended unexpectedly") }); + let mut api_task = tokio::spawn(async move { + api_server + .with_graceful_shutdown(api_server_task_shutdown_token.cancelled()) + .await + .wrap_err("api server ended unexpectedly") + }); // run the collectors and executor for (chain_id, collector) in geth_collectors.drain() { @@ -210,19 +251,57 @@ impl Composer { .wrap_err("grpc server failed") }); - loop { + let mut sigterm = signal(SignalKind::terminate()).expect( + "setting a SIGTERM listener should always work on unix; is this running on unix?", + ); + + let shutdown_info = loop { tokio::select!( + biased; + _ = sigterm.recv() => { + info!("received SIGTERM; shutting down"); + break ShutdownInfo { + api_server_shutdown_token, + composer_shutdown_token: shutdown_token, + api_server_task_handle: Some(api_task), + executor_task_handle: Some(executor_task), + grpc_server_task_handle: Some(grpc_server_handle), + geth_collector_tasks, + + }; + }, o = &mut api_task => { report_exit("api server unexpectedly ended", o); - return Ok(()); + break ShutdownInfo { + api_server_shutdown_token, + composer_shutdown_token: shutdown_token, + api_server_task_handle: None, + executor_task_handle: Some(executor_task), + grpc_server_task_handle: Some(grpc_server_handle), + geth_collector_tasks, + }; }, o = &mut executor_task => { report_exit("executor unexpectedly ended", o); - return Ok(()); + break ShutdownInfo { + api_server_shutdown_token, + composer_shutdown_token: shutdown_token, + api_server_task_handle: Some(api_task), + executor_task_handle: None, + grpc_server_task_handle: Some(grpc_server_handle), + geth_collector_tasks, + }; }, o = &mut grpc_server_handle => { report_exit("grpc server unexpectedly ended", o); - return Ok(()); + break ShutdownInfo { + api_server_shutdown_token, + composer_shutdown_token: shutdown_token, + api_server_task_handle: Some(api_task), + executor_task_handle: Some(executor_task), + grpc_server_task_handle: None, + geth_collector_tasks, + }; }, Some((rollup, collector_exit)) = geth_collector_tasks.join_next() => { reconnect_exited_collector( @@ -232,9 +311,112 @@ impl Composer { &rollups, rollup, collector_exit, + shutdown_token.clone() ); }); + }; + + shutdown_info.run().await + } +} + +struct ShutdownInfo { + api_server_shutdown_token: CancellationToken, + composer_shutdown_token: CancellationToken, + api_server_task_handle: Option>>, + executor_task_handle: Option>>, + grpc_server_task_handle: Option>>, + geth_collector_tasks: JoinMap>, +} + +impl ShutdownInfo { + async fn run(self) -> eyre::Result<()> { + let Self { + composer_shutdown_token, + api_server_shutdown_token, + api_server_task_handle, + executor_task_handle, + grpc_server_task_handle, + mut geth_collector_tasks, + } = self; + + // if the composer is shutting down because of an unexpected shutdown from any one of the + // components(and not because of a SIGTERM), we need to send the cancel signal to all + // the other components. 
+        composer_shutdown_token.cancel();
+        // k8s sends SIGKILL after 30s, so the entire shutdown must finish before then.
+
+        // We give the executor 14 seconds to shut down, longer than the other components, because
+        // it has to drain all remaining bundles and send them to the sequencer.
+        if let Some(executor_task_handle) = executor_task_handle {
+            match tokio::time::timeout(std::time::Duration::from_secs(14), executor_task_handle)
+                .await
+                .map(flatten)
+            {
+                Ok(Ok(())) => info!("executor shut down"),
+                Ok(Err(error)) => error!(%error, "executor shut down with error"),
+                Err(error) => error!(%error, "executor failed to shut down in time"),
+            }
+        } else {
+            info!("executor task was already dead");
+        };
+
+        // We give the grpc server 5 seconds to shut down.
+        if let Some(grpc_server_task_handle) = grpc_server_task_handle {
+            match tokio::time::timeout(std::time::Duration::from_secs(5), grpc_server_task_handle)
+                .await
+                .map(flatten)
+            {
+                Ok(Ok(())) => info!("grpc server shut down"),
+                Ok(Err(error)) => error!(%error, "grpc server shut down with error"),
+                Err(error) => error!(%error, "grpc server failed to shut down in time"),
+            }
+        } else {
+            info!("grpc server task was already dead");
+        };
+
+        let shutdown_loop = async {
+            while let Some((name, res)) = geth_collector_tasks.join_next().await {
+                let message = "task shut down";
+                match flatten(res) {
+                    Ok(()) => info!(name, message),
+                    Err(error) => error!(name, %error, message),
+                }
+            }
+        };
+
+        // we give the geth collectors 5s to shut down. they shouldn't take long since they only
+        // need to unsubscribe from their WSS streams.
+        if timeout(Duration::from_secs(5), shutdown_loop)
+            .await
+            .is_err()
+        {
+            let tasks = geth_collector_tasks.keys().join(", ");
+            warn!(
+                tasks = format_args!("[{tasks}]"),
+                "aborting all geth collector tasks that have not yet shut down",
+            );
+            geth_collector_tasks.abort_all();
+        } else {
+            info!("all geth collector tasks shut down regularly");
         }
+
+        // cancel the api server at the end
+        api_server_shutdown_token.cancel();
+        if let Some(api_server_task_handle) = api_server_task_handle {
+            match tokio::time::timeout(std::time::Duration::from_secs(5), api_server_task_handle)
+                .await
+                .map(flatten)
+            {
+                Ok(Ok(())) => info!("api server shut down"),
+                Ok(Err(error)) => error!(%error, "api server shut down with error"),
+                Err(error) => error!(%error, "api server failed to shut down in time"),
+            }
+        } else {
+            info!("api server task was already dead");
+        };
+
+        Ok(())
+    }
 }
@@ -301,6 +483,7 @@ pub(super) fn reconnect_exited_collector(
     rollups: &HashMap<String, String>,
     rollup: String,
     exit_result: Result<eyre::Result<()>, JoinError>,
+    shutdown_token: CancellationToken,
 ) {
     report_exit("collector", exit_result);
     let Some(url) = rollups.get(&rollup) else {
@@ -311,7 +494,7 @@
         return;
     };
 
-    let collector = Geth::new(rollup.clone(), url.clone(), executor_handle);
+    let collector = Geth::new(rollup.clone(), url.clone(), executor_handle, shutdown_token);
     collector_statuses.insert(rollup.clone(), collector.subscribe());
     collector_tasks.spawn(rollup, collector.run_until_stopped());
 }
@@ -327,3 +510,11 @@ fn report_exit(task_name: &str, outcome: Result<eyre::Result<()>, JoinError>) {
         }
     }
 }
+
+pub(crate) fn flatten<T>(res: Result<eyre::Result<T>, JoinError>) -> eyre::Result<T> {
+    match res {
+        Ok(Ok(val)) => Ok(val),
+        Ok(Err(err)) => Err(err).wrap_err("task returned with error"),
+        Err(err) => Err(err).wrap_err("task panicked"),
+    }
+}
diff --git a/crates/astria-composer/src/executor/mod.rs
b/crates/astria-composer/src/executor/mod.rs index cb35eccf49..ec10962e60 100644 --- a/crates/astria-composer/src/executor/mod.rs +++ b/crates/astria-composer/src/executor/mod.rs @@ -56,6 +56,7 @@ use tokio::{ Instant, }, }; +use tokio_util::sync::CancellationToken; use tracing::{ debug, error, @@ -103,6 +104,8 @@ pub(super) struct Executor { block_time: tokio::time::Duration, // Max bytes in a sequencer action bundle max_bytes_per_bundle: usize, + // Token to signal the executor to stop upon shutdown. + shutdown_token: CancellationToken, } #[derive(Clone)] @@ -157,6 +160,7 @@ impl Executor { private_key: &SecretString, block_time: u64, max_bytes_per_bundle: usize, + shutdown_token: CancellationToken, ) -> eyre::Result<(Self, Handle)> { let sequencer_client = sequencer_client::HttpClient::new(sequencer_url) .wrap_err("failed constructing sequencer client")?; @@ -182,6 +186,7 @@ impl Executor { address: sequencer_address, block_time: Duration::from_millis(block_time), max_bytes_per_bundle, + shutdown_token, }, Handle::new(serialized_rollup_transaction_tx), )) @@ -226,10 +231,14 @@ impl Executor { let reset_time = || Instant::now() + self.block_time; - loop { + let reason = loop { select! { biased; + () = self.shutdown_token.cancelled() => { + info!("shutdown signal received"); + break Ok("received shutdown signal"); + } // process submission result and update nonce rsp = &mut submission_fut, if !submission_fut.is_terminated() => { match rsp { @@ -276,7 +285,93 @@ impl Executor { } } } + }; + + match &reason { + Ok(reason) => { + info!(reason, "shutting down"); + } + Err(reason) => { + error!(%reason, "shutting down"); + } + }; + + let shutdown_logic = async { + info!("setting executor status to disconnected"); + self.status + .send_modify(|status| status.is_connected = false); + + info!("draining sequence actions from the executor receiver channel"); + // drain the receiver channel + while let Ok(seq_action) = self.serialized_rollup_transactions.try_recv() { + let rollup_id = seq_action.rollup_id; + if let Err(e) = bundle_factory.try_push(seq_action) { + warn!( + rollup_id = %rollup_id, + error = &e as &StdError, + "failed to bundle sequence action: too large. sequence action is dropped." + ); + } + } + + info!("draining remaining bundles from bundle factory"); + // when shutting down, drain all the remaining bundles and submit to the sequencer + // to avoid any bundle loss. + let mut bundles_to_drain: Vec = vec![]; + // drain all the bundles from the factory + loop { + let bundle = bundle_factory.pop_now(); + if bundle.is_empty() { + break; + } + bundles_to_drain.push(bundle); + } + let num_bundles_to_drain = bundles_to_drain.len(); + if num_bundles_to_drain == 0 { + info!("no bundles to drain; executor shutdown tasks completed"); + return Ok(()); + } + + info!( + "submitting {} bundles to sequencer to drain", + num_bundles_to_drain + ); + // we need to submit the bundles sequentially since that is the guarantee + // the bundle factory gives to the end users. 
+ for bundle in bundles_to_drain { + match self.submit_bundle(nonce, bundle).await { + Ok(new_nonce) => { + debug!("drained bundle successfully; new nonce: {}", new_nonce); + nonce = new_nonce; + } + Err(e) => { + error!(%e, "failed submitting bundle to sequencer during shutdown; \ + aborting shutdown"); + // if we can't drain a bundle after multiple retries, we can abort + // the shutdown process + return Err(e); + } + } + } + info!("drained {} bundles successfully", num_bundles_to_drain); + + Ok(()) + }; + + match tokio::time::timeout(Duration::from_secs(13), shutdown_logic).await { + Ok(res) => { + if let Err(e) = res { + error!(%e, "executor shutdown tasks aborted"); + } else { + info!("executor shutdown tasks completed successfully"); + } + } + Err(err) => { + error!(%err, "executor shutdown tasks failed to complete in time"); + } } + + reason.map(|_| ()) } } @@ -317,7 +412,6 @@ async fn get_latest_nonce( .await .wrap_err("failed getting latest nonce from sequencer after 1024 attempts") } - /// Queries the sequencer for the latest nonce with an exponential backoff #[instrument( name = "submit signed transaction", diff --git a/crates/astria-composer/src/executor/tests.rs b/crates/astria-composer/src/executor/tests.rs index 59e13d3deb..f1ea0c582b 100644 --- a/crates/astria-composer/src/executor/tests.rs +++ b/crates/astria-composer/src/executor/tests.rs @@ -21,6 +21,7 @@ use tokio::{ sync::watch, time, }; +use tokio_util::sync::CancellationToken; use tracing::debug; use wiremock::{ matchers::{ @@ -195,11 +196,13 @@ async fn wait_for_startup( async fn full_bundle() { // set up the executor, channel for writing seq actions, and the sequencer mock let (sequencer, nonce_guard, cfg) = setup().await; + let shutdown_token = CancellationToken::new(); let (executor, executor_handle) = Executor::new( &cfg.sequencer_url, &cfg.private_key, cfg.block_time_ms, cfg.max_bytes_per_bundle, + shutdown_token, ) .unwrap(); @@ -281,11 +284,13 @@ async fn full_bundle() { async fn bundle_triggered_by_block_timer() { // set up the executor, channel for writing seq actions, and the sequencer mock let (sequencer, nonce_guard, cfg) = setup().await; + let shutdown_token = CancellationToken::new(); let (executor, executor_handle) = Executor::new( &cfg.sequencer_url, &cfg.private_key, cfg.block_time_ms, cfg.max_bytes_per_bundle, + shutdown_token, ) .unwrap(); @@ -360,11 +365,13 @@ async fn bundle_triggered_by_block_timer() { async fn two_seq_actions_single_bundle() { // set up the executor, channel for writing seq actions, and the sequencer mock let (sequencer, nonce_guard, cfg) = setup().await; + let shutdown_token = CancellationToken::new(); let (executor, executor_handle) = Executor::new( &cfg.sequencer_url, &cfg.private_key, cfg.block_time_ms, cfg.max_bytes_per_bundle, + shutdown_token.clone(), ) .unwrap(); diff --git a/crates/astria-composer/src/grpc.rs b/crates/astria-composer/src/grpc.rs index 30fc02e642..9448cafc90 100644 --- a/crates/astria-composer/src/grpc.rs +++ b/crates/astria-composer/src/grpc.rs @@ -17,6 +17,7 @@ use tokio::{ io, net::TcpListener, }; +use tokio_util::sync::CancellationToken; use crate::{ collectors, @@ -30,12 +31,14 @@ use crate::{ pub(crate) struct GrpcServer { listener: TcpListener, grpc_collector: collectors::Grpc, + shutdown_token: CancellationToken, } impl GrpcServer { pub(crate) async fn new( grpc_addr: SocketAddr, executor: executor::Handle, + shutdown_token: CancellationToken, ) -> eyre::Result { let listener = TcpListener::bind(grpc_addr) .await @@ -45,6 +48,7 @@ impl 
GrpcServer {
         Ok(Self {
             listener,
             grpc_collector,
+            shutdown_token,
         })
     }
@@ -68,9 +72,10 @@ impl GrpcServer {
             .await;
 
         grpc_server
-            .serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(
-                self.listener,
-            ))
+            .serve_with_incoming_shutdown(
+                tokio_stream::wrappers::TcpListenerStream::new(self.listener),
+                self.shutdown_token.cancelled(),
+            )
             .await
             .wrap_err("failed to run grpc server")
     }
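
For readers unfamiliar with the pattern this patch threads through the executor, the geth collectors, and the gRPC server, the following is a minimal, self-contained sketch of the same idea, not part of the patch: a shared CancellationToken checked first in a biased select! loop, followed by a bounded wait on the task handle during teardown. The function do_work and the 5-second budget are illustrative assumptions; only the tokio and tokio-util APIs shown are real.

// Illustrative sketch only; not part of the patch above.
use std::time::Duration;

use tokio::{select, time::timeout};
use tokio_util::sync::CancellationToken;

// Placeholder for a unit of work, e.g. reading one transaction from a geth stream.
async fn do_work() {
    tokio::time::sleep(Duration::from_millis(100)).await;
}

#[tokio::main]
async fn main() {
    let shutdown_token = CancellationToken::new();
    let task_token = shutdown_token.clone();

    let worker = tokio::spawn(async move {
        loop {
            select! {
                biased;
                // Poll the cancellation token first so shutdown wins over new work.
                () = task_token.cancelled() => break,
                () = do_work() => {}
            }
        }
    });

    // Signal shutdown, then give the task a bounded amount of time to wind down,
    // mirroring the per-component timeouts in `ShutdownInfo::run`.
    shutdown_token.cancel();
    match timeout(Duration::from_secs(5), worker).await {
        Ok(Ok(())) => println!("worker shut down cleanly"),
        Ok(Err(join_error)) => eprintln!("worker panicked: {join_error}"),
        Err(_elapsed) => eprintln!("worker did not shut down in time"),
    }
}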