feat(sequencer-relayer): provide a shutdown controller (#889)
## Summary
Adds a shutdown handle for the sequencer-relayer.

## Background
We want the ability to invoke the shutdown sequence from `main` and from
tests.

## Changes
A new RAII type, `ShutdownHandle`, has been added. It cancels the wrapped
`CancellationToken` when its `shutdown` method is called or when the handle is
dropped.
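
For reference, a condensed sketch of the handle as added in `sequencer_relayer.rs` (the full implementation is in the diff below; `CancellationToken` is assumed to be the `tokio-util` one already used by the crate):

```rust
use tokio_util::sync::CancellationToken;
use tracing::info;

/// Cancels the wrapped token when `shutdown` is called or when the handle is dropped.
pub struct ShutdownHandle {
    token: CancellationToken,
}

impl ShutdownHandle {
    /// Returns a clone of the wrapped token for tasks that need to listen for shutdown.
    #[must_use]
    pub fn token(&self) -> CancellationToken {
        self.token.clone()
    }

    /// Consumes `self` and cancels the wrapped token.
    pub fn shutdown(self) {
        self.token.cancel();
    }
}

impl Drop for ShutdownHandle {
    fn drop(&mut self) {
        if !self.token.is_cancelled() {
            info!("shutdown handle dropped, issuing shutdown to all services");
        }
        self.token.cancel();
    }
}
```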

`SIGTERM` handling has been moved to `main.rs`; on receiving a `SIGTERM`,
`shutdown` is called on the shutdown handle associated with the relayer.

The relayer's main `select!` loop was simplified to contain only the two tasks
(the API server and the relayer).
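
Putting the two changes together, the wiring in `main.rs` now looks roughly like this (condensed from the diff below; imports and config loading are as in the diff):

```rust
let mut sigterm = signal(SignalKind::terminate())
    .expect("setting a SIGTERM listener should always work on Unix");
let (sequencer_relayer, shutdown_handle) =
    SequencerRelayer::new(cfg).expect("could not initialize sequencer relayer");
let sequencer_relayer_handle = tokio::spawn(sequencer_relayer.run());

let shutdown_token = shutdown_handle.token();
tokio::select!(
    _ = sigterm.recv() => {
        info!("received SIGTERM, issuing shutdown to all services");
        // Consuming the handle cancels the shared token, starting the relayer's shutdown.
        shutdown_handle.shutdown();
    }
    () = shutdown_token.cancelled() => {
        // The shared token was cancelled before a SIGTERM arrived.
        warn!("stopped waiting for SIGTERM");
    }
);

if let Err(error) = sequencer_relayer_handle.await {
    error!(%error, "failed to join main sequencer relayer task");
}
```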

## Testing
The `TestSequencerRelayer` was updated to make use of the new shutdown
controller. Also manually tested by sending a `SIGTERM` to a running
sequencer-relayer.
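
In the black-box test harness, shutdown is now triggered by dropping the handle; condensed from the helper changes in the diff below:

```rust
impl Drop for TestSequencerRelayer {
    fn drop(&mut self) {
        // Dropping the shutdown handle cancels its token, which starts the relayer's shutdown.
        let _ = self.relayer_shutdown_handle.take();

        // Swap the join handle out so it can be awaited, and give shutdown 30 seconds to complete.
        let sequencer_relayer = mem::replace(&mut self.sequencer_relayer, tokio::spawn(async {}));
        let _ = futures::executor::block_on(async move {
            timeout(Duration::from_secs(30), sequencer_relayer)
                .await
                .unwrap_or_else(|_| panic!("timed out waiting for sequencer relayer to shut down"))
        });

        self.sequencer.abort();
        self.celestia.server_handle.stop().unwrap();
    }
}
```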

## Related Issues
Closes #882
Fraser999 authored Apr 3, 2024
1 parent 1b72d6b commit 0877d69
Showing 6 changed files with 133 additions and 46 deletions.
2 changes: 1 addition & 1 deletion crates/astria-sequencer-relayer/Cargo.toml
@@ -69,7 +69,7 @@ jsonrpsee = { workspace = true, features = ["server"] }
once_cell = { workspace = true }
tempfile = { workspace = true }
tokio = { workspace = true, features = ["test-util"] }
tokio-stream = { workspace = true }
tokio-stream = { workspace = true, features = ["net"] }
wiremock = { workspace = true }
assert-json-diff = "2.0.2"
tower-http = { version = "0.4", features = ["auth"] }
5 changes: 4 additions & 1 deletion crates/astria-sequencer-relayer/src/lib.rs
@@ -9,4 +9,7 @@ pub(crate) mod validator;

pub use build_info::BUILD_INFO;
pub use config::Config;
pub use sequencer_relayer::SequencerRelayer;
pub use sequencer_relayer::{
SequencerRelayer,
ShutdownHandle,
};
36 changes: 31 additions & 5 deletions crates/astria-sequencer-relayer/src/main.rs
@@ -7,7 +7,15 @@ use astria_sequencer_relayer::{
SequencerRelayer,
BUILD_INFO,
};
use tracing::info;
use tokio::signal::unix::{
signal,
SignalKind,
};
use tracing::{
error,
info,
warn,
};

#[tokio::main]
async fn main() -> ExitCode {
@@ -44,10 +52,28 @@ async fn main() -> ExitCode {
"initializing sequencer relayer"
);

SequencerRelayer::new(cfg)
.expect("could not initialize sequencer relayer")
.run()
.await;
let mut sigterm = signal(SignalKind::terminate())
.expect("setting a SIGTERM listener should always work on Unix");
let (sequencer_relayer, shutdown_handle) =
SequencerRelayer::new(cfg).expect("could not initialize sequencer relayer");
let sequencer_relayer_handle = tokio::spawn(sequencer_relayer.run());

let shutdown_token = shutdown_handle.token();
tokio::select!(
_ = sigterm.recv() => {
// We don't care about the result (i.e. whether there could be more SIGTERM signals
// incoming); we just want to shut down as soon as we receive the first `SIGTERM`.
info!("received SIGTERM, issuing shutdown to all services");
shutdown_handle.shutdown();
}
() = shutdown_token.cancelled() => {
warn!("stopped waiting for SIGTERM");
}
);

if let Err(error) = sequencer_relayer_handle.await {
error!(%error, "failed to join main sequencer relayer task");
}

ExitCode::SUCCESS
}
91 changes: 60 additions & 31 deletions crates/astria-sequencer-relayer/src/sequencer_relayer.rs
@@ -9,10 +9,6 @@ use astria_eyre::eyre::{
};
use tokio::{
select,
signal::unix::{
signal,
SignalKind,
},
sync::oneshot,
task::{
JoinError,
@@ -47,8 +43,8 @@ impl SequencerRelayer {
/// # Errors
///
/// Returns an error if constructing the inner relayer type failed.
pub fn new(cfg: Config) -> eyre::Result<Self> {
let shutdown_token = CancellationToken::new();
pub fn new(cfg: Config) -> eyre::Result<(Self, ShutdownHandle)> {
let shutdown_handle = ShutdownHandle::new();
let Config {
cometbft_endpoint,
sequencer_grpc_endpoint,
@@ -65,7 +61,7 @@

let validator_key_path = relay_only_validator_key_blocks.then_some(validator_key_file);
let relayer = relayer::Builder {
shutdown_token: shutdown_token.clone(),
shutdown_token: shutdown_handle.token(),
celestia_endpoint,
celestia_bearer_token,
cometbft_endpoint,
@@ -83,22 +79,19 @@
format!("failed to parse provided `api_addr` string as socket address: `{api_addr}`",)
})?;
let api_server = api::start(api_socket_addr, state_rx);
Ok(Self {
let relayer = Self {
api_server,
relayer,
shutdown_token,
})
shutdown_token: shutdown_handle.token(),
};
Ok((relayer, shutdown_handle))
}

pub fn local_addr(&self) -> SocketAddr {
self.api_server.local_addr()
}

/// Run Sequencer Relayer.
///
/// # Panics
/// Panics if a signal listener could not be constructed (usually if this binary is not run on a
/// Unix).
/// Runs Sequencer Relayer.
pub async fn run(self) {
let Self {
api_server,
@@ -121,16 +114,7 @@
let mut relayer_task = tokio::spawn(relayer.run());
info!("spawned relayer task");

let mut sigterm = signal(SignalKind::terminate()).expect(
"setting a SIGTERM listener should always work on unix; is this running on Unix?",
);

let shutdown = select!(
_ = sigterm.recv() => {
info!("received SIGTERM, issuing shutdown to all services");
ShutDown { api_task: Some(api_task), relayer_task: Some(relayer_task), api_shutdown_signal, shutdown_token }
},

o = &mut api_task => {
report_exit("api server", o);
ShutDown { api_task: None, relayer_task: Some(relayer_task), api_shutdown_signal, shutdown_token }
@@ -145,6 +129,44 @@
}
}

/// A handle for instructing the [`SequencerRelayer`] to shut down.
///
/// It is returned along with its related `SequencerRelayer` from [`SequencerRelayer::new`]. The
/// `SequencerRelayer` will begin to shut down as soon as [`ShutdownHandle::shutdown`] is called or
/// when the `ShutdownHandle` is dropped.
pub struct ShutdownHandle {
token: CancellationToken,
}

impl ShutdownHandle {
#[must_use]
fn new() -> Self {
Self {
token: CancellationToken::new(),
}
}

/// Returns a clone of the wrapped cancellation token.
#[must_use]
pub fn token(&self) -> CancellationToken {
self.token.clone()
}

/// Consumes `self` and cancels the wrapped cancellation token.
pub fn shutdown(self) {
self.token.cancel();
}
}

impl Drop for ShutdownHandle {
fn drop(&mut self) {
if !self.token.is_cancelled() {
info!("shutdown handle dropped, issuing shutdown to all services");
}
self.token.cancel();
}
}

fn report_exit(task_name: &str, outcome: Result<eyre::Result<()>, JoinError>) {
match outcome {
Ok(Ok(())) => info!(task = task_name, "task has exited"),
@@ -180,18 +202,21 @@ impl ShutDown {
// Giving relayer 25 seconds to shutdown because Kubernetes issues a SIGKILL after 30.
if let Some(mut relayer_task) = relayer_task {
info!("waiting for relayer task to shut down");
match timeout(Duration::from_secs(25), &mut relayer_task)
let limit = Duration::from_secs(25);
match timeout(limit, &mut relayer_task)
.await
.map(crate::utils::flatten)
{
Ok(Ok(())) => info!("relayer exited gracefully"),
Ok(Err(error)) => error!(%error, "relayer exited with an error"),
Err(_) => {
error!("relayer did not shut down after 25 seconds; killing it");
error!(
timeout_secs = limit.as_secs(),
"relayer did not shut down within timeout; killing it"
);
relayer_task.abort();
}
}
info!("sending shutdown signal to API server");
} else {
info!("relayer task was already dead");
}
@@ -200,14 +225,18 @@
if let Some(mut api_task) = api_task {
info!("sending shutdown signal to API server");
let _ = api_shutdown_signal.send(());
match timeout(Duration::from_secs(4), &mut api_task)
let limit = Duration::from_secs(4);
match timeout(limit, &mut api_task)
.await
.map(crate::utils::flatten)
{
Ok(Ok(())) => info!("api server exited gracefully"),
Ok(Err(error)) => error!(%error, "api server exited with an error"),
Ok(Ok(())) => info!("API server exited gracefully"),
Ok(Err(error)) => error!(%error, "API server exited with an error"),
Err(_) => {
error!("api server did not shut down after 25 seconds; killing it");
error!(
timeout_secs = limit.as_secs(),
"API server did not shut down within timeout; killing it"
);
api_task.abort();
}
}
33 changes: 31 additions & 2 deletions crates/astria-sequencer-relayer/tests/blackbox/helper.rs
@@ -1,10 +1,12 @@
use std::{
collections::VecDeque,
mem,
net::SocketAddr,
sync::{
Arc,
Mutex,
},
time::Duration,
};

use assert_json_diff::assert_json_include;
@@ -27,6 +29,7 @@ use astria_core::{
use astria_sequencer_relayer::{
config::Config,
SequencerRelayer,
ShutdownHandle,
};
use celestia_client::celestia_types::{
blob::SubmitOptions,
@@ -43,11 +46,16 @@ use tendermint_rpc::{
};
use tokio::{
net::TcpListener,
runtime::{
self,
RuntimeFlavor,
},
sync::{
mpsc,
oneshot,
},
task::JoinHandle,
time::timeout,
};
use tonic::{
Request,
@@ -164,6 +172,8 @@ pub struct TestSequencerRelayer {

pub sequencer: JoinHandle<()>,

/// A handle which issues a shutdown to the sequencer relayer on being dropped.
pub relayer_shutdown_handle: Option<ShutdownHandle>,
pub sequencer_relayer: JoinHandle<()>,

pub config: Config,
@@ -180,7 +190,18 @@

impl Drop for TestSequencerRelayer {
fn drop(&mut self) {
self.sequencer_relayer.abort();
// We drop the shutdown handle here to cause the sequencer relayer to shut down.
let _ = self.relayer_shutdown_handle.take();

let sequencer_relayer = mem::replace(&mut self.sequencer_relayer, tokio::spawn(async {}));
let _ = futures::executor::block_on(async move {
timeout(Duration::from_secs(30), sequencer_relayer)
.await
.unwrap_or_else(|_| {
panic!("timed out waiting for sequencer relayer to shut down");
})
});

self.sequencer.abort();
self.celestia.server_handle.stop().unwrap();
}
@@ -305,6 +326,12 @@ pub struct TestSequencerRelayerConfig {

impl TestSequencerRelayerConfig {
pub async fn spawn_relayer(self) -> TestSequencerRelayer {
assert_ne!(
runtime::Handle::current().runtime_flavor(),
RuntimeFlavor::CurrentThread,
"the sequencer relayer must be run on a multi-threaded runtime, e.g. the test could \
be configured using `#[tokio::test(flavor = \"multi_thread\", worker_threads = 1)]`"
);
Lazy::force(&TELEMETRY);

let mut celestia = MockCelestia::start().await;
@@ -379,7 +406,8 @@ impl TestSequencerRelayerConfig {
};

info!(config = serde_json::to_string(&config).unwrap());
let sequencer_relayer = SequencerRelayer::new(config.clone()).unwrap();
let (sequencer_relayer, relayer_shutdown_handle) =
SequencerRelayer::new(config.clone()).unwrap();
let api_address = sequencer_relayer.local_addr();
let sequencer_relayer = tokio::task::spawn(sequencer_relayer.run());

Expand All @@ -390,6 +418,7 @@ impl TestSequencerRelayerConfig {
sequencer,
sequencer_server_blocks,
cometbft,
relayer_shutdown_handle: Some(relayer_shutdown_handle),
sequencer_relayer,
signing_key,
account: address,
12 changes: 6 additions & 6 deletions crates/astria-sequencer-relayer/tests/blackbox/main.rs
@@ -16,7 +16,7 @@ use tokio::time::{
const RELAY_SELF: bool = true;
const RELAY_ALL: bool = false;

#[tokio::test(flavor = "current_thread")]
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn report_degraded_if_block_fetch_fails() {
let mut sequencer_relayer = TestSequencerRelayerConfig {
relay_only_self: false,
@@ -62,7 +62,7 @@ async fn report_degraded_if_block_fetch_fails() {
),
)
.await
.expect("requesting abci info and block must have occured")
.expect("requesting abci info and block must have occurred")
.1
.unwrap();

@@ -82,7 +82,7 @@
);
}

#[tokio::test(flavor = "current_thread")]
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn one_block_is_relayed_to_celestia() {
let mut sequencer_relayer = TestSequencerRelayerConfig {
relay_only_self: false,
@@ -119,7 +119,7 @@
assert_eq!(blobs_seen_by_celestia.len(), 2);
}

#[tokio::test(flavor = "current_thread")]
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn later_height_in_state_leads_to_expected_relay() {
let mut sequencer_relayer = TestSequencerRelayerConfig {
relay_only_self: false,
@@ -162,7 +162,7 @@
sequencer_relayer.assert_state_files_are_as_expected(6, 6);
}

#[tokio::test(flavor = "current_thread")]
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn three_blocks_are_relayed() {
let mut sequencer_relayer = TestSequencerRelayerConfig {
relay_only_self: false,
@@ -213,7 +213,7 @@
);
}

#[tokio::test(flavor = "current_thread")]
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn block_from_other_proposer_is_skipped() {
let mut sequencer_relayer = TestSequencerRelayerConfig {
relay_only_self: true,