diff --git a/Cargo.lock b/Cargo.lock index b0cab0c86b..25d0f21f14 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -604,6 +604,7 @@ dependencies = [ "futures", "futures-bounded", "hex", + "http", "humantime", "indexmap 2.2.6", "insta", diff --git a/charts/evm-rollup/Chart.yaml b/charts/evm-rollup/Chart.yaml index ee87f843c7..173f79af77 100644 --- a/charts/evm-rollup/Chart.yaml +++ b/charts/evm-rollup/Chart.yaml @@ -16,7 +16,7 @@ type: application # to the chart and its templates, including the app version. # Versions are expected to follow Semantic Versioning (https://semver.org/) -version: 0.11.0 +version: 0.11.1 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. Versions are not expected to diff --git a/charts/evm-rollup/templates/configmap.yaml b/charts/evm-rollup/templates/configmap.yaml index 56aeec544b..59963451c2 100644 --- a/charts/evm-rollup/templates/configmap.yaml +++ b/charts/evm-rollup/templates/configmap.yaml @@ -8,11 +8,9 @@ data: {{- if (index .Values "celestia-node").enabled }} TOKEN_SERVER_URL: "{{ include "celestiaNode.service.addresses.token" (index .Subcharts "celestia-node") }}" ASTRIA_CONDUCTOR_CELESTIA_NODE_HTTP_URL: "{{ include "celestiaNode.service.addresses.rpc" (index .Subcharts "celestia-node") }}" - ASTRIA_CONDUCTOR_CELESTIA_NODE_WEBSOCKET_URL: "{{ include "celestiaNode.service.addresses.ws" (index .Subcharts "celestia-node") }}" {{- else }} TOKEN_SERVER_URL: "{{ .Values.config.celestia.token }}" ASTRIA_CONDUCTOR_CELESTIA_NODE_HTTP_URL: "{{ .Values.config.celestia.rpc }}" - ASTRIA_CONDUCTOR_CELESTIA_NODE_WEBSOCKET_URL: "{{ .Values.config.celestia.ws }}" {{- end }} ASTRIA_CONDUCTOR_EXECUTION_RPC_URL: "http://127.0.0.1:{{ .Values.ports.executionGRPC }}" ASTRIA_CONDUCTOR_EXECUTION_COMMIT_LEVEL: "{{ .Values.config.rollup.executionCommitLevel }}" @@ -32,7 +30,17 @@ data: OTEL_EXPORTER_OTLP_HEADERS: "{{ .Values.config.rollup.otel.otlpHeaders }}" OTEL_EXPORTER_OTLP_TRACE_HEADERS: "{{ .Values.config.rollup.otel.traceHeaders }}" {{- if not .Values.global.dev }} + + # This block is for removing `ASTRIA_CONDUCTOR_CELESTIA_NODE_WEBSOCKET_URL`. + # + {{- if (index .Values "celestia-node").enabled }} + ASTRIA_CONDUCTOR_CELESTIA_NODE_WEBSOCKET_URL: "{{ include "celestiaNode.service.addresses.ws" (index .Subcharts "celestia-node") }}" + {{- else }} + ASTRIA_CONDUCTOR_CELESTIA_NODE_WEBSOCKET_URL: "{{ .Values.config.celestia.ws }}" + {{- end }} + {{- else }} + ASTRIA_CONDUCTOR_CELESTIA_BLOCK_TIME_MS: "12000" {{- end }} --- apiVersion: v1 diff --git a/crates/astria-conductor/Cargo.toml b/crates/astria-conductor/Cargo.toml index 3c5b5aabcd..6b4139cceb 100644 --- a/crates/astria-conductor/Cargo.toml +++ b/crates/astria-conductor/Cargo.toml @@ -48,8 +48,10 @@ tryhard = { workspace = true } async-trait = "0.1.73" futures-bounded = "0.2.3" +http = "0.2.9" itertools = "0.12.1" pin-project-lite = "0.2" +tokio-stream = "0.1.14" tracing-futures = { version = "0.2.5", features = ["futures-03"] } [dev-dependencies] diff --git a/crates/astria-conductor/local.env.example b/crates/astria-conductor/local.env.example index 2a18662604..deb57d0619 100644 --- a/crates/astria-conductor/local.env.example +++ b/crates/astria-conductor/local.env.example @@ -2,6 +2,10 @@ # specification as defined here: # https://github.com/open-telemetry/opentelemetry-specification/blob/e94af89e3d0c01de30127a0f423e912f6cda7bed/specification/protocol/exporter.md +# The Celestia block time in milliseconds. Used for periodically polling Celestia for its latest network height. +# 12000 milliseconds is the default Celestia block time. +ASTRIA_CONDUCTOR_CELESTIA_BLOCK_TIME_MS=12000 + # The bearer token to retrieve sequencer blocks as blobs from Celestia. # The token is obtained by running `celestia bridge auth ` # on the host running the celestia node. @@ -11,10 +15,6 @@ ASTRIA_CONDUCTOR_CELESTIA_BEARER_TOKEN="" # either http or https as scheme. ASTRIA_CONDUCTOR_CELESTIA_NODE_HTTP_URL="http://127.0.0.1:26658" -# The URL of the celestia node to subscribe to new network headers. This URL must contain -# either ws or wss as scheme. -ASTRIA_CONDUCTOR_CELESTIA_NODE_WEBSOCKET_URL="ws://127.0.0.1:26658" - # Execution RPC URL ASTRIA_CONDUCTOR_EXECUTION_RPC_URL="http://127.0.0.1:50051" diff --git a/crates/astria-conductor/src/celestia/builder.rs b/crates/astria-conductor/src/celestia/builder.rs index 93b3eaf761..eed5f2e98e 100644 --- a/crates/astria-conductor/src/celestia/builder.rs +++ b/crates/astria-conductor/src/celestia/builder.rs @@ -1,31 +1,41 @@ //! Boilerplate to construct a [`super::Reader`] via a type-state builder. -use celestia_client::celestia_types::nmt::Namespace; -use sequencer_client::HttpClient; +use std::time::Duration; + +use astria_eyre::eyre::{ + self, + WrapErr as _, +}; +use celestia_client::{ + celestia_types::nmt::Namespace, + jsonrpsee::http_client::HttpClient as CelestiaClient, +}; +use sequencer_client::HttpClient as SequencerClient; use tokio_util::sync::CancellationToken; -use super::Reader; -use crate::{ - celestia::block_verifier::BlockVerifier, - executor, +use super::{ + block_verifier::BlockVerifier, + latest_height_stream::stream_latest_heights, + Reader, }; +use crate::executor; pub(crate) struct Builder { + pub(crate) celestia_block_time: Duration, pub(crate) celestia_http_endpoint: String, - pub(crate) celestia_websocket_endpoint: String, pub(crate) celestia_token: String, pub(crate) executor: executor::Handle, - pub(crate) sequencer_cometbft_client: HttpClient, + pub(crate) sequencer_cometbft_client: SequencerClient, pub(crate) sequencer_namespace: Namespace, pub(crate) shutdown: CancellationToken, } impl Builder { /// Creates a new [`Reader`] instance, - pub(crate) fn build(self) -> Reader { + pub(crate) fn build(self) -> eyre::Result { let Self { + celestia_block_time, celestia_http_endpoint, - celestia_websocket_endpoint, celestia_token, executor, sequencer_cometbft_client, @@ -35,14 +45,36 @@ impl Builder { let block_verifier = BlockVerifier::new(sequencer_cometbft_client); - Reader { - executor, - celestia_http_endpoint, - celestia_ws_endpoint: celestia_websocket_endpoint, - celestia_auth_token: celestia_token, + let celestia_client = create_celestia_client(celestia_http_endpoint, &celestia_token) + .wrap_err("failed initializing client for Celestia HTTP RPC")?; + + let latest_celestia_heights = + stream_latest_heights(celestia_client.clone(), celestia_block_time); + + Ok(Reader { block_verifier, + celestia_client, + latest_celestia_heights, + executor, sequencer_namespace, shutdown, - } + }) } } + +fn create_celestia_client(endpoint: String, bearer_token: &str) -> eyre::Result { + use celestia_client::jsonrpsee::http_client::{ + HeaderMap, + HttpClientBuilder, + }; + let mut headers = HeaderMap::new(); + let auth_value = format!("Bearer {bearer_token}").parse().wrap_err( + "failed to construct Authorization header value from provided Celestia bearer token", + )?; + headers.insert(http::header::AUTHORIZATION, auth_value); + let client = HttpClientBuilder::default() + .set_headers(headers) + .build(endpoint) + .wrap_err("failed constructing Celestia JSONRPC HTTP Client")?; + Ok(client) +} diff --git a/crates/astria-conductor/src/celestia/latest_height_stream.rs b/crates/astria-conductor/src/celestia/latest_height_stream.rs new file mode 100644 index 0000000000..aad706a49b --- /dev/null +++ b/crates/astria-conductor/src/celestia/latest_height_stream.rs @@ -0,0 +1,63 @@ +use std::{ + pin::Pin, + time::Duration, +}; + +use astria_eyre::eyre::{ + Result, + WrapErr as _, +}; +use celestia_client::{ + celestia_rpc::HeaderClient as _, + jsonrpsee::http_client::HttpClient, +}; +use futures::{ + Future, + FutureExt as _, + Stream, + StreamExt as _, +}; +use tokio_stream::wrappers::IntervalStream; + +pub(super) fn stream_latest_heights( + client: HttpClient, + poll_period: Duration, +) -> LatestHeightStream { + let f = Box::new(move |_: tokio::time::Instant| { + let client = client.clone(); + async move { + client + .header_network_head() + .await + .wrap_err("failed to fetch network head") + .map(|header| header.height().value()) + } + .boxed() + }); + let mut interval = tokio::time::interval(poll_period); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + LatestHeightStream { + inner: IntervalStream::new(interval).then(f), + } +} + +type HeightFromHeaderFut = Pin> + Send>>; + +pub(super) struct LatestHeightStream { + inner: futures::stream::Then< + IntervalStream, + HeightFromHeaderFut, + Box HeightFromHeaderFut + Send>, + >, +} + +impl Stream for LatestHeightStream { + type Item = Result; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.inner.poll_next_unpin(cx) + } +} diff --git a/crates/astria-conductor/src/celestia/mod.rs b/crates/astria-conductor/src/celestia/mod.rs index 9c1ab911ce..4e0f6fe673 100644 --- a/crates/astria-conductor/src/celestia/mod.rs +++ b/crates/astria-conductor/src/celestia/mod.rs @@ -5,7 +5,6 @@ use std::{ Context, Poll, }, - time::Duration, }; use astria_eyre::eyre::{ @@ -16,22 +15,8 @@ use astria_eyre::eyre::{ }; use celestia_client::{ celestia_namespace_v0_from_rollup_id, - celestia_rpc::{ - self, - Client, - }, - celestia_types::{ - nmt::Namespace, - ExtendedHeader, - }, - jsonrpsee::{ - core::{ - client::Subscription, - Error as JrpcError, - }, - http_client::HttpClient, - ws_client::WsClient, - }, + celestia_types::nmt::Namespace, + jsonrpsee::http_client::HttpClient as CelestiaClient, CelestiaClientExt as _, CelestiaSequencerBlob, }; @@ -75,7 +60,17 @@ use tracing::{ }; mod block_verifier; +mod builder; +mod latest_height_stream; +mod reporting; + use block_verifier::BlockVerifier; +pub(crate) use builder::Builder; +use latest_height_stream::LatestHeightStream; +use reporting::{ + ReportReconstructedBlocks, + ReportSequencerHeights, +}; use tracing_futures::Instrument; use crate::{ @@ -85,13 +80,6 @@ use crate::{ }, executor, }; -mod builder; -pub(crate) use builder::Builder; -mod reporting; -use reporting::{ - ReportReconstructedBlocks, - ReportSequencerHeights, -}; type StdError = dyn std::error::Error; @@ -123,20 +111,17 @@ impl GetSequencerHeight for ReconstructedBlock { } pub(crate) struct Reader { - /// The channel used to send messages to the executor task. - executor: executor::Handle, - - // The HTTP endpoint to fetch celestia blocks. - celestia_http_endpoint: String, + /// Validates sequencer blobs read from celestia against sequencer. + block_verifier: BlockVerifier, - // The WS endpoint to subscribe to the latest celestia headers and read heights. - celestia_ws_endpoint: String, + // Client to fetch heights and blocks from Celestia. + celestia_client: CelestiaClient, - // The bearer token to authenticate with the celestia node. - celestia_auth_token: String, + // A stream of the latest Celestia heights. + latest_celestia_heights: LatestHeightStream, - /// Validates sequencer blobs read from celestia against sequencer. - block_verifier: BlockVerifier, + /// The channel used to send messages to the executor task. + executor: executor::Handle, /// The celestia namespace sequencer blobs will be read from. sequencer_namespace: Namespace, @@ -168,37 +153,29 @@ impl Reader { "setting up celestia reader", ); - // XXX: The websocket client must be kept alive so that the subscription doesn't die. - // We bind to `_wsclient` because `_` will immediately drop the object. - let (mut _wsclient, mut headers) = - subscribe_to_celestia_headers(&self.celestia_ws_endpoint, &self.celestia_auth_token) - .await - .wrap_err("failed to subscribe to celestia headers")?; - - let latest_celestia_height = match headers.next().await { - Some(Ok(header)) => header.height(), + let latest_celestia_height = match self.latest_celestia_heights.next().await { + Some(Ok(height)) => height, Some(Err(e)) => { return Err(e).wrap_err("subscription to celestia header returned an error"); } None => bail!("celestia header subscription was terminated unexpectedly"), }; - debug!(height = %latest_celestia_height, "received latest height from celestia"); + debug!( + height = latest_celestia_height, + "received latest height from celestia" + ); // XXX: This block cache always starts at height 1, the default value for `Height`. let mut sequential_blocks = BlockCache::::with_next_height(initial_expected_sequencer_height) .wrap_err("failed constructing sequential block cache")?; - let http_client = - connect_to_celestia(&self.celestia_http_endpoint, &self.celestia_auth_token) - .await - .wrap_err("failed to connect to the Celestia node HTTP RPC")?; let mut block_stream = ReconstructedBlocksStream { track_heights: TrackHeights { reference_height: initial_celestia_height.value(), variance: celestia_variance, - last_observed: latest_celestia_height.value(), + last_observed: latest_celestia_height, next_height: initial_celestia_height.value(), }, // NOTE: Gives Celestia 600 seconds to respond. This seems reasonable because we need to @@ -206,7 +183,7 @@ impl Reader { // blobs. // XXX: This should probably have explicit retry logic instead of this futures map. in_progress: FuturesMap::new(std::time::Duration::from_secs(600), 10), - client: http_client, + client: self.celestia_client.clone(), verifier: self.block_verifier.clone(), sequencer_namespace: self.sequencer_namespace, rollup_namespace, @@ -222,8 +199,6 @@ impl Reader { let mut enqueued_block: Fuse>>> = future::Fuse::terminated(); - // Pending subcription to Celestia network headers. Set if the current subscription fails. - let mut resubscribing = Fuse::terminated(); let reason = loop { select!( biased; @@ -265,59 +240,32 @@ impl Reader { } } - new_subscription = &mut resubscribing, if !resubscribing.is_terminated() => { - match new_subscription { - Ok(new_subscription) => (_wsclient, headers) = new_subscription, - Err(e) => return Err(e).wrap_err("resubscribing to celestia headers ultimately failed"), - } - } - - maybe_header = headers.next(), if resubscribing.is_terminated() => { - let mut resubscribe = false; - - if block_stream.inner().is_exhausted() { - info!( - reference_height = block_stream.inner().track_heights.reference_height(), - variance = block_stream.inner().track_heights.variance(), - max_permitted_height = block_stream.inner().track_heights.max_permitted(), - "received a new header from Celestia, but the stream is exhausted and won't fetch past its permitted window", - ); - } - - match maybe_header { - Some(Ok(header)) => { - if !block_stream.inner_mut().update_latest_observed_height_if_greater(header.height().value()) { - info!("received a new Celestia header, but the height therein was already seen"); - }} - - Some(Err(JrpcError::ParseError(e))) => { - warn!( - error = &e as &StdError, - "failed to parse return value of header subscription", - ); + // Write the latest Celestia height to the block stream. + Some(res) = self.latest_celestia_heights.next() => { + match res { + Ok(height) => { + debug!(height, "received height from Celestia"); + if block_stream.inner_mut().update_latest_observed_height_if_greater(height) + && block_stream.inner().is_exhausted() + { + info!( + reference_height = block_stream.inner().track_heights.reference_height(), + variance = block_stream.inner().track_heights.variance(), + max_permitted_height = block_stream.inner().track_heights.max_permitted(), + "updated reference height, but the block stream is exhausted and won't fetch past its permitted window", + ); + } } - - Some(Err(e)) => { + Err(error) => { warn!( - error = &e as &StdError, - "Celestia header subscription failed, resubscribing", + %error, + "failed fetching latest height from sequencer; waiting until next tick", ); - resubscribe = true; } - - None => { - warn!("Celestia header subscription is unexpectedly exhausted, resubscribing"); - resubscribe = true; - } - } - if resubscribe { - resubscribing = subscribe_to_celestia_headers( - &self.celestia_ws_endpoint, - &self.celestia_auth_token, - ).boxed().fuse(); } } + // Pull the the next reconstructed block from the stream reading off of Celestia. Some(reconstructed) = block_stream.next() => { for block in reconstructed.blocks { if let Err(e) = sequential_blocks.insert(block) { @@ -405,7 +353,7 @@ pin_project! { in_progress: FuturesMap>, - client: HttpClient, + client: CelestiaClient, verifier: BlockVerifier, sequencer_namespace: Namespace, rollup_namespace: Namespace, @@ -527,7 +475,7 @@ impl Stream for ReconstructedBlocksStream { err )] async fn fetch_blocks_at_celestia_height( - client: HttpClient, + client: CelestiaClient, verifier: BlockVerifier, celestia_height: u64, sequencer_namespace: Namespace, @@ -603,7 +551,7 @@ async fn fetch_blocks_at_celestia_height( err, )] async fn process_sequencer_blob( - client: HttpClient, + client: CelestiaClient, verifier: BlockVerifier, celestia_height: u64, rollup_namespace: Namespace, @@ -642,86 +590,6 @@ async fn process_sequencer_blob( }) } -#[instrument(err)] -async fn subscribe_to_celestia_headers( - endpoint: &str, - token: &str, -) -> eyre::Result<(WsClient, Subscription)> { - use celestia_client::celestia_rpc::HeaderClient as _; - - async fn connect(endpoint: &str, token: &str) -> Result { - let Client::Ws(client) = Client::new(endpoint, Some(token)).await? else { - panic!("expected a celestia Websocket client but got a HTTP client"); - }; - Ok(client) - } - - let retry_config = tryhard::RetryFutureConfig::new(u32::MAX) - .exponential_backoff(Duration::from_millis(100)) - .max_delay(Duration::from_secs(5)) - .on_retry( - |attempt: u32, next_delay: Option, error: &eyre::Report| { - let wait_duration = next_delay - .map(humantime::format_duration) - .map(tracing::field::display); - warn!( - attempt, - wait_duration, - %error, - "attempt to connect to subscribe to Celestia headers failed; retrying after backoff", - ); - futures::future::ready(()) - }, - ); - - tryhard::retry_fn(|| async move { - let client = connect(endpoint, token) - .await - .wrap_err("failed to connect to Celestia Websocket RPC")?; - let headers = client - .header_subscribe() - .await - .wrap_err("failed to subscribe to Celestia headers")?; - Ok((client, headers)) - }) - .with_config(retry_config) - .await - .wrap_err("retry attempts exhausted; bailing") -} - -#[instrument(err)] -async fn connect_to_celestia(endpoint: &str, token: &str) -> eyre::Result { - async fn connect(endpoint: &str, token: &str) -> Result { - let Client::Http(client) = Client::new(endpoint, Some(token)).await? else { - panic!("expected a celestia HTTP client but got a Websocket client"); - }; - Ok(client) - } - - let retry_config = tryhard::RetryFutureConfig::new(u32::MAX) - .exponential_backoff(Duration::from_millis(100)) - .max_delay(Duration::from_secs(5)) - .on_retry( - |attempt: u32, next_delay: Option, error: &celestia_rpc::Error| { - let wait_duration = next_delay - .map(humantime::format_duration) - .map(tracing::field::display); - warn!( - attempt, - wait_duration, - %error, - "attempt to connect to Celestia HTTP RPC failed; retrying after backoff", - ); - futures::future::ready(()) - }, - ); - - tryhard::retry_fn(|| connect(endpoint, token)) - .with_config(retry_config) - .await - .wrap_err("retry attempts exhausted; bailing") -} - #[cfg(test)] mod tests { use super::TrackHeights; diff --git a/crates/astria-conductor/src/conductor.rs b/crates/astria-conductor/src/conductor.rs index 3cf53d5ca0..72d046971c 100644 --- a/crates/astria-conductor/src/conductor.rs +++ b/crates/astria-conductor/src/conductor.rs @@ -103,14 +103,15 @@ impl Conductor { let reader = celestia::Builder { celestia_http_endpoint: cfg.celestia_node_http_url, - celestia_websocket_endpoint: cfg.celestia_node_websocket_url, celestia_token: cfg.celestia_bearer_token, + celestia_block_time: Duration::from_millis(cfg.celestia_block_time_ms), executor: executor_handle.clone(), sequencer_cometbft_client: sequencer_cometbft_client.clone(), sequencer_namespace, shutdown: shutdown.clone(), } - .build(); + .build() + .wrap_err("failed to build Celestia Reader")?; tasks.spawn(Self::CELESTIA, reader.run_until_stopped()); }; diff --git a/crates/astria-conductor/src/config.rs b/crates/astria-conductor/src/config.rs index fa47aede44..73ee517ad1 100644 --- a/crates/astria-conductor/src/config.rs +++ b/crates/astria-conductor/src/config.rs @@ -27,12 +27,12 @@ impl CommitLevel { #[allow(clippy::struct_excessive_bools)] #[derive(Debug, Serialize, Deserialize)] pub struct Config { + /// The block time of Celestia network in milliseconds. + pub celestia_block_time_ms: u64, + /// URL of the Celestia Node HTTP RPC pub celestia_node_http_url: String, - /// URL of the Celestia Node Websocket RPC - pub celestia_node_websocket_url: String, - /// The JWT bearer token supplied with each jsonrpc call pub celestia_bearer_token: String,