From 234829fe05abebe3c350efe8b55ce0f31146a3b7 Mon Sep 17 00:00:00 2001 From: Richard Janis Goldschmidt Date: Tue, 21 May 2024 12:27:20 +0200 Subject: [PATCH] feat(conductor): respect shutdown signals during init (#1080) ## Summary Conductor now respects shutdown signals it receives during init. ## Background Conductor's task ignored shutdowns while still initializing. This meant that Conductor would hang for up to 30 seconds. ## Changes - refactor conductor's constituent long-running tasks to separate initialization and running - listen for the shutdown signal in all of conductor's tasks ## Testing Run conductor with endpoints that hang indefinitely and send it SIGTERM. Observe that conductor shuts down quickly. The main operation of conductor is unaffected on the happy path: all blackbox tests run to completion. A proper test for the shutdown logic will be implemented in a follow-up refactor similar to https://github.com/astriaorg/astria/pull/889 --- crates/astria-conductor/src/celestia/mod.rs | 14 +- .../astria-conductor/src/executor/builder.rs | 15 +- .../astria-conductor/src/executor/client.rs | 19 +- crates/astria-conductor/src/executor/mod.rs | 127 +++++---- crates/astria-conductor/src/sequencer/mod.rs | 268 ++++++++++++------ 5 files changed, 290 insertions(+), 153 deletions(-) diff --git a/crates/astria-conductor/src/celestia/mod.rs b/crates/astria-conductor/src/celestia/mod.rs index caaad4ce5a..db32c69152 100644 --- a/crates/astria-conductor/src/celestia/mod.rs +++ b/crates/astria-conductor/src/celestia/mod.rs @@ -138,10 +138,16 @@ pub(crate) struct Reader { impl Reader { pub(crate) async fn run_until_stopped(mut self) -> eyre::Result<()> { - let (executor, sequencer_chain_id) = self - .initialize() - .await - .wrap_err("initialization of runtime information failed")?; + let (executor, sequencer_chain_id) = select!( + () = self.shutdown.clone().cancelled_owned() => { + info!("received shutdown signal while waiting for Celestia reader task to 
initialize"); + return Ok(()); + } + + res = self.initialize() => { + res.wrap_err("initialization of runtime information failed")? + } + ); RunningReader::from_parts(self, executor, sequencer_chain_id) .wrap_err("failed entering run loop")? diff --git a/crates/astria-conductor/src/executor/builder.rs b/crates/astria-conductor/src/executor/builder.rs index e0b1e806cd..67941479a0 100644 --- a/crates/astria-conductor/src/executor/builder.rs +++ b/crates/astria-conductor/src/executor/builder.rs @@ -29,9 +29,12 @@ impl Builder { shutdown, } = self; - let rollup_address = rollup_address - .parse() - .wrap_err("failed to parse rollup address as URI")?; + let client = super::client::Client::connect_lazy(&rollup_address).wrap_err_with(|| { + format!( + "failed to construct execution client for provided rollup address \ + `{rollup_address}`" + ) + })?; let mut firm_block_tx = None; let mut firm_block_rx = None; @@ -52,16 +55,18 @@ impl Builder { let (state_tx, state_rx) = state::channel(); let executor = Executor { + client, + mode, firm_blocks: firm_block_rx, soft_blocks: soft_block_rx, - rollup_address, - shutdown, state: state_tx, blocks_pending_finalization: HashMap::new(), + + max_spread: None, }; let handle = Handle { firm_blocks: firm_block_tx, diff --git a/crates/astria-conductor/src/executor/client.rs b/crates/astria-conductor/src/executor/client.rs index d7689b779a..f57ceee8e8 100644 --- a/crates/astria-conductor/src/executor/client.rs +++ b/crates/astria-conductor/src/executor/client.rs @@ -20,23 +20,28 @@ use astria_eyre::eyre::{ }; use bytes::Bytes; use pbjson_types::Timestamp; -use tonic::transport::Channel; +use tonic::transport::{ + Channel, + Endpoint, + Uri, +}; use tracing::instrument; /// A newtype wrapper around [`ExecutionServiceClient`] to work with /// idiomatic types. 
#[derive(Clone)] pub(crate) struct Client { - uri: tonic::transport::Uri, + uri: Uri, inner: ExecutionServiceClient, } impl Client { - #[instrument(skip_all, fields(rollup_uri = %uri))] - pub(crate) async fn connect(uri: tonic::transport::Uri) -> eyre::Result { - let inner = ExecutionServiceClient::connect(uri.clone()) - .await - .wrap_err("failed constructing execution service client")?; + pub(crate) fn connect_lazy(uri: &str) -> eyre::Result { + let uri: Uri = uri + .parse() + .wrap_err("failed to parse provided string as uri")?; + let endpoint = Endpoint::from(uri.clone()).connect_lazy(); + let inner = ExecutionServiceClient::new(endpoint); Ok(Self { uri, inner, diff --git a/crates/astria-conductor/src/executor/mod.rs b/crates/astria-conductor/src/executor/mod.rs index c7d794155f..0f7e9078bd 100644 --- a/crates/astria-conductor/src/executor/mod.rs +++ b/crates/astria-conductor/src/executor/mod.rs @@ -215,6 +215,9 @@ impl Handle { } pub(crate) struct Executor { + /// The execution client driving the rollup. + client: Client, + /// The mode under which this executor (and hence conductor) runs. mode: CommitLevel, @@ -231,8 +234,6 @@ pub(crate) struct Executor { /// Token to listen for Conductor being shut down. shutdown: CancellationToken, - rollup_address: tonic::transport::Uri, - /// Tracks the status of the execution chain. state: StateSender, @@ -241,32 +242,29 @@ pub(crate) struct Executor { /// Required to mark firm blocks received from celestia as executed /// without re-executing on top of the rollup node. blocks_pending_finalization: HashMap, + + /// The maximum permitted spread between firm and soft blocks. 
+ max_spread: Option, } impl Executor { - #[instrument(skip_all)] + #[instrument(skip_all, err)] pub(crate) async fn run_until_stopped(mut self) -> eyre::Result<()> { - let client = Client::connect(self.rollup_address.clone()) - .await - .wrap_err("failed connecting to rollup node")?; - - self.set_initial_node_state(client.clone()) - .await - .wrap_err("failed setting initial rollup node state")?; - - let max_spread: usize = self.calculate_max_spread(); - if let Some(channel) = self.soft_blocks.as_mut() { - channel.set_capacity(max_spread); - } - - info!( - max_spread, - "setting capacity of soft blocks channel to maximum permitted firm<>soft commitment \ - spread (this has no effect if conductor is set to perform soft-sync only)" + select!( + () = self.shutdown.clone().cancelled_owned() => { + info!( + "received shutdown signal while initializing task; \ + aborting intialization and exiting" + ); + return Ok(()); + } + res = self.init() => { + res.wrap_err("initialization failed")?; + } ); let reason = loop { - let spread_not_too_large = !self.is_spread_too_large(max_spread); + let spread_not_too_large = !self.is_spread_too_large(); if spread_not_too_large { if let Some(channel) = self.soft_blocks.as_mut() { channel.fill_permits(); @@ -288,7 +286,7 @@ impl Executor { block.hash = %telemetry::display::base64(&block.block_hash), "received block from celestia reader", ); - if let Err(error) = self.execute_firm(client.clone(), block).await { + if let Err(error) = self.execute_firm(block).await { break Err(error).wrap_err("failed executing firm block"); } } @@ -301,7 +299,7 @@ impl Executor { block.hash = %telemetry::display::base64(&block.block_hash()), "received block from sequencer reader", ); - if let Err(error) = self.execute_soft(client.clone(), block).await { + if let Err(error) = self.execute_soft(block).await { break Err(error).wrap_err("failed executing soft block"); } } @@ -322,6 +320,27 @@ impl Executor { } } + /// Runs the init logic that needs to happen 
before [`Executor`] can enter its main loop. + async fn init(&mut self) -> eyre::Result<()> { + self.set_initial_node_state() + .await + .wrap_err("failed setting initial rollup node state")?; + + let max_spread: usize = self.calculate_max_spread(); + self.max_spread.replace(max_spread); + if let Some(channel) = self.soft_blocks.as_mut() { + channel.set_capacity(max_spread); + info!( + max_spread, + "setting capacity of soft blocks channel to maximum permitted firm<>soft \ + commitment spread (this has no effect if conductor is set to perform soft-sync \ + only)" + ); + } + + Ok(()) + } + /// Calculates the maximum allowed spread between firm and soft commitments heights. /// /// The maximum allowed spread is taken as `max_spread = variance * 6`, where `variance` @@ -345,7 +364,11 @@ impl Executor { /// large. /// /// Always returns `false` if this executor was configured to run without firm commitments. - fn is_spread_too_large(&self, max_spread: usize) -> bool { + /// + /// # Panics + /// + /// Panics if called before [`Executor::init`] because `max_spread` must be set. + fn is_spread_too_large(&self) -> bool { if self.firm_blocks.is_none() { return false; } @@ -356,7 +379,12 @@ impl Executor { }; let is_too_far_ahead = usize::try_from(next_soft.saturating_sub(next_firm)) - .map(|spread| spread >= max_spread) + .map(|spread| { + spread + >= self + .max_spread + .expect("executor must be initalized and this field set") + }) .unwrap_or(false); if is_too_far_ahead { @@ -369,11 +397,7 @@ impl Executor { block.hash = %telemetry::display::base64(&block.block_hash()), block.height = block.height().value(), ))] - async fn execute_soft( - &mut self, - client: Client, - block: FilteredSequencerBlock, - ) -> eyre::Result<()> { + async fn execute_soft(&mut self, block: FilteredSequencerBlock) -> eyre::Result<()> { // TODO(https://github.com/astriaorg/astria/issues/624): add retry logic before failing hard. 
let executable_block = ExecutableBlock::from_sequencer(block, self.state.rollup_id()); @@ -410,14 +434,14 @@ impl Executor { // The parent hash of the next block is the hash of the block at the current head. let parent_hash = self.state.soft_hash(); let executed_block = self - .execute_block(client.clone(), parent_hash, executable_block) + .execute_block(parent_hash, executable_block) .await .wrap_err("failed to execute block")?; self.does_block_response_fulfill_contract(ExecutionKind::Soft, &executed_block) .wrap_err("execution API server violated contract")?; - self.update_commitment_state(client.clone(), Update::OnlySoft(executed_block.clone())) + self.update_commitment_state(Update::OnlySoft(executed_block.clone())) .await .wrap_err("failed to update soft commitment state")?; @@ -431,11 +455,7 @@ impl Executor { block.hash = %telemetry::display::base64(&block.block_hash), block.height = block.sequencer_height().value(), ))] - async fn execute_firm( - &mut self, - client: Client, - block: ReconstructedBlock, - ) -> eyre::Result<()> { + async fn execute_firm(&mut self, block: ReconstructedBlock) -> eyre::Result<()> { let executable_block = ExecutableBlock::from_reconstructed(block); let expected_height = self.state.next_expected_firm_sequencer_height(); let block_height = executable_block.height; @@ -459,7 +479,7 @@ impl Executor { let update = if self.should_execute_firm_block() { let parent_hash = self.state.firm_hash(); let executed_block = self - .execute_block(client.clone(), parent_hash, executable_block) + .execute_block(parent_hash, executable_block) .await .wrap_err("failed to execute block")?; self.does_block_response_fulfill_contract(ExecutionKind::Firm, &executed_block) @@ -477,7 +497,7 @@ impl Executor { "pending block not found for block number in cache. THIS SHOULD NOT HAPPEN. \ Trying to fetch the already-executed block from the rollup before giving up." 
); - match client.clone().get_block(block_number).await { + match self.client.get_block(block_number).await { Ok(block) => Update::OnlyFirm(block), Err(error) => { error!( @@ -493,7 +513,7 @@ impl Executor { } }; - self.update_commitment_state(client.clone(), update) + self.update_commitment_state(update) .await .wrap_err("failed to setting both commitment states to executed block")?; Ok(()) @@ -511,7 +531,6 @@ impl Executor { ))] async fn execute_block( &mut self, - mut client: Client, parent_hash: Bytes, block: ExecutableBlock, ) -> eyre::Result { @@ -521,7 +540,8 @@ impl Executor { .. } = block; - let executed_block = client + let executed_block = self + .client .execute_block(parent_hash, transactions, timestamp) .await .wrap_err("failed to run execute_block RPC")?; @@ -536,20 +556,20 @@ impl Executor { } #[instrument(skip_all)] - async fn set_initial_node_state(&mut self, client: Client) -> eyre::Result<()> { + async fn set_initial_node_state(&mut self) -> eyre::Result<()> { let genesis_info = { - let mut client = client.clone(); - async move { - client + async { + self.client + .clone() .get_genesis_info() .await .wrap_err("failed getting genesis info") } }; let commitment_state = { - let mut client = client.clone(); - async move { - client + async { + self.client + .clone() .get_commitment_state() .await .wrap_err("failed getting commitment state") @@ -568,11 +588,7 @@ impl Executor { } #[instrument(skip_all)] - async fn update_commitment_state( - &mut self, - mut client: Client, - update: Update, - ) -> eyre::Result<()> { + async fn update_commitment_state(&mut self, update: Update) -> eyre::Result<()> { use Update::{ OnlyFirm, OnlySoft, @@ -588,7 +604,8 @@ impl Executor { .soft(soft) .build() .wrap_err("failed constructing commitment state")?; - let new_state = client + let new_state = self + .client .update_commitment_state(commitment_state) .await .wrap_err("failed updating remote commitment state")?; diff --git 
a/crates/astria-conductor/src/sequencer/mod.rs b/crates/astria-conductor/src/sequencer/mod.rs index 205268b854..535c61d4fd 100644 --- a/crates/astria-conductor/src/sequencer/mod.rs +++ b/crates/astria-conductor/src/sequencer/mod.rs @@ -2,8 +2,10 @@ use std::time::Duration; +use astria_core::sequencerblock::v1alpha1::block::FilteredSequencerBlock; use astria_eyre::eyre::{ self, + bail, Report, WrapErr as _, }; @@ -16,14 +18,18 @@ use futures::{ FutureExt as _, StreamExt as _, }; -use sequencer_client::HttpClient; +use sequencer_client::{ + tendermint::block::Height, + HttpClient, + LatestHeightStream, + StreamLatestHeight as _, +}; use tokio::select; use tokio_util::sync::CancellationToken; use tracing::{ debug, error, info, - instrument, trace, warn, }; @@ -32,8 +38,11 @@ use crate::{ block_cache::BlockCache, executor::{ self, + SoftSendError, SoftTrySendError, + StateIsInit, }, + sequencer::block_stream::BlocksFromHeightStream, }; mod block_stream; @@ -43,13 +52,26 @@ mod reporting; pub(crate) use builder::Builder; pub(crate) use client::SequencerGrpcClient; +/// [`Reader`] reads Sequencer blocks and forwards them to the [`crate::Executor`] task. +/// +/// The blocks are forwarded in strictly sequential order of their Sequencer heights. +/// A [`Reader`] is created with [`Builder::build`] and run with [`Reader::run_until_stopped`]. pub(crate) struct Reader { + /// The handle for sending sequencer blocks as soft commits to the executor + /// and checking it for the next expected height, and rollup ID associated with + /// this instance of Conductor. + /// Must be initialized before it can be used. executor: executor::Handle, + /// The gRPC client to fetch new blocks from the Sequencer network. sequencer_grpc_client: SequencerGrpcClient, + /// The cometbft client to periodically query the latest height of the Sequencer network. sequencer_cometbft_client: HttpClient, + /// The duration for the Sequencer network to produce a new block (and advance its height). 
+ /// The reader will wait `sequencer_block_time` before querying the network for its latest + /// height. sequencer_block_time: Duration, /// Token to listen for Conductor being shut down. @@ -57,114 +79,157 @@ pub(crate) struct Reader { } impl Reader { - #[instrument(skip_all, err)] - pub(crate) async fn run_until_stopped(self) -> eyre::Result<()> { - use futures::future::FusedFuture as _; - let Self { - mut executor, + pub(crate) async fn run_until_stopped(mut self) -> eyre::Result<()> { + let executor = select!( + () = self.shutdown.clone().cancelled_owned() => { + info!("received shutdown signal while waiting for Sequencer reader task to initialize"); + return Ok(()); + } + res = self.initialize() => { + res? + } + ); + RunningReader::try_from_parts(self, executor) + .wrap_err("failed entering run loop")? + .run_until_stopped() + .await + } + + async fn initialize(&mut self) -> eyre::Result> { + self.executor + .wait_for_init() + .await + .wrap_err("handle to executor failed while waiting for it being initialized") + } +} + +struct RunningReader { + /// The initialized handle to the executor task. + /// Used for sending sequencer blocks as soft commits to the executor + /// and checking it for the next expected height, and rollup ID associated with + /// this instance of Conductor. + executor: executor::Handle, + + /// Caches the filtered sequencer blocks retrieved from the Sequencer. + /// This cache will yield a block if it contains a block that matches the + /// next expected soft block height of the executor task (as indicated by + /// the handle). + block_cache: BlockCache, + + /// A stream of the latest heights observed from the Sequencer network. + latest_height_stream: LatestHeightStream, + + /// A stream of block heights fetched from the Sequencer network up to + /// the latest observed sequencer height (as obtained from the `latest_height_stream`) field. 
+ blocks_from_heights: BlocksFromHeightStream, + + /// An enqueued block waiting for executor to free up. Set if the executor exhibits + /// backpressure. + enqueued_block: Fuse>>, + + /// Token to listen for Conductor being shut down. + shutdown: CancellationToken, +} + +impl RunningReader { + fn try_from_parts( + reader: Reader, + mut executor: executor::Handle, + ) -> eyre::Result { + let Reader { sequencer_grpc_client, sequencer_cometbft_client, sequencer_block_time, shutdown, - } = self; + .. + } = reader; - let mut executor = executor - .wait_for_init() - .await - .wrap_err("handle to executor failed while waiting for it being initialized")?; let next_expected_height = executor.next_expected_soft_sequencer_height(); - let mut latest_height_stream = { - use sequencer_client::StreamLatestHeight as _; - sequencer_cometbft_client.stream_latest_height(sequencer_block_time) - }; + let latest_height_stream = + sequencer_cometbft_client.stream_latest_height(sequencer_block_time); - let mut sequential_blocks = BlockCache::with_next_height(next_expected_height) + let block_cache = BlockCache::with_next_height(next_expected_height) .wrap_err("failed constructing sequential block cache")?; - let mut blocks_from_heights = block_stream::BlocksFromHeightStream::new( + let blocks_from_heights = BlocksFromHeightStream::new( executor.rollup_id(), next_expected_height, - sequencer_grpc_client.clone(), + sequencer_grpc_client, ); - // Enqueued block waiting for executor to free up. Set if the executor exhibits - // backpressure. 
- let mut enqueued_block: Fuse>> = future::Fuse::terminated(); + let enqueued_block: Fuse>> = future::Fuse::terminated(); + Ok(RunningReader { + executor, + block_cache, + latest_height_stream, + blocks_from_heights, + enqueued_block, + shutdown, + }) + } + + async fn run_until_stopped(mut self) -> eyre::Result<()> { + let stop_reason = self.run_loop().await; - let reason = loop { + // XXX: explicitly setting the message (usually implicitly set by tracing) + let message = "shutting down"; + match stop_reason { + Ok(stop_reason) => { + info!(stop_reason, message); + Ok(()) + } + Err(stop_reason) => { + error!(%stop_reason, message); + Err(stop_reason) + } + } + } + + async fn run_loop(&mut self) -> eyre::Result<&'static str> { + use futures::future::FusedFuture as _; + + loop { select! { biased; - () = shutdown.cancelled() => { - break Ok("received shutdown signal"); + () = self.shutdown.cancelled() => { + return Ok("received shutdown signal"); } // Process block execution which was enqueued due to executor channel being full. - res = &mut enqueued_block, if !enqueued_block.is_terminated() => { - match res { - Ok(()) => debug!("submitted enqueued block to executor, resuming normal operation"), - Err(err) => break Err(err).wrap_err("failed sending enqueued block to executor"), - } + res = &mut self.enqueued_block, if !self.enqueued_block.is_terminated() => { + res.wrap_err("failed sending enqueued block to executor")?; + debug!("submitted enqueued block to executor, resuming normal operation"); } // Skip heights that executor has already executed (e.g. firm blocks from Celestia) - Ok(next_height) = executor.next_expected_soft_height_if_changed() => { - blocks_from_heights.set_next_expected_height_if_greater(next_height); - sequential_blocks.drop_obsolete(next_height); + Ok(next_height) = self.executor.next_expected_soft_height_if_changed() => { + self.update_next_expected_height(next_height); } // Forward the next block to executor. 
Enqueue if the executor channel is full. - Some(block) = sequential_blocks.next_block(), if enqueued_block.is_terminated() => { - if let Err(err) = executor.try_send_soft_block(block) { - match err { - SoftTrySendError::Channel{source} => { - match *source { - executor::channel::TrySendError::Closed(_) => { - break Err(Report::msg( - "could not send block to executor because its channel was closed" - )); - } - - executor::channel::TrySendError::NoPermits(block) => { - trace!("executor channel is full; scheduling block and stopping block fetch until a slot opens up"); - enqueued_block = executor.clone().send_soft_block_owned(block).boxed().fuse(); - } - } - } - - SoftTrySendError::NotSet => { - break Err(Report::msg( - "conductor was configured without soft commitments; the \ - sequencer reader task should have never been started" - )); - } - } - } + Some(block) = self.block_cache.next_block(), if self.enqueued_block.is_terminated() => { + self.send_to_executor(block)?; } // Pull a block from the stream and put it in the block cache. - Some(block) = blocks_from_heights.next() => { - // XXX: blocks_from_heights stream uses SequencerGrpcClient::get, which has + Some(block) = self.blocks_from_heights.next() => { + // XXX: blocks_from_heights stream uses self::client::SequencerGrpcClient::get, which has // retry logic. An error here means that it could not retry or // otherwise recover from a failed block fetch. 
- let block = match block - .wrap_err("the stream of new blocks returned a catastrophic error") - { - Err(error) => break Err(error), - Ok(block) => block, - }; - if let Err(error) = sequential_blocks.insert(block).wrap_err("failed adding block to sequential cache") { - warn!(%error, "failed pushing block into cache, dropping it"); + let block = block.wrap_err("the stream of new blocks returned a catastrophic error")?; + if let Err(error) = self.block_cache.insert(block) { + warn!(%error, "failed pushing block into sequential cache, dropping it"); } } // Record the latest height of the Sequencer network, allowing `blocks_from_heights` to progress. - Some(res) = latest_height_stream.next() => { + Some(res) = self.latest_height_stream.next() => { match res { Ok(height) => { debug!(%height, "received latest height from sequencer"); - blocks_from_heights.set_latest_observed_height_if_greater(height); + self.blocks_from_heights.set_latest_observed_height_if_greater(height); } Err(error) => { warn!( @@ -175,19 +240,58 @@ impl Reader { } } } - }; + } + } - // XXX: explicitly setting the message (usually implicitly set by tracing) - let message = "shutting down"; - match reason { - Ok(reason) => { - info!(reason, message); - Ok(()) - } - Err(reason) => { - error!(%reason, message); - Err(reason) + /// Sends `block` to the executor task. + /// + /// Enqueues the block if the channel to the executor is full, sending it once + /// it frees up. 
+ fn send_to_executor(&mut self, block: FilteredSequencerBlock) -> eyre::Result<()> { + if let Err(err) = self.executor.try_send_soft_block(block) { + match err { + SoftTrySendError::Channel { + source, + } => match *source { + executor::channel::TrySendError::Closed(_) => { + bail!("could not send block to executor because its channel was closed"); + } + + executor::channel::TrySendError::NoPermits(block) => { + trace!( + "executor channel is full; scheduling block and stopping block fetch \ + until a slot opens up" + ); + self.enqueued_block = self + .executor + .clone() + .send_soft_block_owned(block) + .boxed() + .fuse(); + } + }, + + SoftTrySendError::NotSet => { + bail!( + "conductor was configured without soft commitments; the sequencer reader \ + task should have never been started", + ); + } } } + Ok(()) + } + + /// Updates the next expected height to forward to the executor. + /// + /// This will drop all older heights from the cache and advance the stream of blocks + /// so that blocks older than `next_height` will not be fetched. + /// + /// Already in-flight fetches will still run their course but be rejected by + /// the block cache. + fn update_next_expected_height(&mut self, next_height: Height) { + self.blocks_from_heights + .set_next_expected_height_if_greater(next_height); + self.block_cache.drop_obsolete(next_height); } }