Skip to content

Commit

Permalink
fix: fix: process skipped blocks in the new canonical chain after a r…
Browse files Browse the repository at this point in the history
…eorg, if any exist
  • Loading branch information
PJColombo committed Feb 8, 2025
1 parent b6947ef commit b9b4e57
Show file tree
Hide file tree
Showing 5 changed files with 189 additions and 81 deletions.
44 changes: 44 additions & 0 deletions src/clients/beacon/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ use std::{fmt, str::FromStr};
use alloy::primitives::{Bytes, B256};
use serde::{Deserialize, Serialize};

use crate::clients::common::ClientError;

use super::CommonBeaconClient;

#[derive(Serialize, Debug, Clone, PartialEq)]
pub enum BlockId {
Head,
Expand Down Expand Up @@ -189,3 +193,43 @@ impl From<BlockHeaderResponse> for BlockHeader {
}
}
}

#[derive(Debug, thiserror::Error)]
pub enum BlockIdResolutionError {
#[error("Block with id '{0}' not found")]
BlockNotFound(BlockId),
#[error("Failed to resolve block id '{block_id}'")]
FailedBlockIdResolution {
block_id: BlockId,
#[source]
error: ClientError,
},
}

pub trait BlockIdResolution {
async fn resolve_to_slot(
&self,
beacon_client: &dyn CommonBeaconClient,
) -> Result<u32, BlockIdResolutionError>;
}

impl BlockIdResolution for BlockId {
async fn resolve_to_slot(
&self,
beacon_client: &dyn CommonBeaconClient,
) -> Result<u32, BlockIdResolutionError> {
match self {
BlockId::Slot(slot) => Ok(*slot),
_ => match beacon_client
.get_block_header(self.clone().into())
.await
.map_err(|err| BlockIdResolutionError::FailedBlockIdResolution {
block_id: self.clone(),
error: err,
})? {
Some(header) => Ok(header.slot),
None => Err(BlockIdResolutionError::BlockNotFound(self.clone())),
},
}
}
}
16 changes: 14 additions & 2 deletions src/slots_processor/error.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use crate::clients::common::ClientError;
use alloy::primitives::B256;

use crate::{clients::common::ClientError, synchronizer::error::SynchronizerError};

