Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: revamp reorg handling logic #63

Merged
merged 11 commits into from
Mar 10, 2024
Merged
16 changes: 6 additions & 10 deletions src/clients/beacon/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,7 @@ impl BeaconClient {
}

pub async fn get_block(&self, block_id: &BlockId) -> ClientResult<Option<Block>> {
let block_id = match block_id {
BlockId::Hash(hash) => format!("0x{:x}", hash),
BlockId::Slot(slot) => slot.to_string(),
block_id => block_id.to_string(),
};

let path = format!("v2/beacon/blocks/{block_id}");
let path = format!("v2/beacon/blocks/{}", { block_id.to_detailed_string() });
let url = self.base_url.join(path.as_str())?;

json_get!(&self.client, url, BlockResponse, self.exp_backoff.clone()).map(|res| match res {
Expand All @@ -54,7 +48,7 @@ impl BeaconClient {
}

pub async fn get_block_header(&self, block_id: &BlockId) -> ClientResult<Option<BlockHeader>> {
let path = format!("v1/beacon/headers/{block_id}");
let path = format!("v1/beacon/headers/{}", { block_id.to_detailed_string() });
let url = self.base_url.join(path.as_str())?;

json_get!(
Expand All @@ -70,7 +64,9 @@ impl BeaconClient {
}

pub async fn get_blobs(&self, block_id: &BlockId) -> ClientResult<Option<Vec<Blob>>> {
let path = format!("v1/beacon/blob_sidecars/{block_id}");
let path = format!("v1/beacon/blob_sidecars/{}", {
block_id.to_detailed_string()
});
let url = self.base_url.join(path.as_str())?;

json_get!(&self.client, url, BlobsResponse, self.exp_backoff.clone()).map(|res| match res {
Expand All @@ -79,7 +75,7 @@ impl BeaconClient {
})
}

pub fn subscribe_to_events(&self, topics: Vec<Topic>) -> ClientResult<EventSource> {
pub fn subscribe_to_events(&self, topics: &[Topic]) -> ClientResult<EventSource> {
let topics = topics
.iter()
.map(|topic| topic.into())
Expand Down
41 changes: 37 additions & 4 deletions src/clients/beacon/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ pub enum BlockId {
}

#[derive(Serialize, Debug)]
#[serde(rename_all = "snake_case")]
pub enum Topic {
Head,
FinalizedCheckpoint,
ChainReorg,
}

#[derive(Deserialize, Debug)]
Expand Down Expand Up @@ -85,7 +87,17 @@ pub struct BlockHeaderMessage {
}

#[derive(Deserialize, Debug)]
pub struct HeadBlockEventData {
pub struct ChainReorgEventData {
pub old_head_block: H256,
pub new_head_block: H256,
#[serde(deserialize_with = "deserialize_number")]
pub slot: u32,
#[serde(deserialize_with = "deserialize_number")]
pub depth: u32,
}

#[derive(Deserialize, Debug)]
pub struct HeadEventData {
#[serde(deserialize_with = "deserialize_number")]
pub slot: u32,
pub block: H256,
Expand All @@ -105,6 +117,17 @@ where
value.parse::<u32>().map_err(serde::de::Error::custom)
}

impl BlockId {
pub fn to_detailed_string(&self) -> String {
match self {
BlockId::Head => String::from("head"),
BlockId::Finalized => String::from("finalized"),
BlockId::Slot(slot) => slot.to_string(),
BlockId::Hash(hash) => format!("0x{:x}", hash),
}
}
}

impl fmt::Display for BlockId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Expand All @@ -126,7 +149,16 @@ impl FromStr for BlockId {
_ => match s.parse::<u32>() {
Ok(num) => Ok(BlockId::Slot(num)),
Err(_) => {
Err("Invalid block ID. Expected 'head', 'finalized' or a number.".to_string())
if s.starts_with("0x") {
match H256::from_str(s) {
Ok(hash) => Ok(BlockId::Hash(hash)),
Err(_) => Err(format!("Invalid block ID hash: {s}")),
}
} else {
Err(
format!("Invalid block ID: {s}. Expected 'head', 'finalized', a hash or a number."),
)
}
}
},
}
Expand All @@ -136,14 +168,15 @@ impl FromStr for BlockId {
impl From<&Topic> for String {
fn from(value: &Topic) -> Self {
match value {
Topic::ChainReorg => String::from("chain_reorg"),
Topic::Head => String::from("head"),
Topic::FinalizedCheckpoint => String::from("finalized_checkpoint"),
}
}
}

impl From<HeadBlockEventData> for BlockData {
fn from(event_data: HeadBlockEventData) -> Self {
impl From<HeadEventData> for BlockData {
fn from(event_data: HeadEventData) -> Self {
Self {
root: event_data.block,
slot: event_data.slot,
Expand Down
18 changes: 11 additions & 7 deletions src/clients/blobscan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@ use backoff::ExponentialBackoff;
use chrono::TimeDelta;
use reqwest::{Client, Url};

use crate::{clients::common::ClientResult, json_get, json_put};
use crate::{
clients::{blobscan::types::ReorgedSlotsResponse, common::ClientResult},
json_get, json_put,
};

use self::{
jwt_manager::{Config as JWTManagerConfig, JWTManager},
types::{
Blob, Block, BlockchainSyncState, BlockchainSyncStateRequest, BlockchainSyncStateResponse,
IndexRequest, ReorgedSlotRequest, Transaction,
IndexRequest, ReorgedSlotsRequest, Transaction,
},
};

Expand Down Expand Up @@ -64,14 +67,15 @@ impl BlobscanClient {
json_put!(&self.client, url, token, &req).map(|_: Option<()>| ())
}

pub async fn handle_reorged_slot(&self, slot: u32) -> ClientResult<()> {
let url = self.base_url.join("indexer/reorged-slot")?;
pub async fn handle_reorged_slots(&self, slots: &[u32]) -> ClientResult<u32> {
let url = self.base_url.join("indexer/reorged-slots")?;
let token = self.jwt_manager.get_token()?;
let req = ReorgedSlotRequest {
new_head_slot: slot,
let req = ReorgedSlotsRequest {
reorged_slots: slots.to_owned(),
};

json_put!(&self.client, url, token, &req).map(|_: Option<()>| ())
json_put!(&self.client, url, ReorgedSlotsResponse, token, &req)
.map(|res: Option<ReorgedSlotsResponse>| res.unwrap().total_updated_slots)
}

pub async fn update_sync_state(&self, sync_state: BlockchainSyncState) -> ClientResult<()> {
Expand Down
10 changes: 8 additions & 2 deletions src/clients/blobscan/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,14 @@ pub struct IndexRequest {

#[derive(Serialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct ReorgedSlotRequest {
pub new_head_slot: u32,
pub struct ReorgedSlotsRequest {
pub reorged_slots: Vec<u32>,
}

#[derive(Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct ReorgedSlotsResponse {
pub total_updated_slots: u32,
}

impl fmt::Debug for Blob {
Expand Down
7 changes: 5 additions & 2 deletions src/clients/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,10 @@ macro_rules! json_get {
/// Make a PUT request sending JSON.
/// if JSON deser fails, emit a `WARN` level tracing event
macro_rules! json_put {
($client:expr, $url:expr, $auth_token:expr, $body:expr) => {{
($client:expr, $url:expr, $auth_token:expr, $body:expr) => {
json_put!($client, $url, (), $auth_token, $body)
};
($client:expr, $url:expr, $expected:ty, $auth_token:expr, $body:expr) => {{
let url = $url.clone();
let body = format!("{:?}", $body);

Expand Down Expand Up @@ -123,7 +126,7 @@ macro_rules! json_put {
};

let text = resp.text().await?;
let result: $crate::clients::common::ClientResponse<_> = text.parse()?;
let result: $crate::clients::common::ClientResponse<$expected> = text.parse()?;

if result.is_err() {
tracing::warn!(
Expand Down
15 changes: 9 additions & 6 deletions src/env.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,27 @@
use envy::Error::MissingValue;
use serde::Deserialize;

use crate::types::Network;

#[derive(Deserialize, Debug)]
pub struct Environment {
#[serde(default = "default_network")]
pub network_name: Network,
#[serde(default = "default_blobscan_api_endpoint")]
pub blobscan_api_endpoint: String,
#[serde(default = "default_beacon_node_endpoint")]
pub beacon_node_endpoint: String,
#[serde(default = "default_execution_node_endpoint")]
pub execution_node_endpoint: String,
pub secret_key: String,
#[serde(default = "default_dencun_fork_slot")]
pub dencun_fork_slot: u32,
pub dencun_fork_slot: Option<u32>,
pub sentry_dsn: Option<String>,
}

fn default_network() -> Network {
Network::Devnet
}

fn default_blobscan_api_endpoint() -> String {
"http://localhost:3001".to_string()
}
Expand All @@ -27,10 +34,6 @@ fn default_execution_node_endpoint() -> String {
"http://localhost:8545".to_string()
}

fn default_dencun_fork_slot() -> u32 {
0
}

impl Environment {
pub fn from_env() -> Result<Self, envy::Error> {
match envy::from_env::<Environment>() {
Expand Down
Loading
Loading