From 8620e834e0fbbbcaf3a4ddacbb3bbf1d31bc846e Mon Sep 17 00:00:00 2001 From: Richard Janis Goldschmidt Date: Wed, 20 Mar 2024 22:54:28 +0100 Subject: [PATCH] refactor(conductor) use cancellation tokens for shutdown (#845) ## Summary Switches from using oneshot channels to cancellation tokens. ## Background This is much neater than using channels to signal shutdown. One token can be passed to all tasks, instead of creating (and storing) one channel per task. Note that conductor will shut down immediately and not try to drain its queue of pending blocks. Also note that this PR does not address conductor hanging if it receives SIGTERM during startup, where various async initializers might hang (leaving to a followup PR). ## Changes - Replace all `oneshot` channels used for signalling shutdown with `CancellationToken` - Make all `select!` loops biased, favoring cancellation over all other branches - reorder the other `select!` arms, favoring sending existing blocks to executor over pulling new blocks, and heights - Add comments to various unclear code parts - Unify shutdown messages for all long lived tasks. ## Testing This needs to be tested end-to-end. --------- Co-authored-by: noot <36753753+noot@users.noreply.github.com> --- .../astria-conductor/src/celestia/builder.rs | 4 +- crates/astria-conductor/src/celestia/mod.rs | 63 +++++++----- crates/astria-conductor/src/conductor.rs | 37 +++---- .../astria-conductor/src/executor/builder.rs | 4 +- crates/astria-conductor/src/executor/mod.rs | 41 ++++---- crates/astria-conductor/src/executor/tests.rs | 15 ++- .../astria-conductor/src/sequencer/builder.rs | 4 +- crates/astria-conductor/src/sequencer/mod.rs | 97 +++++++++++-------- 8 files changed, 143 insertions(+), 122 deletions(-) diff --git a/crates/astria-conductor/src/celestia/builder.rs b/crates/astria-conductor/src/celestia/builder.rs index 4c01ac2930..93b3eaf761 100644 --- a/crates/astria-conductor/src/celestia/builder.rs +++ b/crates/astria-conductor/src/celestia/builder.rs @@ -2,7 +2,7 @@ use celestia_client::celestia_types::nmt::Namespace; use sequencer_client::HttpClient; -use tokio::sync::oneshot; +use tokio_util::sync::CancellationToken; use super::Reader; use crate::{ @@ -17,7 +17,7 @@ pub(crate) struct Builder { pub(crate) executor: executor::Handle, pub(crate) sequencer_cometbft_client: HttpClient, pub(crate) sequencer_namespace: Namespace, - pub(crate) shutdown: oneshot::Receiver<()>, + pub(crate) shutdown: CancellationToken, } impl Builder { diff --git a/crates/astria-conductor/src/celestia/mod.rs b/crates/astria-conductor/src/celestia/mod.rs index 57e771aed2..af86bd3721 100644 --- a/crates/astria-conductor/src/celestia/mod.rs +++ b/crates/astria-conductor/src/celestia/mod.rs @@ -58,14 +58,12 @@ use telemetry::display::{ }; use tokio::{ select, - sync::{ - mpsc::error::{ - SendError, - TrySendError, - }, - oneshot, + sync::mpsc::error::{ + SendError, + TrySendError, }, }; +use tokio_util::sync::CancellationToken; use tracing::{ debug, error, @@ -143,8 +141,8 @@ pub(crate) struct Reader { /// The celestia namespace sequencer blobs will be read from. sequencer_namespace: Namespace, - /// The channel to listen for shutdown signals. - shutdown: oneshot::Receiver<()>, + /// Token to listen for Conductor being shut down. + shutdown: CancellationToken, } impl Reader { @@ -170,6 +168,8 @@ impl Reader { "setting up celestia reader", ); + // XXX: The websocket client must be kept alive so that the subscription doesn't die. + // We bind to `_wsclient` because `_` will immediately drop the object. let (mut _wsclient, mut headers) = subscribe_to_celestia_headers(&self.celestia_ws_endpoint, &self.celestia_auth_token) .await @@ -217,33 +217,34 @@ impl Reader { namespace.sequencer = %hex(&self.sequencer_namespace.as_bytes()), )); - let mut scheduled_block: Fuse>>> = + // Enqueued block waiting for executor to free up. Set if the executor exhibits + // backpressure. + let mut enqueued_block: Fuse>>> = future::Fuse::terminated(); + + // Pending subcription to Celestia network headers. Set if the current subscription fails. let mut resubscribing = Fuse::terminated(); - loop { + let reason = loop { select!( - shutdown_res = &mut self.shutdown => { - match shutdown_res { - Ok(()) => info!("received shutdown command; exiting"), - Err(error) => { - warn!(%error, "shutdown receiver dropped; exiting"); - } - } - break; + biased; + + () = self.shutdown.cancelled() => { + break Ok("received shutdown signal"); } - // Processing block executions which were scheduled due to channel being full - res = &mut scheduled_block, if !scheduled_block.is_terminated() => { + // Process block execution which was enqueued due to executor channel being full + res = &mut enqueued_block, if !enqueued_block.is_terminated() => { match res { Ok(celestia_height) => { block_stream.inner_mut().update_reference_height_if_greater(celestia_height); + debug!("submitted enqueued block to executor, resuming normal operation"); } - Err(_) => bail!("executor channel closed while waiting for it to free up"), + Err(err) => break Err(err).wrap_err("failed sending enqueued block to executor"), } } - // Attempt sending next sequential block, if channel is full will be scheduled - Some(block) = sequential_blocks.next_block(), if scheduled_block.is_terminated() => { + // Forward the next block to executor. Enqueue if the executor channel is full. + Some(block) = sequential_blocks.next_block(), if enqueued_block.is_terminated() => { let celestia_height = block.celestia_height; match executor.try_send_firm_block(block) { Ok(()) => { @@ -253,7 +254,7 @@ impl Reader { trace!("executor channel is full; rescheduling block fetch until the channel opens up"); let executor_clone = executor.clone(); // must return the celestia height to update the reference height upon completion - scheduled_block = async move { + enqueued_block = async move { let celestia_height = block.celestia_height; executor_clone.send_firm_block(block).await?; Ok(celestia_height) @@ -328,8 +329,20 @@ impl Reader { } } ); + }; + + // XXX: explicitly setting the message (usually implicitly set by tracing) + let message = "shutting down"; + match reason { + Ok(reason) => { + info!(reason, message); + Ok(()) + } + Err(reason) => { + error!(%reason, message); + Err(reason) + } } - Ok(()) } } diff --git a/crates/astria-conductor/src/conductor.rs b/crates/astria-conductor/src/conductor.rs index 509ab93176..f7b6c30f32 100644 --- a/crates/astria-conductor/src/conductor.rs +++ b/crates/astria-conductor/src/conductor.rs @@ -1,5 +1,4 @@ use std::{ - collections::HashMap, error::Error as StdError, rc::Rc, time::Duration, @@ -17,14 +16,16 @@ use tokio::{ signal, SignalKind, }, - sync::oneshot, task::{ spawn_local, LocalSet, }, time::timeout, }; -use tokio_util::task::JoinMap; +use tokio_util::{ + sync::CancellationToken, + task::JoinMap, +}; use tracing::{ error, info, @@ -39,8 +40,8 @@ use crate::{ }; pub struct Conductor { - /// Channels to the long-running tasks to shut them down gracefully - shutdown_channels: HashMap<&'static str, oneshot::Sender<()>>, + /// Token to signal to all tasks to shut down gracefully. + shutdown: CancellationToken, /// The different long-running tasks that make up the conductor; tasks: JoinMap<&'static str, eyre::Result<()>>, @@ -59,31 +60,27 @@ impl Conductor { /// This usually happens if the actors failed to connect to their respective endpoints. pub async fn new(cfg: Config) -> eyre::Result { let mut tasks = JoinMap::new(); - let mut shutdown_channels = HashMap::new(); let sequencer_cometbft_client = HttpClient::new(&*cfg.sequencer_cometbft_url) .wrap_err("failed constructing sequencer cometbft RPC client")?; + let shutdown = CancellationToken::new(); + // Spawn the executor task. let executor_handle = { - let (shutdown_tx, shutdown_rx) = oneshot::channel(); - let (executor, handle) = executor::Builder { consider_commitment_spread: !cfg.execution_commit_level.is_soft_only(), rollup_address: cfg.execution_rpc_url, - shutdown: shutdown_rx, + shutdown: shutdown.clone(), } .build() .wrap_err("failed constructing exectur")?; tasks.spawn(Self::EXECUTOR, executor.run_until_stopped()); - shutdown_channels.insert(Self::EXECUTOR, shutdown_tx); handle }; if !cfg.execution_commit_level.is_firm_only() { - let (shutdown_tx, shutdown_rx) = oneshot::channel(); - let sequencer_grpc_client = sequencer::SequencerGrpcClient::new(&cfg.sequencer_grpc_url) .wrap_err("failed constructing grpc client for Sequencer")?; @@ -95,18 +92,14 @@ impl Conductor { sequencer_grpc_client, sequencer_cometbft_client: sequencer_cometbft_client.clone(), sequencer_block_time: Duration::from_millis(cfg.sequencer_block_time_ms), - shutdown: shutdown_rx, + shutdown: shutdown.clone(), executor: executor_handle.clone(), } .build(); tasks.spawn(Self::SEQUENCER, sequencer_reader.run_until_stopped()); - shutdown_channels.insert(Self::SEQUENCER, shutdown_tx); } if !cfg.execution_commit_level.is_soft_only() { - let (shutdown_tx, shutdown_rx) = oneshot::channel(); - shutdown_channels.insert(Self::CELESTIA, shutdown_tx); - // Sequencer namespace is defined by the chain id of attached sequencer node // which can be fetched from any block header. let sequencer_namespace = get_sequencer_namespace(sequencer_cometbft_client.clone()) @@ -120,7 +113,7 @@ impl Conductor { executor: executor_handle.clone(), sequencer_cometbft_client: sequencer_cometbft_client.clone(), sequencer_namespace, - shutdown: shutdown_rx, + shutdown: shutdown.clone(), } .build(); @@ -128,7 +121,7 @@ impl Conductor { }; Ok(Self { - shutdown_channels, + shutdown, tasks, }) } @@ -208,10 +201,8 @@ impl Conductor { } async fn shutdown(self) { - info!("sending shutdown command to all tasks"); - for (_, channel) in self.shutdown_channels { - let _ = channel.send(()); - } + info!("sending shutdown signal to all tasks"); + self.shutdown.cancel(); info!("waiting 5 seconds for all tasks to shut down"); // put the tasks into an Rc to make them 'static so they can run on a local set diff --git a/crates/astria-conductor/src/executor/builder.rs b/crates/astria-conductor/src/executor/builder.rs index 14b665bb10..a9357a78f8 100644 --- a/crates/astria-conductor/src/executor/builder.rs +++ b/crates/astria-conductor/src/executor/builder.rs @@ -6,9 +6,9 @@ use astria_eyre::eyre::{ }; use tokio::sync::{ mpsc, - oneshot, watch, }; +use tokio_util::sync::CancellationToken; use super::{ Executor, @@ -20,7 +20,7 @@ use super::{ pub(crate) struct Builder { pub(crate) consider_commitment_spread: bool, pub(crate) rollup_address: String, - pub(crate) shutdown: oneshot::Receiver<()>, + pub(crate) shutdown: CancellationToken, } impl Builder { diff --git a/crates/astria-conductor/src/executor/mod.rs b/crates/astria-conductor/src/executor/mod.rs index 19da0a3f7c..f2b99af47f 100644 --- a/crates/astria-conductor/src/executor/mod.rs +++ b/crates/astria-conductor/src/executor/mod.rs @@ -35,13 +35,13 @@ use tokio::{ TrySendError, }, }, - oneshot, watch::{ self, error::RecvError, }, }, }; +use tokio_util::sync::CancellationToken; use tracing::{ debug, error, @@ -171,7 +171,8 @@ pub(crate) struct Executor { firm_blocks: mpsc::Receiver, soft_blocks: channel::Receiver, - shutdown: oneshot::Receiver<()>, + /// Token to listen for Conductor being shut down. + shutdown: CancellationToken, rollup_address: tonic::transport::Uri, @@ -209,7 +210,7 @@ impl Executor { spread (this has no effect if conductor is set to perform soft-sync only)" ); - loop { + let reason = loop { let spread_not_too_large = !self.is_spread_too_large(max_spread); if spread_not_too_large { self.soft_blocks.fill_permits(); @@ -218,16 +219,8 @@ impl Executor { select!( biased; - shutdown = &mut self.shutdown => { - let ret = if let Err(error) = shutdown { - let reason = "shutdown channel closed unexpectedly"; - error!(%error, reason, "shutting down"); - Err(error).wrap_err(reason) - } else { - info!(reason = "received shutdown signal", "shutting down"); - Ok(()) - }; - break ret; + () = self.shutdown.cancelled() => { + break Ok("received shutdown signal"); } Some(block) = self.firm_blocks.recv() => { @@ -237,9 +230,7 @@ impl Executor { "received block from celestia reader", ); if let Err(error) = self.execute_firm(client.clone(), block).await { - let reason = "failed executing firm block"; - error!(%error, reason, "shutting down"); - break Err(error).wrap_err(reason); + break Err(error).wrap_err("failed executing firm block"); } } @@ -250,14 +241,24 @@ impl Executor { "received block from sequencer reader", ); if let Err(error) = self.execute_soft(client.clone(), block).await { - let reason = "failed executing soft block"; - error!(%error, reason, "shutting down"); - break Err(error).wrap_err(reason); + break Err(error).wrap_err("failed executing soft block"); } } ); + }; + + // XXX: explicitly setting the message (usually implicitly set by tracing) + let message = "shutting down"; + match reason { + Ok(reason) => { + info!(reason, message); + Ok(()) + } + Err(reason) => { + error!(%reason, message); + Err(reason) + } } - // XXX: shut down the channels here and attempt to drain them before returning. } /// Calculates the maximum allowed spread between firm and soft commitments heights. diff --git a/crates/astria-conductor/src/executor/tests.rs b/crates/astria-conductor/src/executor/tests.rs index 9d83ff4257..89ba733b45 100644 --- a/crates/astria-conductor/src/executor/tests.rs +++ b/crates/astria-conductor/src/executor/tests.rs @@ -39,10 +39,8 @@ use astria_core::{ }; use bytes::Bytes; use prost::Message; -use tokio::{ - sync::oneshot, - task::JoinHandle, -}; +use tokio::task::JoinHandle; +use tokio_util::sync::CancellationToken; use tonic::transport::Server; use super::{ @@ -234,7 +232,7 @@ fn make_reconstructed_block() -> ReconstructedBlock { struct MockEnvironment { _server: MockExecutionServer, - _shutdown_tx: oneshot::Sender<()>, + _shutdown: CancellationToken, executor: Executor, client: Client, } @@ -243,12 +241,11 @@ async fn start_mock() -> MockEnvironment { let server = MockExecutionServer::spawn().await; let server_url = format!("http://{}", server.local_addr()); - let (shutdown_tx, shutdown_rx) = oneshot::channel(); - + let shutdown_token = CancellationToken::new(); let (executor, _) = crate::executor::Builder { consider_commitment_spread: false, rollup_address: server_url, - shutdown: shutdown_rx, + shutdown: shutdown_token.clone(), } .build() .unwrap(); @@ -264,7 +261,7 @@ async fn start_mock() -> MockEnvironment { MockEnvironment { _server: server, - _shutdown_tx: shutdown_tx, + _shutdown: shutdown_token, executor, client, } diff --git a/crates/astria-conductor/src/sequencer/builder.rs b/crates/astria-conductor/src/sequencer/builder.rs index 52f6d24d87..2cdaf4bf62 100644 --- a/crates/astria-conductor/src/sequencer/builder.rs +++ b/crates/astria-conductor/src/sequencer/builder.rs @@ -1,6 +1,6 @@ use std::time::Duration; -use tokio::sync::oneshot; +use tokio_util::sync::CancellationToken; use super::SequencerGrpcClient; use crate::executor; @@ -10,7 +10,7 @@ pub(crate) struct Builder { pub(crate) sequencer_grpc_client: SequencerGrpcClient, pub(crate) sequencer_cometbft_client: sequencer_client::HttpClient, pub(crate) sequencer_block_time: Duration, - pub(crate) shutdown: oneshot::Receiver<()>, + pub(crate) shutdown: CancellationToken, } impl Builder { diff --git a/crates/astria-conductor/src/sequencer/mod.rs b/crates/astria-conductor/src/sequencer/mod.rs index 1771e7bdff..948c69fdd8 100644 --- a/crates/astria-conductor/src/sequencer/mod.rs +++ b/crates/astria-conductor/src/sequencer/mod.rs @@ -1,9 +1,6 @@ //! [`Reader`] reads reads blocks from sequencer and forwards them to [`crate::executor::Executor`]. -use std::{ - error::Error as StdError, - time::Duration, -}; +use std::time::Duration; use astria_eyre::eyre::{ self, @@ -21,12 +18,11 @@ use futures::{ StreamExt as _, }; use sequencer_client::HttpClient; -use tokio::{ - select, - sync::oneshot, -}; +use tokio::select; +use tokio_util::sync::CancellationToken; use tracing::{ debug, + error, info, instrument, trace, @@ -54,8 +50,8 @@ pub(crate) struct Reader { sequencer_block_time: Duration, - /// The shutdown channel to notify `Reader` to shut down. - shutdown: oneshot::Receiver<()>, + /// Token to listen for Conductor being shut down. + shutdown: CancellationToken, } impl Reader { @@ -67,7 +63,7 @@ impl Reader { sequencer_grpc_client, sequencer_cometbft_client, sequencer_block_time, - mut shutdown, + shutdown, } = self; let mut executor = executor @@ -97,56 +93,66 @@ impl Reader { sequencer_grpc_client.clone(), ); - let mut scheduled_send: Fuse>> = future::Fuse::terminated(); - 'reader_loop: loop { + // Enqueued block waiting for executor to free up. Set if the executor exhibits + // backpressure. + let mut enqueued_block: Fuse>> = future::Fuse::terminated(); + + let reason = loop { select! { - shutdown = &mut shutdown => { - let ret = if let Err(e) = shutdown { - warn!( - error = &e as &dyn StdError, - "shutdown channel closed unexpectedly; shutting down", - ); - Err(e).wrap_err("shutdown channel closed unexpectedly") - } else { - info!("received shutdown signal; shutting down"); - Ok(()) - }; - break 'reader_loop ret; + biased; + + () = shutdown.cancelled() => { + break Ok("received shutdown signal"); } - Some(block) = blocks_from_heights.next() => { - let block = block.wrap_err("failed getting block")?; - if let Err(e) = sequential_blocks.insert(block) { - // XXX: we could temporarily kill the subscription if we put an upper limit on the cache size - warn!(error = &e as &dyn std::error::Error, "failed pushing block into cache, dropping it"); + // Process block execution which was enqueued due to executor channel being full. + res = &mut enqueued_block, if !enqueued_block.is_terminated() => { + match res { + Ok(()) => debug!("submitted enqueued block to executor, resuming normal operation"), + Err(err) => break Err(err).wrap_err("failed sending enqueued block to executor"), } } + // Skip heights that executor has already executed (e.g. firm blocks from Celestia) Ok(next_height) = executor.next_expected_soft_height_if_changed() => { blocks_from_heights.set_next_expected_height_if_greater(next_height); sequential_blocks.drop_obsolete(next_height); } - res = &mut scheduled_send, if !scheduled_send.is_terminated() => { - if res.is_err() { - bail!("executor channel closed while waiting for it to free up"); - } - } - - Some(block) = sequential_blocks.next_block(), if scheduled_send.is_terminated() => { + // Forward the next block to executor. Enqueue if the executor channel is full. + Some(block) = sequential_blocks.next_block(), if enqueued_block.is_terminated() => { if let Err(err) = executor.try_send_soft_block(block) { match err { + // `Closed` contains the block. Dropping it because there is no use for it. executor::channel::TrySendError::Closed(_) => { - bail!("executor channel is closed") + break Err(Report::msg("could not send block to executor because its channel was closed")); } + executor::channel::TrySendError::NoPermits(block) => { trace!("executor channel is full; scheduling block and stopping block fetch until a slot opens up"); - scheduled_send = executor.clone().send_soft_block_owned(block).boxed().fuse(); + enqueued_block = executor.clone().send_soft_block_owned(block).boxed().fuse(); } } } } + // Pull a block from the stream and put it in the block cache. + Some(block) = blocks_from_heights.next() => { + // XXX: blocks_from_heights stream uses SequencerGrpcClient::get, which has + // retry logic. An error here means that it could not retry or + // otherwise recover from a failed block fetch. + let block = match block + .wrap_err("the stream of new blocks returned a catastrophic error") + { + Err(error) => break Err(error), + Ok(block) => block, + }; + if let Err(error) = sequential_blocks.insert(block).wrap_err("failed adding block to sequential cache") { + warn!(%error, "failed pushing block into cache, dropping it"); + } + } + + // Record the latest height of the Sequencer network, allowing `blocks_from_heights` to progress. Some(res) = latest_height_stream.next() => { match res { Ok(height) => { @@ -162,6 +168,19 @@ impl Reader { } } } + }; + + // XXX: explicitly setting the message (usually implicitly set by tracing) + let message = "shutting down"; + match reason { + Ok(reason) => { + info!(reason, message); + Ok(()) + } + Err(reason) => { + error!(%reason, message); + Err(reason) + } } } }