Skip to content

Commit

Permalink
fix(conductor): don't panic while shutting down (#846)
Browse files Browse the repository at this point in the history
## Summary
Fixes conductor panicking during shutdown.

## Background
Conductor was using a tokio `LocalSet` together with an `Rc` to run its
shutdown logic. This was entirely unnecessary and panicky (since the
`Rc` was cloned into multiple tasks, which was obviously wrong).

Note that this PR does not address conductor ignoring `SIGTERM` during
initialization.

## Changes
- Replace the awkward `LocalSet` + `Rc` construct used to shut down conductor tasks
- Simplify reporting of task failure and exit

## Testing
This needs to be tested end to end (since conductor receives an external
signal).

## Related Issues
Closes #828
  • Loading branch information
SuperFluffy authored Mar 20, 2024
1 parent 8620e83 commit da01133
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 104 deletions.
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/astria-conductor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ sequencer-client = { package = "astria-sequencer-client", path = "../astria-sequ
telemetry = { package = "astria-telemetry", path = "../astria-telemetry", features = [
"display",
] }
itertools = "0.12.1"

[dev-dependencies]
jsonrpsee = { workspace = true, features = ["server"] }
Expand Down
142 changes: 39 additions & 103 deletions crates/astria-conductor/src/conductor.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,19 @@
use std::{
error::Error as StdError,
rc::Rc,
time::Duration,
};
use std::time::Duration;

use astria_eyre::eyre::{
self,
eyre,
WrapErr as _,
};
use celestia_client::celestia_types::nmt::Namespace;
use itertools::Itertools as _;
use sequencer_client::HttpClient;
use tokio::{
select,
signal::unix::{
signal,
SignalKind,
},
task::{
spawn_local,
LocalSet,
},
time::timeout,
};
use tokio_util::{
Expand All @@ -36,6 +30,7 @@ use crate::{
celestia,
executor,
sequencer,
utils::flatten,
Config,
};

Expand Down Expand Up @@ -131,124 +126,65 @@ impl Conductor {
/// # Panics
/// Panics if it could not install a signal handler.
pub async fn run_until_stopped(mut self) {
enum ExitReason {
Sigterm,
TaskExited {
name: &'static str,
},
TaskErrored {
name: &'static str,
error: eyre::Report,
},
TaskPanicked {
name: &'static str,
error: tokio::task::JoinError,
},
}
use ExitReason::{
Sigterm,
TaskErrored,
TaskExited,
TaskPanicked,
};
let mut sigterm = signal(SignalKind::terminate()).expect(
"setting a SIGTERM listener should always work on unix; is this running on unix?",
);

let exit_reason = select! {
biased;

_ = sigterm.recv() => Sigterm,
_ = sigterm.recv() => Ok("received SIGTERM"),

Some((name, res)) = self.tasks.join_next() => {
match res {
Ok(Ok(())) => TaskExited { name, },
Ok(Err(error)) => TaskErrored { name, error},
Err(error) => TaskPanicked { name, error },
// A task finishing before shutdown was requested is always treated as an error:
// either it returned `Ok(())` unexpectedly, or it failed/panicked.
match flatten(res) {
    Ok(()) => Err(eyre!("task `{name}` exited unexpectedly")),
    // `format!` is required here: a bare string literal is not interpolated, so the
    // message would otherwise contain the literal text `{name}` instead of the task name.
    Err(err) => Err(err).wrap_err_with(|| format!("task `{name}` failed")),
}
}
};

let message = "initiating shutdown";
match exit_reason {
Sigterm => info!(reason = "received SIGTERM", "shutting down"),
TaskExited {
name,
} => error!(
task.name = name,
reason = "task exited unexpectedly",
"shutting down"
),
TaskErrored {
name,
error,
} => error!(
task.name = name,
reason = "task exited with error",
%error,
"shutting down"
),
TaskPanicked {
name,
error,
} => error!(
task.name = name,
reason = "task panicked",
error = &error as &dyn StdError,
"shutting down",
),
Ok(reason) => info!(reason, message),
Err(reason) => error!(%reason, message),
}
self.shutdown().await;
}

async fn shutdown(self) {
info!("sending shutdown signal to all tasks");
/// Shuts down all tasks.
///
/// Waits 25 seconds for all tasks to shut down before aborting them. 25 seconds
/// because kubernetes issues SIGKILL 30 seconds after SIGTERM, giving 5 seconds
/// to abort the remaining tasks.
async fn shutdown(mut self) {
self.shutdown.cancel();

info!("waiting 5 seconds for all tasks to shut down");
// put the tasks into an Rc to make them 'static so they can run on a local set
let mut tasks = Rc::new(self.tasks);
let local_set = LocalSet::new();
local_set
.run_until(async {
let mut tasks = tasks.clone();
let _ = timeout(
Duration::from_secs(5),
spawn_local(async move {
while let Some((name, res)) = Rc::get_mut(&mut tasks)
.expect(
"only one Rc to the conductor tasks should exist; this is a bug",
)
.join_next()
.await
{
match res {
Ok(Ok(())) => info!(task.name = name, "task exited normally"),
Ok(Err(e)) => {
let error: &(dyn std::error::Error + 'static) = e.as_ref();
error!(task.name = name, error, "task exited with error");
}
Err(e) => {
let error = &e as &(dyn std::error::Error + 'static);
error!(task.name = name, error, "task failed");
}
}
}
}),
)
.await;
})
.await;
info!("signalled all tasks to shut down; waiting for 25 seconds to exit");

let shutdown_loop = async {
while let Some((name, res)) = self.tasks.join_next().await {
let message = "task shut down";
match flatten(res) {
Ok(()) => info!(name, message),
Err(error) => error!(name, %error, message),
}
}
};

if !tasks.is_empty() {
if timeout(Duration::from_secs(25), shutdown_loop)
.await
.is_err()
{
let tasks = self.tasks.keys().join(", ");
warn!(
number = tasks.len(),
"aborting tasks that haven't shutdown yet"
tasks = format_args!("[{tasks}]"),
"aborting all tasks that have not yet shut down",
);
Rc::get_mut(&mut tasks)
.expect("only one Rc to the conductor tasks should exist; this is a bug")
.shutdown()
.await;
self.tasks.abort_all();
} else {
info!("all tasks shut down regularly");
}
info!("shutting down");
}
}

Expand Down
13 changes: 13 additions & 0 deletions crates/astria-conductor/src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
use astria_eyre::eyre::{
self,
WrapErr as _,
};
use celestia_client::celestia_types::Height as CelestiaHeight;
use sequencer_client::tendermint::block::Height as SequencerHeight;
use tokio::task::JoinError;

/// A necessary evil because the celestia client code uses a forked tendermint-rs.
pub(crate) trait IncrementableHeight {
Expand All @@ -17,3 +22,11 @@ impl IncrementableHeight for SequencerHeight {
self.increment()
}
}

/// Collapses the nested result of joining a task into a single [`eyre::Result`].
///
/// - A join error is wrapped with the context `"task panicked"`.
/// - An error returned by the task itself is wrapped with `"task returned with error"`.
/// - A successful value is passed through unchanged.
///
/// NOTE(review): a `JoinError` may also represent a cancelled task, not only a panic —
/// confirm the "task panicked" wording is acceptable for that case.
pub(crate) fn flatten<T>(res: Result<eyre::Result<T>, JoinError>) -> eyre::Result<T> {
    // Outer layer: the task could not be joined.
    let task_result = res.wrap_err("task panicked")?;
    // Inner layer: the task ran to completion but reported a failure.
    task_result.wrap_err("task returned with error")
}
2 changes: 1 addition & 1 deletion crates/astria-sequencer-relayer/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use tokio::task::JoinError;
pub(crate) fn flatten<T>(res: Result<eyre::Result<T>, JoinError>) -> eyre::Result<T> {
match res {
Ok(Ok(val)) => Ok(val),
Ok(Err(err)) => Err(err).wrap_err("fask returned with error"),
Ok(Err(err)) => Err(err).wrap_err("task returned with error"),
Err(err) => Err(err).wrap_err("task panicked"),
}
}

0 comments on commit da01133

Please sign in to comment.