Skip to content

Commit

Permalink
Merge pull request #63 from Blobscan/fix/reorg-blocks-handling
Browse files Browse the repository at this point in the history
fix: revamp reorg handling logic
  • Loading branch information
PJColombo authored Mar 10, 2024
2 parents da6ee5d + 6a5cd30 commit 0d974fc
Show file tree
Hide file tree
Showing 11 changed files with 185 additions and 113 deletions.
16 changes: 6 additions & 10 deletions src/clients/beacon/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,7 @@ impl BeaconClient {
}

pub async fn get_block(&self, block_id: &BlockId) -> ClientResult<Option<Block>> {
let block_id = match block_id {
BlockId::Hash(hash) => format!("0x{:x}", hash),
BlockId::Slot(slot) => slot.to_string(),
block_id => block_id.to_string(),
};

let path = format!("v2/beacon/blocks/{block_id}");
let path = format!("v2/beacon/blocks/{}", { block_id.to_detailed_string() });
let url = self.base_url.join(path.as_str())?;

json_get!(&self.client, url, BlockResponse, self.exp_backoff.clone()).map(|res| match res {
Expand All @@ -54,7 +48,7 @@ impl BeaconClient {
}

pub async fn get_block_header(&self, block_id: &BlockId) -> ClientResult<Option<BlockHeader>> {
let path = format!("v1/beacon/headers/{block_id}");
let path = format!("v1/beacon/headers/{}", { block_id.to_detailed_string() });
let url = self.base_url.join(path.as_str())?;

json_get!(
Expand All @@ -70,7 +64,9 @@ impl BeaconClient {
}

pub async fn get_blobs(&self, block_id: &BlockId) -> ClientResult<Option<Vec<Blob>>> {
let path = format!("v1/beacon/blob_sidecars/{block_id}");
let path = format!("v1/beacon/blob_sidecars/{}", {
block_id.to_detailed_string()
});
let url = self.base_url.join(path.as_str())?;

json_get!(&self.client, url, BlobsResponse, self.exp_backoff.clone()).map(|res| match res {
Expand All @@ -79,7 +75,7 @@ impl BeaconClient {
})
}

pub fn subscribe_to_events(&self, topics: Vec<Topic>) -> ClientResult<EventSource> {
pub fn subscribe_to_events(&self, topics: &[Topic]) -> ClientResult<EventSource> {
let topics = topics
.iter()
.map(|topic| topic.into())
Expand Down
41 changes: 37 additions & 4 deletions src/clients/beacon/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ pub enum BlockId {
}

#[derive(Serialize, Debug)]
#[serde(rename_all = "snake_case")]
pub enum Topic {
Head,
FinalizedCheckpoint,
ChainReorg,
}

#[derive(Deserialize, Debug)]
Expand Down Expand Up @@ -85,7 +87,17 @@ pub struct BlockHeaderMessage {
}

#[derive(Deserialize, Debug)]
pub struct HeadBlockEventData {
pub struct ChainReorgEventData {
pub old_head_block: H256,
pub new_head_block: H256,
#[serde(deserialize_with = "deserialize_number")]
pub slot: u32,
#[serde(deserialize_with = "deserialize_number")]
pub depth: u32,
}

#[derive(Deserialize, Debug)]
pub struct HeadEventData {
#[serde(deserialize_with = "deserialize_number")]
pub slot: u32,
pub block: H256,
Expand All @@ -105,6 +117,17 @@ where
value.parse::<u32>().map_err(serde::de::Error::custom)
}

impl BlockId {
pub fn to_detailed_string(&self) -> String {
match self {
BlockId::Head => String::from("head"),
BlockId::Finalized => String::from("finalized"),
BlockId::Slot(slot) => slot.to_string(),
BlockId::Hash(hash) => format!("0x{:x}", hash),
}
}
}

impl fmt::Display for BlockId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Expand All @@ -126,7 +149,16 @@ impl FromStr for BlockId {
_ => match s.parse::<u32>() {
Ok(num) => Ok(BlockId::Slot(num)),
Err(_) => {
Err("Invalid block ID. Expected 'head', 'finalized' or a number.".to_string())
if s.starts_with("0x") {
match H256::from_str(s) {
Ok(hash) => Ok(BlockId::Hash(hash)),
Err(_) => Err(format!("Invalid block ID hash: {s}")),
}
} else {
Err(
format!("Invalid block ID: {s}. Expected 'head', 'finalized', a hash or a number."),
)
}
}
},
}
Expand All @@ -136,14 +168,15 @@ impl FromStr for BlockId {
impl From<&Topic> for String {
fn from(value: &Topic) -> Self {
match value {
Topic::ChainReorg => String::from("chain_reorg"),
Topic::Head => String::from("head"),
Topic::FinalizedCheckpoint => String::from("finalized_checkpoint"),
}
}
}

impl From<HeadBlockEventData> for BlockData {
fn from(event_data: HeadBlockEventData) -> Self {
impl From<HeadEventData> for BlockData {
fn from(event_data: HeadEventData) -> Self {
Self {
root: event_data.block,
slot: event_data.slot,
Expand Down
18 changes: 11 additions & 7 deletions src/clients/blobscan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@ use backoff::ExponentialBackoff;
use chrono::TimeDelta;
use reqwest::{Client, Url};

use crate::{clients::common::ClientResult, json_get, json_put};
use crate::{
clients::{blobscan::types::ReorgedSlotsResponse, common::ClientResult},
json_get, json_put,
};

use self::{
jwt_manager::{Config as JWTManagerConfig, JWTManager},
types::{
Blob, Block, BlockchainSyncState, BlockchainSyncStateRequest, BlockchainSyncStateResponse,
IndexRequest, ReorgedSlotRequest, Transaction,
IndexRequest, ReorgedSlotsRequest, Transaction,
},
};

Expand Down Expand Up @@ -64,14 +67,15 @@ impl BlobscanClient {
json_put!(&self.client, url, token, &req).map(|_: Option<()>| ())
}

pub async fn handle_reorged_slot(&self, slot: u32) -> ClientResult<()> {
let url = self.base_url.join("indexer/reorged-slot")?;
pub async fn handle_reorged_slots(&self, slots: &[u32]) -> ClientResult<u32> {
let url = self.base_url.join("indexer/reorged-slots")?;
let token = self.jwt_manager.get_token()?;
let req = ReorgedSlotRequest {
new_head_slot: slot,
let req = ReorgedSlotsRequest {
reorged_slots: slots.to_owned(),
};

json_put!(&self.client, url, token, &req).map(|_: Option<()>| ())
json_put!(&self.client, url, ReorgedSlotsResponse, token, &req)
.map(|res: Option<ReorgedSlotsResponse>| res.unwrap().total_updated_slots)
}

pub async fn update_sync_state(&self, sync_state: BlockchainSyncState) -> ClientResult<()> {
Expand Down
10 changes: 8 additions & 2 deletions src/clients/blobscan/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,14 @@ pub struct IndexRequest {

#[derive(Serialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct ReorgedSlotRequest {
pub new_head_slot: u32,
pub struct ReorgedSlotsRequest {
pub reorged_slots: Vec<u32>,
}

#[derive(Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct ReorgedSlotsResponse {
pub total_updated_slots: u32,
}

impl fmt::Debug for Blob {
Expand Down
7 changes: 5 additions & 2 deletions src/clients/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,10 @@ macro_rules! json_get {
/// Make a PUT request sending JSON.
/// if JSON deser fails, emit a `WARN` level tracing event
macro_rules! json_put {
($client:expr, $url:expr, $auth_token:expr, $body:expr) => {{
($client:expr, $url:expr, $auth_token:expr, $body:expr) => {
json_put!($client, $url, (), $auth_token, $body)
};
($client:expr, $url:expr, $expected:ty, $auth_token:expr, $body:expr) => {{
let url = $url.clone();
let body = format!("{:?}", $body);

Expand Down Expand Up @@ -123,7 +126,7 @@ macro_rules! json_put {
};

let text = resp.text().await?;
let result: $crate::clients::common::ClientResponse<_> = text.parse()?;
let result: $crate::clients::common::ClientResponse<$expected> = text.parse()?;

if result.is_err() {
tracing::warn!(
Expand Down
15 changes: 9 additions & 6 deletions src/env.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,27 @@
use envy::Error::MissingValue;
use serde::Deserialize;

use crate::network::Network;

#[derive(Deserialize, Debug)]
pub struct Environment {
#[serde(default = "default_network")]
pub network_name: Network,
#[serde(default = "default_blobscan_api_endpoint")]
pub blobscan_api_endpoint: String,
#[serde(default = "default_beacon_node_endpoint")]
pub beacon_node_endpoint: String,
#[serde(default = "default_execution_node_endpoint")]
pub execution_node_endpoint: String,
pub secret_key: String,
#[serde(default = "default_dencun_fork_slot")]
pub dencun_fork_slot: u32,
pub dencun_fork_slot: Option<u32>,
pub sentry_dsn: Option<String>,
}

fn default_network() -> Network {
Network::Devnet
}

fn default_blobscan_api_endpoint() -> String {
"http://localhost:3001".to_string()
}
Expand All @@ -27,10 +34,6 @@ fn default_execution_node_endpoint() -> String {
"http://localhost:8545".to_string()
}

fn default_dencun_fork_slot() -> u32 {
0
}

impl Environment {
pub fn from_env() -> Result<Self, envy::Error> {
match envy::from_env::<Environment>() {
Expand Down
Loading

0 comments on commit 0d974fc

Please sign in to comment.