diff --git a/crates/astria-conductor/src/celestia/builder.rs b/crates/astria-conductor/src/celestia/builder.rs index 4c01ac2930..93b3eaf761 100644 --- a/crates/astria-conductor/src/celestia/builder.rs +++ b/crates/astria-conductor/src/celestia/builder.rs @@ -2,7 +2,7 @@ use celestia_client::celestia_types::nmt::Namespace; use sequencer_client::HttpClient; -use tokio::sync::oneshot; +use tokio_util::sync::CancellationToken; use super::Reader; use crate::{ @@ -17,7 +17,7 @@ pub(crate) struct Builder { pub(crate) executor: executor::Handle, pub(crate) sequencer_cometbft_client: HttpClient, pub(crate) sequencer_namespace: Namespace, - pub(crate) shutdown: oneshot::Receiver<()>, + pub(crate) shutdown: CancellationToken, } impl Builder { diff --git a/crates/astria-conductor/src/celestia/mod.rs b/crates/astria-conductor/src/celestia/mod.rs index 57e771aed2..af86bd3721 100644 --- a/crates/astria-conductor/src/celestia/mod.rs +++ b/crates/astria-conductor/src/celestia/mod.rs @@ -58,14 +58,12 @@ use telemetry::display::{ }; use tokio::{ select, - sync::{ - mpsc::error::{ - SendError, - TrySendError, - }, - oneshot, + sync::mpsc::error::{ + SendError, + TrySendError, }, }; +use tokio_util::sync::CancellationToken; use tracing::{ debug, error, @@ -143,8 +141,8 @@ pub(crate) struct Reader { /// The celestia namespace sequencer blobs will be read from. sequencer_namespace: Namespace, - /// The channel to listen for shutdown signals. - shutdown: oneshot::Receiver<()>, + /// Token to listen for Conductor being shut down. + shutdown: CancellationToken, } impl Reader { @@ -170,6 +168,8 @@ impl Reader { "setting up celestia reader", ); + // XXX: The websocket client must be kept alive so that the subscription doesn't die. + // We bind to `_wsclient` because `_` will immediately drop the object. let (mut _wsclient, mut headers) = subscribe_to_celestia_headers(&self.celestia_ws_endpoint, &self.celestia_auth_token) .await @@ -217,33 +217,34 @@ impl Reader { namespace.sequencer = %hex(&self.sequencer_namespace.as_bytes()), )); - let mut scheduled_block: Fuse>>> = + // Enqueued block waiting for executor to free up. Set if the executor exhibits + // backpressure. + let mut enqueued_block: Fuse>>> = future::Fuse::terminated(); + + // Pending subcription to Celestia network headers. Set if the current subscription fails. let mut resubscribing = Fuse::terminated(); - loop { + let reason = loop { select!( - shutdown_res = &mut self.shutdown => { - match shutdown_res { - Ok(()) => info!("received shutdown command; exiting"), - Err(error) => { - warn!(%error, "shutdown receiver dropped; exiting"); - } - } - break; + biased; + + () = self.shutdown.cancelled() => { + break Ok("received shutdown signal"); } - // Processing block executions which were scheduled due to channel being full - res = &mut scheduled_block, if !scheduled_block.is_terminated() => { + // Process block execution which was enqueued due to executor channel being full + res = &mut enqueued_block, if !enqueued_block.is_terminated() => { match res { Ok(celestia_height) => { block_stream.inner_mut().update_reference_height_if_greater(celestia_height); + debug!("submitted enqueued block to executor, resuming normal operation"); } - Err(_) => bail!("executor channel closed while waiting for it to free up"), + Err(err) => break Err(err).wrap_err("failed sending enqueued block to executor"), } } - // Attempt sending next sequential block, if channel is full will be scheduled - Some(block) = sequential_blocks.next_block(), if scheduled_block.is_terminated() => { + // Forward the next block to executor. Enqueue if the executor channel is full. + Some(block) = sequential_blocks.next_block(), if enqueued_block.is_terminated() => { let celestia_height = block.celestia_height; match executor.try_send_firm_block(block) { Ok(()) => { @@ -253,7 +254,7 @@ impl Reader { trace!("executor channel is full; rescheduling block fetch until the channel opens up"); let executor_clone = executor.clone(); // must return the celestia height to update the reference height upon completion - scheduled_block = async move { + enqueued_block = async move { let celestia_height = block.celestia_height; executor_clone.send_firm_block(block).await?; Ok(celestia_height) @@ -328,8 +329,20 @@ impl Reader { } } ); + }; + + // XXX: explicitly setting the message (usually implicitly set by tracing) + let message = "shutting down"; + match reason { + Ok(reason) => { + info!(reason, message); + Ok(()) + } + Err(reason) => { + error!(%reason, message); + Err(reason) + } } - Ok(()) } } diff --git a/crates/astria-conductor/src/conductor.rs b/crates/astria-conductor/src/conductor.rs index 509ab93176..f7b6c30f32 100644 --- a/crates/astria-conductor/src/conductor.rs +++ b/crates/astria-conductor/src/conductor.rs @@ -1,5 +1,4 @@ use std::{ - collections::HashMap, error::Error as StdError, rc::Rc, time::Duration, @@ -17,14 +16,16 @@ use tokio::{ signal, SignalKind, }, - sync::oneshot, task::{ spawn_local, LocalSet, }, time::timeout, }; -use tokio_util::task::JoinMap; +use tokio_util::{ + sync::CancellationToken, + task::JoinMap, +}; use tracing::{ error, info, @@ -39,8 +40,8 @@ use crate::{ }; pub struct Conductor { - /// Channels to the long-running tasks to shut them down gracefully - shutdown_channels: HashMap<&'static str, oneshot::Sender<()>>, + /// Token to signal to all tasks to shut down gracefully. + shutdown: CancellationToken, /// The different long-running tasks that make up the conductor; tasks: JoinMap<&'static str, eyre::Result<()>>, @@ -59,31 +60,27 @@ impl Conductor { /// This usually happens if the actors failed to connect to their respective endpoints. pub async fn new(cfg: Config) -> eyre::Result { let mut tasks = JoinMap::new(); - let mut shutdown_channels = HashMap::new(); let sequencer_cometbft_client = HttpClient::new(&*cfg.sequencer_cometbft_url) .wrap_err("failed constructing sequencer cometbft RPC client")?; + let shutdown = CancellationToken::new(); + // Spawn the executor task. let executor_handle = { - let (shutdown_tx, shutdown_rx) = oneshot::channel(); - let (executor, handle) = executor::Builder { consider_commitment_spread: !cfg.execution_commit_level.is_soft_only(), rollup_address: cfg.execution_rpc_url, - shutdown: shutdown_rx, + shutdown: shutdown.clone(), } .build() .wrap_err("failed constructing exectur")?; tasks.spawn(Self::EXECUTOR, executor.run_until_stopped()); - shutdown_channels.insert(Self::EXECUTOR, shutdown_tx); handle }; if !cfg.execution_commit_level.is_firm_only() { - let (shutdown_tx, shutdown_rx) = oneshot::channel(); - let sequencer_grpc_client = sequencer::SequencerGrpcClient::new(&cfg.sequencer_grpc_url) .wrap_err("failed constructing grpc client for Sequencer")?; @@ -95,18 +92,14 @@ impl Conductor { sequencer_grpc_client, sequencer_cometbft_client: sequencer_cometbft_client.clone(), sequencer_block_time: Duration::from_millis(cfg.sequencer_block_time_ms), - shutdown: shutdown_rx, + shutdown: shutdown.clone(), executor: executor_handle.clone(), } .build(); tasks.spawn(Self::SEQUENCER, sequencer_reader.run_until_stopped()); - shutdown_channels.insert(Self::SEQUENCER, shutdown_tx); } if !cfg.execution_commit_level.is_soft_only() { - let (shutdown_tx, shutdown_rx) = oneshot::channel(); - shutdown_channels.insert(Self::CELESTIA, shutdown_tx); - // Sequencer namespace is defined by the chain id of attached sequencer node // which can be fetched from any block header. let sequencer_namespace = get_sequencer_namespace(sequencer_cometbft_client.clone()) @@ -120,7 +113,7 @@ impl Conductor { executor: executor_handle.clone(), sequencer_cometbft_client: sequencer_cometbft_client.clone(), sequencer_namespace, - shutdown: shutdown_rx, + shutdown: shutdown.clone(), } .build(); @@ -128,7 +121,7 @@ impl Conductor { }; Ok(Self { - shutdown_channels, + shutdown, tasks, }) } @@ -208,10 +201,8 @@ impl Conductor { } async fn shutdown(self) { - info!("sending shutdown command to all tasks"); - for (_, channel) in self.shutdown_channels { - let _ = channel.send(()); - } + info!("sending shutdown signal to all tasks"); + self.shutdown.cancel(); info!("waiting 5 seconds for all tasks to shut down"); // put the tasks into an Rc to make them 'static so they can run on a local set diff --git a/crates/astria-conductor/src/executor/builder.rs b/crates/astria-conductor/src/executor/builder.rs index 14b665bb10..a9357a78f8 100644 --- a/crates/astria-conductor/src/executor/builder.rs +++ b/crates/astria-conductor/src/executor/builder.rs @@ -6,9 +6,9 @@ use astria_eyre::eyre::{ }; use tokio::sync::{ mpsc, - oneshot, watch, }; +use tokio_util::sync::CancellationToken; use super::{ Executor, @@ -20,7 +20,7 @@ use super::{ pub(crate) struct Builder { pub(crate) consider_commitment_spread: bool, pub(crate) rollup_address: String, - pub(crate) shutdown: oneshot::Receiver<()>, + pub(crate) shutdown: CancellationToken, } impl Builder { diff --git a/crates/astria-conductor/src/executor/mod.rs b/crates/astria-conductor/src/executor/mod.rs index 19da0a3f7c..f2b99af47f 100644 --- a/crates/astria-conductor/src/executor/mod.rs +++ b/crates/astria-conductor/src/executor/mod.rs @@ -35,13 +35,13 @@ use tokio::{ TrySendError, }, }, - oneshot, watch::{ self, error::RecvError, }, }, }; +use tokio_util::sync::CancellationToken; use tracing::{ debug, error, @@ -171,7 +171,8 @@ pub(crate) struct Executor { firm_blocks: mpsc::Receiver, soft_blocks: channel::Receiver, - shutdown: oneshot::Receiver<()>, + /// Token to listen for Conductor being shut down. + shutdown: CancellationToken, rollup_address: tonic::transport::Uri, @@ -209,7 +210,7 @@ impl Executor { spread (this has no effect if conductor is set to perform soft-sync only)" ); - loop { + let reason = loop { let spread_not_too_large = !self.is_spread_too_large(max_spread); if spread_not_too_large { self.soft_blocks.fill_permits(); @@ -218,16 +219,8 @@ impl Executor { select!( biased; - shutdown = &mut self.shutdown => { - let ret = if let Err(error) = shutdown { - let reason = "shutdown channel closed unexpectedly"; - error!(%error, reason, "shutting down"); - Err(error).wrap_err(reason) - } else { - info!(reason = "received shutdown signal", "shutting down"); - Ok(()) - }; - break ret; + () = self.shutdown.cancelled() => { + break Ok("received shutdown signal"); } Some(block) = self.firm_blocks.recv() => { @@ -237,9 +230,7 @@ impl Executor { "received block from celestia reader", ); if let Err(error) = self.execute_firm(client.clone(), block).await { - let reason = "failed executing firm block"; - error!(%error, reason, "shutting down"); - break Err(error).wrap_err(reason); + break Err(error).wrap_err("failed executing firm block"); } } @@ -250,14 +241,24 @@ impl Executor { "received block from sequencer reader", ); if let Err(error) = self.execute_soft(client.clone(), block).await { - let reason = "failed executing soft block"; - error!(%error, reason, "shutting down"); - break Err(error).wrap_err(reason); + break Err(error).wrap_err("failed executing soft block"); } } ); + }; + + // XXX: explicitly setting the message (usually implicitly set by tracing) + let message = "shutting down"; + match reason { + Ok(reason) => { + info!(reason, message); + Ok(()) + } + Err(reason) => { + error!(%reason, message); + Err(reason) + } } - // XXX: shut down the channels here and attempt to drain them before returning. } /// Calculates the maximum allowed spread between firm and soft commitments heights. diff --git a/crates/astria-conductor/src/executor/tests.rs b/crates/astria-conductor/src/executor/tests.rs index 9d83ff4257..89ba733b45 100644 --- a/crates/astria-conductor/src/executor/tests.rs +++ b/crates/astria-conductor/src/executor/tests.rs @@ -39,10 +39,8 @@ use astria_core::{ }; use bytes::Bytes; use prost::Message; -use tokio::{ - sync::oneshot, - task::JoinHandle, -}; +use tokio::task::JoinHandle; +use tokio_util::sync::CancellationToken; use tonic::transport::Server; use super::{ @@ -234,7 +232,7 @@ fn make_reconstructed_block() -> ReconstructedBlock { struct MockEnvironment { _server: MockExecutionServer, - _shutdown_tx: oneshot::Sender<()>, + _shutdown: CancellationToken, executor: Executor, client: Client, } @@ -243,12 +241,11 @@ async fn start_mock() -> MockEnvironment { let server = MockExecutionServer::spawn().await; let server_url = format!("http://{}", server.local_addr()); - let (shutdown_tx, shutdown_rx) = oneshot::channel(); - + let shutdown_token = CancellationToken::new(); let (executor, _) = crate::executor::Builder { consider_commitment_spread: false, rollup_address: server_url, - shutdown: shutdown_rx, + shutdown: shutdown_token.clone(), } .build() .unwrap(); @@ -264,7 +261,7 @@ async fn start_mock() -> MockEnvironment { MockEnvironment { _server: server, - _shutdown_tx: shutdown_tx, + _shutdown: shutdown_token, executor, client, } diff --git a/crates/astria-conductor/src/sequencer/builder.rs b/crates/astria-conductor/src/sequencer/builder.rs index 52f6d24d87..2cdaf4bf62 100644 --- a/crates/astria-conductor/src/sequencer/builder.rs +++ b/crates/astria-conductor/src/sequencer/builder.rs @@ -1,6 +1,6 @@ use std::time::Duration; -use tokio::sync::oneshot; +use tokio_util::sync::CancellationToken; use super::SequencerGrpcClient; use crate::executor; @@ -10,7 +10,7 @@ pub(crate) struct Builder { pub(crate) sequencer_grpc_client: SequencerGrpcClient, pub(crate) sequencer_cometbft_client: sequencer_client::HttpClient, pub(crate) sequencer_block_time: Duration, - pub(crate) shutdown: oneshot::Receiver<()>, + pub(crate) shutdown: CancellationToken, } impl Builder { diff --git a/crates/astria-conductor/src/sequencer/mod.rs b/crates/astria-conductor/src/sequencer/mod.rs index 1771e7bdff..948c69fdd8 100644 --- a/crates/astria-conductor/src/sequencer/mod.rs +++ b/crates/astria-conductor/src/sequencer/mod.rs @@ -1,9 +1,6 @@ //! [`Reader`] reads reads blocks from sequencer and forwards them to [`crate::executor::Executor`]. -use std::{ - error::Error as StdError, - time::Duration, -}; +use std::time::Duration; use astria_eyre::eyre::{ self, @@ -21,12 +18,11 @@ use futures::{ StreamExt as _, }; use sequencer_client::HttpClient; -use tokio::{ - select, - sync::oneshot, -}; +use tokio::select; +use tokio_util::sync::CancellationToken; use tracing::{ debug, + error, info, instrument, trace, @@ -54,8 +50,8 @@ pub(crate) struct Reader { sequencer_block_time: Duration, - /// The shutdown channel to notify `Reader` to shut down. - shutdown: oneshot::Receiver<()>, + /// Token to listen for Conductor being shut down. + shutdown: CancellationToken, } impl Reader { @@ -67,7 +63,7 @@ impl Reader { sequencer_grpc_client, sequencer_cometbft_client, sequencer_block_time, - mut shutdown, + shutdown, } = self; let mut executor = executor @@ -97,56 +93,66 @@ impl Reader { sequencer_grpc_client.clone(), ); - let mut scheduled_send: Fuse>> = future::Fuse::terminated(); - 'reader_loop: loop { + // Enqueued block waiting for executor to free up. Set if the executor exhibits + // backpressure. + let mut enqueued_block: Fuse>> = future::Fuse::terminated(); + + let reason = loop { select! { - shutdown = &mut shutdown => { - let ret = if let Err(e) = shutdown { - warn!( - error = &e as &dyn StdError, - "shutdown channel closed unexpectedly; shutting down", - ); - Err(e).wrap_err("shutdown channel closed unexpectedly") - } else { - info!("received shutdown signal; shutting down"); - Ok(()) - }; - break 'reader_loop ret; + biased; + + () = shutdown.cancelled() => { + break Ok("received shutdown signal"); } - Some(block) = blocks_from_heights.next() => { - let block = block.wrap_err("failed getting block")?; - if let Err(e) = sequential_blocks.insert(block) { - // XXX: we could temporarily kill the subscription if we put an upper limit on the cache size - warn!(error = &e as &dyn std::error::Error, "failed pushing block into cache, dropping it"); + // Process block execution which was enqueued due to executor channel being full. + res = &mut enqueued_block, if !enqueued_block.is_terminated() => { + match res { + Ok(()) => debug!("submitted enqueued block to executor, resuming normal operation"), + Err(err) => break Err(err).wrap_err("failed sending enqueued block to executor"), } } + // Skip heights that executor has already executed (e.g. firm blocks from Celestia) Ok(next_height) = executor.next_expected_soft_height_if_changed() => { blocks_from_heights.set_next_expected_height_if_greater(next_height); sequential_blocks.drop_obsolete(next_height); } - res = &mut scheduled_send, if !scheduled_send.is_terminated() => { - if res.is_err() { - bail!("executor channel closed while waiting for it to free up"); - } - } - - Some(block) = sequential_blocks.next_block(), if scheduled_send.is_terminated() => { + // Forward the next block to executor. Enqueue if the executor channel is full. + Some(block) = sequential_blocks.next_block(), if enqueued_block.is_terminated() => { if let Err(err) = executor.try_send_soft_block(block) { match err { + // `Closed` contains the block. Dropping it because there is no use for it. executor::channel::TrySendError::Closed(_) => { - bail!("executor channel is closed") + break Err(Report::msg("could not send block to executor because its channel was closed")); } + executor::channel::TrySendError::NoPermits(block) => { trace!("executor channel is full; scheduling block and stopping block fetch until a slot opens up"); - scheduled_send = executor.clone().send_soft_block_owned(block).boxed().fuse(); + enqueued_block = executor.clone().send_soft_block_owned(block).boxed().fuse(); } } } } + // Pull a block from the stream and put it in the block cache. + Some(block) = blocks_from_heights.next() => { + // XXX: blocks_from_heights stream uses SequencerGrpcClient::get, which has + // retry logic. An error here means that it could not retry or + // otherwise recover from a failed block fetch. + let block = match block + .wrap_err("the stream of new blocks returned a catastrophic error") + { + Err(error) => break Err(error), + Ok(block) => block, + }; + if let Err(error) = sequential_blocks.insert(block).wrap_err("failed adding block to sequential cache") { + warn!(%error, "failed pushing block into cache, dropping it"); + } + } + + // Record the latest height of the Sequencer network, allowing `blocks_from_heights` to progress. Some(res) = latest_height_stream.next() => { match res { Ok(height) => { @@ -162,6 +168,19 @@ impl Reader { } } } + }; + + // XXX: explicitly setting the message (usually implicitly set by tracing) + let message = "shutting down"; + match reason { + Ok(reason) => { + info!(reason, message); + Ok(()) + } + Err(reason) => { + error!(%reason, message); + Err(reason) + } } } }