Skip to content

Commit

Permalink
fix: improve reorg handling logic
Browse files Browse the repository at this point in the history
  • Loading branch information
PJColombo committed Feb 3, 2025
1 parent bff7882 commit b6947ef
Show file tree
Hide file tree
Showing 9 changed files with 378 additions and 200 deletions.
6 changes: 5 additions & 1 deletion src/clients/beacon/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ pub struct BlockBody {
#[derive(Deserialize, Debug)]
pub struct BlockMessage {
pub body: BlockBody,
pub parent_root: B256,
#[serde(deserialize_with = "deserialize_number")]
pub slot: u32,
}

#[derive(Deserialize, Debug)]
Expand Down Expand Up @@ -62,7 +65,7 @@ pub struct BlockHeaderResponse {
pub data: BlockHeaderData,
}

#[derive(Deserialize, Debug)]
#[derive(Deserialize, Debug, Clone)]
pub struct BlockHeader {
pub root: B256,
pub parent_root: B256,
Expand Down Expand Up @@ -90,6 +93,7 @@ pub struct BlockHeaderMessage {
pub struct HeadEventData {
#[serde(deserialize_with = "deserialize_number")]
pub slot: u32,
#[allow(dead_code)]
pub block: B256,
}

Expand Down
30 changes: 30 additions & 0 deletions src/clients/blobscan/mod.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use std::fmt::Debug;

use alloy::primitives::B256;
use async_trait::async_trait;
use backoff::ExponentialBackoff;
use chrono::TimeDelta;
use reqwest::{Client, Url};

#[cfg(test)]
use mockall::automock;
use types::{BlobscanBlock, ReorgedBlocksRequestBody};

use crate::{
clients::{blobscan::types::ReorgedSlotsResponse, common::ClientResult},
Expand Down Expand Up @@ -37,7 +39,13 @@ pub trait CommonBlobscanClient: Send + Sync + Debug {
transactions: Vec<Transaction>,
blobs: Vec<Blob>,
) -> ClientResult<()>;
async fn get_block(&self, slot: u32) -> ClientResult<Option<BlobscanBlock>>;
async fn handle_reorged_slots(&self, slots: &[u32]) -> ClientResult<u32>;
async fn handle_reorg(
&self,
rewinded_blocks: Vec<B256>,
forwarded_blocks: Vec<B256>,
) -> ClientResult<()>;
async fn update_sync_state(&self, sync_state: BlockchainSyncState) -> ClientResult<()>;
async fn get_sync_state(&self) -> ClientResult<Option<BlockchainSyncState>>;
}
Expand Down Expand Up @@ -93,6 +101,12 @@ impl CommonBlobscanClient for BlobscanClient {
json_put!(&self.client, url, token, &req).map(|_: Option<()>| ())
}

async fn get_block(&self, slot: u32) -> ClientResult<Option<BlobscanBlock>> {
let url = self.base_url.join(&format!("block/{}?slot=true", slot))?;

json_get!(&self.client, url, BlobscanBlock, self.exp_backoff.clone())
}

async fn handle_reorged_slots(&self, slots: &[u32]) -> ClientResult<u32> {
let url = self.base_url.join("indexer/reorged-slots")?;
let token = self.jwt_manager.get_token()?;
Expand All @@ -104,6 +118,22 @@ impl CommonBlobscanClient for BlobscanClient {
.map(|res: Option<ReorgedSlotsResponse>| res.unwrap().total_updated_slots)
}

async fn handle_reorg(
&self,
rewinded_blocks: Vec<B256>,
forwarded_blocks: Vec<B256>,
) -> ClientResult<()> {
let url = self.base_url.join("indexer/reorged-blocks")?;
let token = self.jwt_manager.get_token()?;

let req = ReorgedBlocksRequestBody {
forwarded_blocks,
rewinded_blocks,
};

json_put!(&self.client, url, ReorgedBlocksRequestBody, token, &req).map(|_| ())
}

async fn update_sync_state(&self, sync_state: BlockchainSyncState) -> ClientResult<()> {
let url = self.base_url.join("blockchain-sync-state")?;
let token = self.jwt_manager.get_token()?;
Expand Down
14 changes: 14 additions & 0 deletions src/clients/blobscan/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@ use serde::{Deserialize, Serialize};

use crate::{clients::beacon::types::Blob as BeaconBlob, utils::web3::calculate_versioned_hash};

#[derive(Serialize, Deserialize, Debug)]
pub struct BlobscanBlock {
pub hash: B256,
pub number: u32,
pub slot: u32,
}

#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct Block {
Expand Down Expand Up @@ -92,6 +99,13 @@ pub struct ReorgedSlotsRequest {
pub reorged_slots: Vec<u32>,
}

#[derive(Deserialize, Serialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct ReorgedBlocksRequestBody {
pub forwarded_blocks: Vec<B256>,
pub rewinded_blocks: Vec<B256>,
}

#[derive(Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct ReorgedSlotsResponse {
Expand Down
2 changes: 1 addition & 1 deletion src/indexer/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub enum IndexerError {
#[error(transparent)]
SyncingTaskError(#[from] IndexingError),
#[error("failed to retrieve blobscan's sync state")]
BlobscanSyncStateRetrievalError(#[source] ClientError),
BlobscanSyncStateRetrievalError(#[from] ClientError),
#[error("failed to send syncing task message")]
SyncingTaskMessageSendFailure(#[from] SendError<IndexerTaskMessage>),
}
Expand Down
131 changes: 23 additions & 108 deletions src/indexer/event_handlers/head.rs
Original file line number Diff line number Diff line change
@@ -1,147 +1,62 @@
use std::cmp;

use alloy::{primitives::B256, transports::Transport};
use tracing::info;

use crate::{
clients::{
beacon::types::{BlockHeader, BlockId, HeadEventData},
blobscan::types::BlockchainSyncState,
common::ClientError,
},
context::CommonContext,
clients::beacon::types::{BlockId, HeadEventData},
synchronizer::{error::SynchronizerError, CommonSynchronizer},
};

#[derive(Debug, thiserror::Error)]
pub enum HeadEventHandlerError {
#[error(transparent)]
EventDeserializationFailure(#[from] serde_json::Error),
#[error("failed to retrieve header for block \"{0}\"")]
BlockHeaderRetrievalError(BlockId, #[source] ClientError),
#[error("header for block \"{0}\" not found")]
BlockHeaderNotFound(BlockId),
#[error("failed to index head block")]
BlockSyncedError(#[from] SynchronizerError),
#[error("failed to handle reorged slots")]
BlobscanReorgedSlotsFailure(#[source] ClientError),
#[error("failed to update blobscan's sync state")]
BlobscanSyncStateUpdateError(#[source] ClientError),
}

pub struct HeadEventHandler<T> {
context: Box<dyn CommonContext<T>>,
pub struct HeadEventHandler {
synchronizer: Box<dyn CommonSynchronizer>,
start_block_id: BlockId,
last_block_hash: Option<B256>,
is_first_event: bool,
custom_start_block_id: Option<BlockId>,
}

impl<T> HeadEventHandler<T>
where
T: Transport + Send + Sync + 'static,
{
impl HeadEventHandler {
pub fn new(
context: Box<dyn CommonContext<T>>,
synchronizer: Box<dyn CommonSynchronizer>,
start_block_id: BlockId,
custom_start_block_id: Option<BlockId>,
) -> Self {
HeadEventHandler {
context,
synchronizer,
start_block_id,
last_block_hash: None,
is_first_event: true,
custom_start_block_id,
}
}

pub async fn handle(&mut self, event_data: String) -> Result<(), HeadEventHandlerError> {
let head_block_data = serde_json::from_str::<HeadEventData>(&event_data)?;
let head_slot = head_block_data.slot;

let head_block_slot = head_block_data.slot;
let head_block_hash = head_block_data.block;
// If this is the first event being processed, ensure the synchronizer is fully up to date
if self.is_first_event {
self.is_first_event = false;

let head_block_id = BlockId::Slot(head_block_data.slot);
let initial_block_id = if self.last_block_hash.is_none() {
self.start_block_id.clone()
} else {
head_block_id.clone()
};
let start_block_id = self.custom_start_block_id.clone().or(self
.synchronizer
.get_last_synced_block()
.map(|block| (block.slot + 1).into()));

let head_block_header = self.get_block_header(head_block_id).await?;
if let Some(start_block_id) = start_block_id {
if self.custom_start_block_id.is_some() {
self.synchronizer.clear_last_synced_block();
}

if let Some(last_block_hash) = self.last_block_hash {
if last_block_hash != head_block_header.parent_root {
let parent_block_header = self
.get_block_header(head_block_header.parent_root.into())
self.synchronizer
.sync_blocks(start_block_id, head_slot.into())
.await?;
let parent_block_slot = parent_block_header.slot;
let reorg_start_slot = parent_block_slot + 1;
let reorg_final_slot = head_block_slot;
let reorged_slots = (reorg_start_slot..reorg_final_slot).collect::<Vec<u32>>();

let result: Result<(), HeadEventHandlerError> = async {
let total_updated_slots = self.context
.blobscan_client()
.handle_reorged_slots(reorged_slots.as_slice())
.await
.map_err(HeadEventHandlerError::BlobscanReorgedSlotsFailure)?;


info!(slot=head_block_slot, "Reorganization detected. Found the following reorged slots: {:#?}. Total slots marked as reorged: {total_updated_slots}", reorged_slots);

// Re-index parent block as it may be mark as reorged and not indexed
self.synchronizer
.run(
parent_block_slot.into(),
(parent_block_slot + 1).into(),
)
.await?;

Ok(())
}
.await;

if let Err(err) = result {
// If an error occurred while handling the reorg try to update the latest synced slot to the last known slot before the reorg
self.context
.blobscan_client()
.update_sync_state(BlockchainSyncState {
last_finalized_block: None,
last_lower_synced_slot: None,
last_upper_synced_slot: Some(cmp::max(parent_block_slot - 1, 0)),
})
.await
.map_err(HeadEventHandlerError::BlobscanSyncStateUpdateError)?;

return Err(err);
}
}
}

self.synchronizer
.run(initial_block_id, (head_block_slot + 1).into())
.await?;

self.last_block_hash = Some(head_block_hash);
self.synchronizer.sync_block(head_slot.into()).await?;

Ok(())
}

async fn get_block_header(
&self,
block_id: BlockId,
) -> Result<BlockHeader, HeadEventHandlerError> {
match self
.context
.beacon_client()
.get_block_header(block_id.clone())
.await
.map_err(|err| {
HeadEventHandlerError::BlockHeaderRetrievalError(block_id.clone(), err)
})? {
Some(block) => Ok(block.into()),
None => Err(HeadEventHandlerError::BlockHeaderNotFound(block_id.clone())),
}
}
}

// #[cfg(test)]
Expand Down
Loading

0 comments on commit b6947ef

Please sign in to comment.