Skip to content

Commit

Permalink
refactor(conductor) use cancellation tokens for shutdown (#845)
Browse files Browse the repository at this point in the history
## Summary
Switches from using oneshot channels to cancellation tokens.

## Background
This is much neater than using channels to signal shutdown. One token
can be passed to all tasks, instead of creating (and storing) one
channel per task.

Note that conductor will shut down immediately and not try to drain its
queue of pending blocks.

Also note that this PR does not address conductor hanging if it receives
SIGTERM during startup, where various async initializers might hang
(leaving to a followup PR).

## Changes
- Replace all `oneshot` channels used for signalling shutdown with
`CancellationToken`
- Make all `select!` loops biased, favoring cancellation over all other
branches
- reorder the other `select!` arms, favoring sending existing blocks to
executor over pulling new blocks, and heights
- Add comments to various unclear code parts
- Unify shutdown messages for all long lived tasks.

## Testing
This needs to be tested end-to-end.

---------

Co-authored-by: noot <36753753+noot@users.noreply.github.com>
  • Loading branch information
SuperFluffy and noot authored Mar 20, 2024
1 parent 2c0d2b1 commit 8620e83
Show file tree
Hide file tree
Showing 8 changed files with 143 additions and 122 deletions.
4 changes: 2 additions & 2 deletions crates/astria-conductor/src/celestia/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
use celestia_client::celestia_types::nmt::Namespace;
use sequencer_client::HttpClient;
use tokio::sync::oneshot;
use tokio_util::sync::CancellationToken;

use super::Reader;
use crate::{
Expand All @@ -17,7 +17,7 @@ pub(crate) struct Builder {
pub(crate) executor: executor::Handle,
pub(crate) sequencer_cometbft_client: HttpClient,
pub(crate) sequencer_namespace: Namespace,
pub(crate) shutdown: oneshot::Receiver<()>,
pub(crate) shutdown: CancellationToken,
}

impl Builder {
Expand Down
63 changes: 38 additions & 25 deletions crates/astria-conductor/src/celestia/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,12 @@ use telemetry::display::{
};
use tokio::{
select,
sync::{
mpsc::error::{
SendError,
TrySendError,
},
oneshot,
sync::mpsc::error::{
SendError,
TrySendError,
},
};
use tokio_util::sync::CancellationToken;
use tracing::{
debug,
error,
Expand Down Expand Up @@ -143,8 +141,8 @@ pub(crate) struct Reader {
/// The celestia namespace sequencer blobs will be read from.
sequencer_namespace: Namespace,

/// The channel to listen for shutdown signals.
shutdown: oneshot::Receiver<()>,
/// Token to listen for Conductor being shut down.
shutdown: CancellationToken,
}

impl Reader {
Expand All @@ -170,6 +168,8 @@ impl Reader {
"setting up celestia reader",
);

// XXX: The websocket client must be kept alive so that the subscription doesn't die.
// We bind to `_wsclient` because `_` will immediately drop the object.
let (mut _wsclient, mut headers) =
subscribe_to_celestia_headers(&self.celestia_ws_endpoint, &self.celestia_auth_token)
.await
Expand Down Expand Up @@ -217,33 +217,34 @@ impl Reader {
namespace.sequencer = %hex(&self.sequencer_namespace.as_bytes()),
));

let mut scheduled_block: Fuse<BoxFuture<Result<_, SendError<ReconstructedBlock>>>> =
// Enqueued block waiting for executor to free up. Set if the executor exhibits
// backpressure.
let mut enqueued_block: Fuse<BoxFuture<Result<_, SendError<ReconstructedBlock>>>> =
future::Fuse::terminated();

// Pending subcription to Celestia network headers. Set if the current subscription fails.
let mut resubscribing = Fuse::terminated();
loop {
let reason = loop {
select!(
shutdown_res = &mut self.shutdown => {
match shutdown_res {
Ok(()) => info!("received shutdown command; exiting"),
Err(error) => {
warn!(%error, "shutdown receiver dropped; exiting");
}
}
break;
biased;

() = self.shutdown.cancelled() => {
break Ok("received shutdown signal");
}

// Processing block executions which were scheduled due to channel being full
res = &mut scheduled_block, if !scheduled_block.is_terminated() => {
// Process block execution which was enqueued due to executor channel being full
res = &mut enqueued_block, if !enqueued_block.is_terminated() => {
match res {
Ok(celestia_height) => {
block_stream.inner_mut().update_reference_height_if_greater(celestia_height);
debug!("submitted enqueued block to executor, resuming normal operation");
}
Err(_) => bail!("executor channel closed while waiting for it to free up"),
Err(err) => break Err(err).wrap_err("failed sending enqueued block to executor"),
}
}

// Attempt sending next sequential block, if channel is full will be scheduled
Some(block) = sequential_blocks.next_block(), if scheduled_block.is_terminated() => {
// Forward the next block to executor. Enqueue if the executor channel is full.
Some(block) = sequential_blocks.next_block(), if enqueued_block.is_terminated() => {
let celestia_height = block.celestia_height;
match executor.try_send_firm_block(block) {
Ok(()) => {
Expand All @@ -253,7 +254,7 @@ impl Reader {
trace!("executor channel is full; rescheduling block fetch until the channel opens up");
let executor_clone = executor.clone();
// must return the celestia height to update the reference height upon completion
scheduled_block = async move {
enqueued_block = async move {
let celestia_height = block.celestia_height;
executor_clone.send_firm_block(block).await?;
Ok(celestia_height)
Expand Down Expand Up @@ -328,8 +329,20 @@ impl Reader {
}
}
);
};

// XXX: explicitly setting the message (usually implicitly set by tracing)
let message = "shutting down";
match reason {
Ok(reason) => {
info!(reason, message);
Ok(())
}
Err(reason) => {
error!(%reason, message);
Err(reason)
}
}
Ok(())
}
}

Expand Down
37 changes: 14 additions & 23 deletions crates/astria-conductor/src/conductor.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::{
collections::HashMap,
error::Error as StdError,
rc::Rc,
time::Duration,
Expand All @@ -17,14 +16,16 @@ use tokio::{
signal,
SignalKind,
},
sync::oneshot,
task::{
spawn_local,
LocalSet,
},
time::timeout,
};
use tokio_util::task::JoinMap;
use tokio_util::{
sync::CancellationToken,
task::JoinMap,
};
use tracing::{
error,
info,
Expand All @@ -39,8 +40,8 @@ use crate::{
};

pub struct Conductor {
/// Channels to the long-running tasks to shut them down gracefully
shutdown_channels: HashMap<&'static str, oneshot::Sender<()>>,
/// Token to signal to all tasks to shut down gracefully.
shutdown: CancellationToken,

/// The different long-running tasks that make up the conductor;
tasks: JoinMap<&'static str, eyre::Result<()>>,
Expand All @@ -59,31 +60,27 @@ impl Conductor {
/// This usually happens if the actors failed to connect to their respective endpoints.
pub async fn new(cfg: Config) -> eyre::Result<Self> {
let mut tasks = JoinMap::new();
let mut shutdown_channels = HashMap::new();

let sequencer_cometbft_client = HttpClient::new(&*cfg.sequencer_cometbft_url)
.wrap_err("failed constructing sequencer cometbft RPC client")?;

let shutdown = CancellationToken::new();

// Spawn the executor task.
let executor_handle = {
let (shutdown_tx, shutdown_rx) = oneshot::channel();

let (executor, handle) = executor::Builder {
consider_commitment_spread: !cfg.execution_commit_level.is_soft_only(),
rollup_address: cfg.execution_rpc_url,
shutdown: shutdown_rx,
shutdown: shutdown.clone(),
}
.build()
.wrap_err("failed constructing exectur")?;

tasks.spawn(Self::EXECUTOR, executor.run_until_stopped());
shutdown_channels.insert(Self::EXECUTOR, shutdown_tx);
handle
};

if !cfg.execution_commit_level.is_firm_only() {
let (shutdown_tx, shutdown_rx) = oneshot::channel();

let sequencer_grpc_client =
sequencer::SequencerGrpcClient::new(&cfg.sequencer_grpc_url)
.wrap_err("failed constructing grpc client for Sequencer")?;
Expand All @@ -95,18 +92,14 @@ impl Conductor {
sequencer_grpc_client,
sequencer_cometbft_client: sequencer_cometbft_client.clone(),
sequencer_block_time: Duration::from_millis(cfg.sequencer_block_time_ms),
shutdown: shutdown_rx,
shutdown: shutdown.clone(),
executor: executor_handle.clone(),
}
.build();
tasks.spawn(Self::SEQUENCER, sequencer_reader.run_until_stopped());
shutdown_channels.insert(Self::SEQUENCER, shutdown_tx);
}

if !cfg.execution_commit_level.is_soft_only() {
let (shutdown_tx, shutdown_rx) = oneshot::channel();
shutdown_channels.insert(Self::CELESTIA, shutdown_tx);

// Sequencer namespace is defined by the chain id of attached sequencer node
// which can be fetched from any block header.
let sequencer_namespace = get_sequencer_namespace(sequencer_cometbft_client.clone())
Expand All @@ -120,15 +113,15 @@ impl Conductor {
executor: executor_handle.clone(),
sequencer_cometbft_client: sequencer_cometbft_client.clone(),
sequencer_namespace,
shutdown: shutdown_rx,
shutdown: shutdown.clone(),
}
.build();

tasks.spawn(Self::CELESTIA, reader.run_until_stopped());
};

Ok(Self {
shutdown_channels,
shutdown,
tasks,
})
}
Expand Down Expand Up @@ -208,10 +201,8 @@ impl Conductor {
}

async fn shutdown(self) {
info!("sending shutdown command to all tasks");
for (_, channel) in self.shutdown_channels {
let _ = channel.send(());
}
info!("sending shutdown signal to all tasks");
self.shutdown.cancel();

info!("waiting 5 seconds for all tasks to shut down");
// put the tasks into an Rc to make them 'static so they can run on a local set
Expand Down
4 changes: 2 additions & 2 deletions crates/astria-conductor/src/executor/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ use astria_eyre::eyre::{
};
use tokio::sync::{
mpsc,
oneshot,
watch,
};
use tokio_util::sync::CancellationToken;

use super::{
Executor,
Expand All @@ -20,7 +20,7 @@ use super::{
pub(crate) struct Builder {
pub(crate) consider_commitment_spread: bool,
pub(crate) rollup_address: String,
pub(crate) shutdown: oneshot::Receiver<()>,
pub(crate) shutdown: CancellationToken,
}

impl Builder {
Expand Down
41 changes: 21 additions & 20 deletions crates/astria-conductor/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ use tokio::{
TrySendError,
},
},
oneshot,
watch::{
self,
error::RecvError,
},
},
};
use tokio_util::sync::CancellationToken;
use tracing::{
debug,
error,
Expand Down Expand Up @@ -171,7 +171,8 @@ pub(crate) struct Executor {
firm_blocks: mpsc::Receiver<ReconstructedBlock>,
soft_blocks: channel::Receiver<FilteredSequencerBlock>,

shutdown: oneshot::Receiver<()>,
/// Token to listen for Conductor being shut down.
shutdown: CancellationToken,

rollup_address: tonic::transport::Uri,

Expand Down Expand Up @@ -209,7 +210,7 @@ impl Executor {
spread (this has no effect if conductor is set to perform soft-sync only)"
);

loop {
let reason = loop {
let spread_not_too_large = !self.is_spread_too_large(max_spread);
if spread_not_too_large {
self.soft_blocks.fill_permits();
Expand All @@ -218,16 +219,8 @@ impl Executor {
select!(
biased;

shutdown = &mut self.shutdown => {
let ret = if let Err(error) = shutdown {
let reason = "shutdown channel closed unexpectedly";
error!(%error, reason, "shutting down");
Err(error).wrap_err(reason)
} else {
info!(reason = "received shutdown signal", "shutting down");
Ok(())
};
break ret;
() = self.shutdown.cancelled() => {
break Ok("received shutdown signal");
}

Some(block) = self.firm_blocks.recv() => {
Expand All @@ -237,9 +230,7 @@ impl Executor {
"received block from celestia reader",
);
if let Err(error) = self.execute_firm(client.clone(), block).await {
let reason = "failed executing firm block";
error!(%error, reason, "shutting down");
break Err(error).wrap_err(reason);
break Err(error).wrap_err("failed executing firm block");
}
}

Expand All @@ -250,14 +241,24 @@ impl Executor {
"received block from sequencer reader",
);
if let Err(error) = self.execute_soft(client.clone(), block).await {
let reason = "failed executing soft block";
error!(%error, reason, "shutting down");
break Err(error).wrap_err(reason);
break Err(error).wrap_err("failed executing soft block");
}
}
);
};

// XXX: explicitly setting the message (usually implicitly set by tracing)
let message = "shutting down";
match reason {
Ok(reason) => {
info!(reason, message);
Ok(())
}
Err(reason) => {
error!(%reason, message);
Err(reason)
}
}
// XXX: shut down the channels here and attempt to drain them before returning.
}

/// Calculates the maximum allowed spread between firm and soft commitments heights.
Expand Down
Loading

0 comments on commit 8620e83

Please sign in to comment.