Skip to content

Commit

Permalink
refactor(conductor): remove websocket subscription to Celestia height…
Browse files Browse the repository at this point in the history
…s in favor of periodic polling
  • Loading branch information
SuperFluffy committed Mar 20, 2024
1 parent da01133 commit fb789c4
Show file tree
Hide file tree
Showing 8 changed files with 175 additions and 208 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion crates/astria-conductor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ name = "astria-conductor"
[dependencies]
async-trait = "0.1.73"
futures-bounded = "0.2.3"
http = "0.2.9"
itertools = "0.12.1"
pin-project-lite = "0.2"
tokio-stream = "0.1.14"
tracing-futures = { version = "0.2.5", features = ["futures-03"] }

base64 = { workspace = true }
Expand Down Expand Up @@ -50,7 +53,6 @@ sequencer-client = { package = "astria-sequencer-client", path = "../astria-sequ
telemetry = { package = "astria-telemetry", path = "../astria-telemetry", features = [
"display",
] }
itertools = "0.12.1"

[dev-dependencies]
jsonrpsee = { workspace = true, features = ["server"] }
Expand Down
8 changes: 4 additions & 4 deletions crates/astria-conductor/local.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@
# specification as defined here:
# https://github.com/open-telemetry/opentelemetry-specification/blob/e94af89e3d0c01de30127a0f423e912f6cda7bed/specification/protocol/exporter.md

# The Celestia block time in milliseconds. Used for periodically polling Celestia for its latest network height.
# 12000 milliseconds is the default Celestia block time.
ASTRIA_CONDUCTOR_CELESTIA_BLOCK_TIME_MS=12000

# The bearer token to retrieve sequencer blocks as blobs from Celestia.
# The token is obtained by running `celestia bridge auth <permissions>`
# on the host running the celestia node.
Expand All @@ -11,10 +15,6 @@ ASTRIA_CONDUCTOR_CELESTIA_BEARER_TOKEN="<JWT Bearer token>"
# either http or https as scheme.
ASTRIA_CONDUCTOR_CELESTIA_NODE_HTTP_URL="http://127.0.0.1:26658"

# The URL of the celestia node to subscribe to new network headers. This URL must contain
# either ws or wss as scheme.
ASTRIA_CONDUCTOR_CELESTIA_NODE_WEBSOCKET_URL="ws://127.0.0.1:26658"

# Execution RPC URL
ASTRIA_CONDUCTOR_EXECUTION_RPC_URL="http://127.0.0.1:50051"

Expand Down
64 changes: 48 additions & 16 deletions crates/astria-conductor/src/celestia/builder.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,41 @@
//! Boilerplate to construct a [`super::Reader`] via a type-state builder.
use celestia_client::celestia_types::nmt::Namespace;
use sequencer_client::HttpClient;
use std::time::Duration;

use astria_eyre::eyre::{
self,
WrapErr as _,
};
use celestia_client::{
celestia_types::nmt::Namespace,
jsonrpsee::http_client::HttpClient as CelestiaClient,
};
use sequencer_client::HttpClient as SequencerClient;
use tokio_util::sync::CancellationToken;

use super::Reader;
use crate::{
celestia::block_verifier::BlockVerifier,
executor,
use super::{
block_verifier::BlockVerifier,
latest_height_stream::stream_latest_heights,
Reader,
};
use crate::executor;

pub(crate) struct Builder {
pub(crate) celestia_block_time: Duration,
pub(crate) celestia_http_endpoint: String,
pub(crate) celestia_websocket_endpoint: String,
pub(crate) celestia_token: String,
pub(crate) executor: executor::Handle,
pub(crate) sequencer_cometbft_client: HttpClient,
pub(crate) sequencer_cometbft_client: SequencerClient,
pub(crate) sequencer_namespace: Namespace,
pub(crate) shutdown: CancellationToken,
}

impl Builder {
/// Creates a new [`Reader`] instance,
pub(crate) fn build(self) -> Reader {
pub(crate) fn build(self) -> eyre::Result<Reader> {
let Self {
celestia_block_time,
celestia_http_endpoint,
celestia_websocket_endpoint,
celestia_token,
executor,
sequencer_cometbft_client,
Expand All @@ -35,14 +45,36 @@ impl Builder {

let block_verifier = BlockVerifier::new(sequencer_cometbft_client);

Reader {
executor,
celestia_http_endpoint,
celestia_ws_endpoint: celestia_websocket_endpoint,
celestia_auth_token: celestia_token,
let celestia_client = create_celestia_client(celestia_http_endpoint, &celestia_token)
.wrap_err("failed initializing client for Celestia HTTP RPC")?;

let latest_celestia_heights =
stream_latest_heights(celestia_client.clone(), celestia_block_time);

Ok(Reader {
block_verifier,
celestia_client,
latest_celestia_heights,
executor,
sequencer_namespace,
shutdown,
}
})
}
}

fn create_celestia_client(endpoint: String, bearer_token: &str) -> eyre::Result<CelestiaClient> {
use celestia_client::jsonrpsee::http_client::{
HeaderMap,
HttpClientBuilder,
};
let mut headers = HeaderMap::new();
let auth_value = format!("Bearer {bearer_token}").parse().wrap_err(
"failed to construct Authorization header value from provided Celestia bearer token",
)?;
headers.insert(http::header::AUTHORIZATION, auth_value);
let client = HttpClientBuilder::default()
.set_headers(headers)
.build(endpoint)
.wrap_err("failed constructing Celestia JSONRPC HTTP Client")?;
Ok(client)
}
63 changes: 63 additions & 0 deletions crates/astria-conductor/src/celestia/latest_height_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
use std::{
pin::Pin,
time::Duration,
};

use astria_eyre::eyre::{
Result,
WrapErr as _,
};
use celestia_client::{
celestia_rpc::HeaderClient as _,
jsonrpsee::http_client::HttpClient,
};
use futures::{
Future,
FutureExt as _,
Stream,
StreamExt as _,
};
use tokio_stream::wrappers::IntervalStream;

pub(super) fn stream_latest_heights(
client: HttpClient,
poll_period: Duration,
) -> LatestHeightStream {
let f = Box::new(move |_: tokio::time::Instant| {
let client = client.clone();
async move {
client
.header_network_head()
.await
.wrap_err("failed to fetch network head")
.map(|header| header.height().value())
}
.boxed()
});
let mut interval = tokio::time::interval(poll_period);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
LatestHeightStream {
inner: IntervalStream::new(interval).then(f),
}
}

type HeightFromHeaderFut = Pin<Box<dyn Future<Output = Result<u64>> + Send>>;

pub(super) struct LatestHeightStream {
inner: futures::stream::Then<
IntervalStream,
HeightFromHeaderFut,
Box<dyn FnMut(tokio::time::Instant) -> HeightFromHeaderFut + Send>,
>,
}

impl Stream for LatestHeightStream {
type Item = Result<u64>;

fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
self.inner.poll_next_unpin(cx)
}
}
Loading

0 comments on commit fb789c4

Please sign in to comment.