#[derive(Debug, thiserror::Error)]
pub enum SlotProcessingError {
#[error(transparent)]
ClientError(#[from] crate::clients::common::ClientError),
#[error(transparent)]
Provider(#[from] alloy::transports::TransportError),

#[error(transparent)]
Other(#[from] anyhow::Error),
}
Expand All @@ -22,8 +23,19 @@ pub enum SlotsProcessorError {
failed_slot: u32,
error: SlotProcessingError,
},
#[error("Failed to process reorg. old slot {old_slot}, new slot {new_slot}, new head block root {new_head_block_root}, old head block root {old_head_block_root}")]
FailedReorgProcessing {
old_slot: u32,
new_slot: u32,
new_head_block_root: B256,
old_head_block_root: B256,
#[source]
error: anyhow::Error,
},
#[error("Failed to handle reorged slots")]
ReorgedFailure(#[from] ClientError),
#[error("Failed to handle forwarded blocks")]
ForwardedBlocksFailure(#[from] SynchronizerError),
#[error(transparent)]
Other(#[from] anyhow::Error),
}
153 changes: 113 additions & 40 deletions src/slots_processor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ use alloy::{
};
use anyhow::{anyhow, Context as AnyhowContext, Result};

use crate::clients::beacon::types::BlockHeader;
use tracing::{debug, info};

use crate::{
clients::{
beacon::types::BlockHeader,
blobscan::types::{Blob, BlobscanBlock, Block, Transaction},
common::ClientError,
},
Expand All @@ -20,6 +20,23 @@ use self::helpers::{create_tx_hash_versioned_hashes_mapping, create_versioned_ha
pub mod error;
mod helpers;

pub struct BlockData {
pub root: B256,
pub parent_root: B256,
pub slot: u32,
pub execution_block_hash: B256,
}

impl From<&BlockData> for BlockHeader {
fn from(block: &BlockData) -> Self {
BlockHeader {
root: block.root,
parent_root: block.parent_root,
slot: block.slot,
}
}
}

pub struct SlotsProcessor<T> {
context: Box<dyn CommonContext<T>>,
pub last_processed_block: Option<BlockHeader>,
Expand Down Expand Up @@ -68,7 +85,13 @@ impl SlotsProcessor<ReqwestTransport> {
if prev_block_header.root != block_header.parent_root {
self.process_reorg(&prev_block_header, &block_header)
.await
.map_err(|err| SlotsProcessorError::ReorgedFailure(err))?;
.map_err(|error| SlotsProcessorError::FailedReorgProcessing {
old_slot: prev_block_header.slot,
new_slot: block_header.slot,
new_head_block_root: block_header.root,
old_head_block_root: prev_block_header.root,
error,
})?;
}
}

Expand All @@ -85,25 +108,40 @@ impl SlotsProcessor<ReqwestTransport> {
&mut self,
slot: u32,
) -> Result<Option<BlockHeader>, SlotProcessingError> {
let beacon_client = self.context.beacon_client();
let blobscan_client = self.context.blobscan_client();
let provider = self.context.provider();

let beacon_block_header = Some(match beacon_client.get_block_header(slot.into()).await? {
let beacon_block_header = match self
.context
.beacon_client()
.get_block_header(slot.into())
.await?
{
Some(header) => header,
None => {
debug!(slot, "Skipping as there is no beacon block header");

return Ok(None);
}
});
};

self.process_block(&beacon_block_header).await?;

Ok(Some(beacon_block_header))
}

async fn process_block(
&self,
beacon_block_header: &BlockHeader,
) -> Result<(), SlotProcessingError> {
let beacon_client = self.context.beacon_client();
let blobscan_client = self.context.blobscan_client();
let provider = self.context.provider();
let slot = beacon_block_header.slot;

let beacon_block = match beacon_client.get_block(slot.into()).await? {
Some(block) => block,
None => {
debug!(slot = slot, "Skipping as there is no beacon block");

return Ok(None);
return Ok(());
}
};

Expand All @@ -115,7 +153,7 @@ impl SlotsProcessor<ReqwestTransport> {
"Skipping as beacon block doesn't contain execution payload"
);

return Ok(beacon_block_header);
return Ok(());
}
};

Expand All @@ -130,7 +168,7 @@ impl SlotsProcessor<ReqwestTransport> {
"Skipping as beacon block doesn't contain blob kzg commitments"
);

return Ok(beacon_block_header);
return Ok(());
}

let execution_block_hash = execution_payload.block_hash;
Expand Down Expand Up @@ -160,15 +198,15 @@ impl SlotsProcessor<ReqwestTransport> {
if blobs.is_empty() {
debug!(slot, "Skipping as blobs sidecar is empty");

return Ok(beacon_block_header);
return Ok(());
} else {
blobs
}
}
None => {
debug!(slot, "Skipping as there is no blobs sidecar");

return Ok(beacon_block_header);
return Ok(());
}
};

Expand Down Expand Up @@ -217,19 +255,22 @@ impl SlotsProcessor<ReqwestTransport> {

info!(slot, block_number, "Block indexed successfully");

Ok(beacon_block_header)
Ok(())
}

/// Handles reorgs by rewinding the blobscan blocks to the common ancestor and forwarding to the new head.
async fn process_reorg(
&mut self,
old_head_header: &BlockHeader,
new_head_header: &BlockHeader,
) -> Result<(), ClientError> {
) -> Result<(), anyhow::Error> {
let mut current_old_slot = old_head_header.slot;

let mut rewinded_execution_blocks: Vec<B256> = vec![];
let mut rewinded_blocks: Vec<B256> = vec![];

loop {
// We iterate over blocks by slot and not block root as blobscan blocks don't
// have parent root we can use to traverse the chain
let old_blobscan_block = match self
.context
.blobscan_client()
Expand All @@ -240,56 +281,77 @@ impl SlotsProcessor<ReqwestTransport> {
None => {
current_old_slot -= 1;

// TODO: use fork slot instead of 0 as a stop condition to avoid long loops
if current_old_slot == 0 {
return Err(anyhow!(
"No blobscan block found for old head slot {}",
old_head_header.slot
)
.into());
return Err(anyhow!("No common block found").into());
}

continue;
}
};

let forwarded_execution_blocks = self
.get_canonical_execution_blocks(new_head_header.root, &old_blobscan_block)
let canonical_block_path = self
.get_canonical_block_path(&old_blobscan_block, new_head_header.root)
.await?;

rewinded_execution_blocks.push(old_blobscan_block.hash);

if !forwarded_execution_blocks.is_empty() {
let rewinded_blocks_count = rewinded_execution_blocks.len();
let forwarded_blocks_count = forwarded_execution_blocks.len();
let canonical_block_path = canonical_block_path.into_iter().rev().collect::<Vec<_>>();

// If a path exists, we've found the common ancient block
// and can proceed with handling the reorg.
if !canonical_block_path.is_empty() {
let rewinded_blocks_count = rewinded_blocks.len();
let forwarded_blocks_count = canonical_block_path.len();

let canonical_block_headers: Vec<BlockHeader> = canonical_block_path
.iter()
.map(|block| block.into())
.collect::<Vec<_>>();

// If the new canonical block path includes blocks beyond the new head block,
// they were skipped and must be processed.
for block in canonical_block_headers.iter() {
if block.slot != new_head_header.slot {
self.process_block(block)
.await
.with_context(|| format!("Failed to sync forwarded block"))?;
}
}

info!(
new_slot = new_head_header.slot,
old_slot = old_head_header.slot,
"Reorg detected! rewinded blocks: {rewinded_blocks_count}, forwarded blocks: {forwarded_blocks_count}",
);

let forwarded_blocks = canonical_block_path
.iter()
.map(|block| block.execution_block_hash)
.collect::<Vec<_>>();

self.context
.blobscan_client()
.handle_reorg(rewinded_execution_blocks, forwarded_execution_blocks)
.handle_reorg(rewinded_blocks, forwarded_blocks)
.await?;

return Ok(());
}

rewinded_blocks.push(old_blobscan_block.hash);
}
}

async fn get_canonical_execution_blocks(
/// Returns the path of blocks with execution payload from the head block to the provided block.
async fn get_canonical_block_path(
&mut self,
canonical_block_root: B256,
blobscan_block: &BlobscanBlock,
) -> Result<Vec<B256>, ClientError> {
head_block_root: B256,
) -> Result<Vec<BlockData>, ClientError> {
let beacon_client = self.context.beacon_client();
let mut canonical_execution_blocks: Vec<B256> = vec![];
let mut canonical_execution_blocks: Vec<BlockData> = vec![];

let mut canonical_block = match beacon_client.get_block(canonical_block_root.into()).await?
{
let mut canonical_block = match beacon_client.get_block(head_block_root.into()).await? {
Some(block) => block,
None => {
return Ok(canonical_execution_blocks);
return Ok(vec![]);
}
};

Expand All @@ -299,28 +361,39 @@ impl SlotsProcessor<ReqwestTransport> {
}
}

let mut current_canonical_block_root = head_block_root;

while canonical_block.message.parent_root != B256::ZERO {
let canonical_block_parent_root = canonical_block.message.parent_root;

if canonical_block.message.slot < blobscan_block.slot {
return Ok(vec![]);
}

if let Some(execution_payload) = canonical_block.message.body.execution_payload {
if let Some(execution_payload) = &canonical_block.message.body.execution_payload {
if execution_payload.block_hash == blobscan_block.hash {
return Ok(canonical_execution_blocks);
}

canonical_execution_blocks.push(execution_payload.block_hash);
canonical_execution_blocks.push(BlockData {
root: current_canonical_block_root,
parent_root: canonical_block_parent_root,
slot: canonical_block.message.slot,
execution_block_hash: execution_payload.block_hash,
});
}

canonical_block = match beacon_client
.get_block(canonical_block.message.parent_root.into())
.get_block(canonical_block_parent_root.into())
.await?
{
Some(block) => block,
None => {
return Ok(canonical_execution_blocks);
}
};

current_canonical_block_root = canonical_block_parent_root;
}

Ok(vec![])
Expand Down
Loading

0 comments on commit b9b4e57

Please sign in to comment.