diff --git a/src/clients/beacon/types.rs b/src/clients/beacon/types.rs index 60964a9..4ff04ce 100644 --- a/src/clients/beacon/types.rs +++ b/src/clients/beacon/types.rs @@ -3,6 +3,10 @@ use std::{fmt, str::FromStr}; use alloy::primitives::{Bytes, B256}; use serde::{Deserialize, Serialize}; +use crate::clients::common::ClientError; + +use super::CommonBeaconClient; + #[derive(Serialize, Debug, Clone, PartialEq)] pub enum BlockId { Head, @@ -189,3 +193,43 @@ impl From for BlockHeader { } } } + +#[derive(Debug, thiserror::Error)] +pub enum BlockIdResolutionError { + #[error("Block with id '{0}' not found")] + BlockNotFound(BlockId), + #[error("Failed to resolve block id '{block_id}'")] + FailedBlockIdResolution { + block_id: BlockId, + #[source] + error: ClientError, + }, +} + +pub trait BlockIdResolution { + async fn resolve_to_slot( + &self, + beacon_client: &dyn CommonBeaconClient, + ) -> Result; +} + +impl BlockIdResolution for BlockId { + async fn resolve_to_slot( + &self, + beacon_client: &dyn CommonBeaconClient, + ) -> Result { + match self { + BlockId::Slot(slot) => Ok(*slot), + _ => match beacon_client + .get_block_header(self.clone().into()) + .await + .map_err(|err| BlockIdResolutionError::FailedBlockIdResolution { + block_id: self.clone(), + error: err, + })? { + Some(header) => Ok(header.slot), + None => Err(BlockIdResolutionError::BlockNotFound(self.clone())), + }, + } + } +} diff --git a/src/slots_processor/error.rs b/src/slots_processor/error.rs index 37c7ec2..6e808f0 100644 --- a/src/slots_processor/error.rs +++ b/src/slots_processor/error.rs @@ -1,4 +1,6 @@ -use crate::clients::common::ClientError; +use alloy::primitives::B256; + +use crate::{clients::common::ClientError, synchronizer::error::SynchronizerError}; #[derive(Debug, thiserror::Error)] pub enum SlotProcessingError { @@ -6,7 +8,6 @@ pub enum SlotProcessingError { ClientError(#[from] crate::clients::common::ClientError), #[error(transparent)] Provider(#[from] alloy::transports::TransportError), - #[error(transparent)] Other(#[from] anyhow::Error), } @@ -22,8 +23,19 @@ pub enum SlotsProcessorError { failed_slot: u32, error: SlotProcessingError, }, + #[error("Failed to process reorg. old slot {old_slot}, new slot {new_slot}, new head block root {new_head_block_root}, old head block root {old_head_block_root}")] + FailedReorgProcessing { + old_slot: u32, + new_slot: u32, + new_head_block_root: B256, + old_head_block_root: B256, + #[source] + error: anyhow::Error, + }, #[error("Failed to handle reorged slots")] ReorgedFailure(#[from] ClientError), + #[error("Failed to handle forwarded blocks")] + ForwardedBlocksFailure(#[from] SynchronizerError), #[error(transparent)] Other(#[from] anyhow::Error), } diff --git a/src/slots_processor/mod.rs b/src/slots_processor/mod.rs index 57b3ab8..4668a9f 100644 --- a/src/slots_processor/mod.rs +++ b/src/slots_processor/mod.rs @@ -3,11 +3,11 @@ use alloy::{ }; use anyhow::{anyhow, Context as AnyhowContext, Result}; +use crate::clients::beacon::types::BlockHeader; use tracing::{debug, info}; use crate::{ clients::{ - beacon::types::BlockHeader, blobscan::types::{Blob, BlobscanBlock, Block, Transaction}, common::ClientError, }, @@ -20,6 +20,23 @@ use self::helpers::{create_tx_hash_versioned_hashes_mapping, create_versioned_ha pub mod error; mod helpers; +pub struct BlockData { + pub root: B256, + pub parent_root: B256, + pub slot: u32, + pub execution_block_hash: B256, +} + +impl From<&BlockData> for BlockHeader { + fn from(block: &BlockData) -> Self { + BlockHeader { + root: block.root, + parent_root: block.parent_root, + slot: block.slot, + } + } +} + pub struct SlotsProcessor { context: Box>, pub last_processed_block: Option, @@ -68,7 +85,13 @@ impl SlotsProcessor { if prev_block_header.root != block_header.parent_root { self.process_reorg(&prev_block_header, &block_header) .await - .map_err(|err| SlotsProcessorError::ReorgedFailure(err))?; + .map_err(|error| SlotsProcessorError::FailedReorgProcessing { + old_slot: prev_block_header.slot, + new_slot: block_header.slot, + new_head_block_root: block_header.root, + old_head_block_root: prev_block_header.root, + error, + })?; } } @@ -85,25 +108,40 @@ impl SlotsProcessor { &mut self, slot: u32, ) -> Result, SlotProcessingError> { - let beacon_client = self.context.beacon_client(); - let blobscan_client = self.context.blobscan_client(); - let provider = self.context.provider(); - - let beacon_block_header = Some(match beacon_client.get_block_header(slot.into()).await? { + let beacon_block_header = match self + .context + .beacon_client() + .get_block_header(slot.into()) + .await? + { Some(header) => header, None => { debug!(slot, "Skipping as there is no beacon block header"); return Ok(None); } - }); + }; + + self.process_block(&beacon_block_header).await?; + + Ok(Some(beacon_block_header)) + } + + async fn process_block( + &self, + beacon_block_header: &BlockHeader, + ) -> Result<(), SlotProcessingError> { + let beacon_client = self.context.beacon_client(); + let blobscan_client = self.context.blobscan_client(); + let provider = self.context.provider(); + let slot = beacon_block_header.slot; let beacon_block = match beacon_client.get_block(slot.into()).await? { Some(block) => block, None => { debug!(slot = slot, "Skipping as there is no beacon block"); - return Ok(None); + return Ok(()); } }; @@ -115,7 +153,7 @@ impl SlotsProcessor { "Skipping as beacon block doesn't contain execution payload" ); - return Ok(beacon_block_header); + return Ok(()); } }; @@ -130,7 +168,7 @@ impl SlotsProcessor { "Skipping as beacon block doesn't contain blob kzg commitments" ); - return Ok(beacon_block_header); + return Ok(()); } let execution_block_hash = execution_payload.block_hash; @@ -160,7 +198,7 @@ impl SlotsProcessor { if blobs.is_empty() { debug!(slot, "Skipping as blobs sidecar is empty"); - return Ok(beacon_block_header); + return Ok(()); } else { blobs } @@ -168,7 +206,7 @@ impl SlotsProcessor { None => { debug!(slot, "Skipping as there is no blobs sidecar"); - return Ok(beacon_block_header); + return Ok(()); } }; @@ -217,19 +255,22 @@ impl SlotsProcessor { info!(slot, block_number, "Block indexed successfully"); - Ok(beacon_block_header) + Ok(()) } + /// Handles reorgs by rewinding the blobscan blocks to the common ancestor and forwarding to the new head. async fn process_reorg( &mut self, old_head_header: &BlockHeader, new_head_header: &BlockHeader, - ) -> Result<(), ClientError> { + ) -> Result<(), anyhow::Error> { let mut current_old_slot = old_head_header.slot; - let mut rewinded_execution_blocks: Vec = vec![]; + let mut rewinded_blocks: Vec = vec![]; loop { + // We iterate over blocks by slot and not block root as blobscan blocks don't + // have parent root we can use to traverse the chain let old_blobscan_block = match self .context .blobscan_client() @@ -240,56 +281,77 @@ impl SlotsProcessor { None => { current_old_slot -= 1; + // TODO: use fork slot instead of 0 as a stop condition to avoid long loops if current_old_slot == 0 { - return Err(anyhow!( - "No blobscan block found for old head slot {}", - old_head_header.slot - ) - .into()); + return Err(anyhow!("No common block found").into()); } continue; } }; - let forwarded_execution_blocks = self - .get_canonical_execution_blocks(new_head_header.root, &old_blobscan_block) + let canonical_block_path = self + .get_canonical_block_path(&old_blobscan_block, new_head_header.root) .await?; - - rewinded_execution_blocks.push(old_blobscan_block.hash); - - if !forwarded_execution_blocks.is_empty() { - let rewinded_blocks_count = rewinded_execution_blocks.len(); - let forwarded_blocks_count = forwarded_execution_blocks.len(); + let canonical_block_path = canonical_block_path.into_iter().rev().collect::>(); + + // If a path exists, we've found the common ancient block + // and can proceed with handling the reorg. + if !canonical_block_path.is_empty() { + let rewinded_blocks_count = rewinded_blocks.len(); + let forwarded_blocks_count = canonical_block_path.len(); + + let canonical_block_headers: Vec = canonical_block_path + .iter() + .map(|block| block.into()) + .collect::>(); + + // If the new canonical block path includes blocks beyond the new head block, + // they were skipped and must be processed. + for block in canonical_block_headers.iter() { + if block.slot != new_head_header.slot { + self.process_block(block) + .await + .with_context(|| format!("Failed to sync forwarded block"))?; + } + } info!( new_slot = new_head_header.slot, old_slot = old_head_header.slot, "Reorg detected! rewinded blocks: {rewinded_blocks_count}, forwarded blocks: {forwarded_blocks_count}", ); + + let forwarded_blocks = canonical_block_path + .iter() + .map(|block| block.execution_block_hash) + .collect::>(); + self.context .blobscan_client() - .handle_reorg(rewinded_execution_blocks, forwarded_execution_blocks) + .handle_reorg(rewinded_blocks, forwarded_blocks) .await?; return Ok(()); } + + rewinded_blocks.push(old_blobscan_block.hash); } } - async fn get_canonical_execution_blocks( + /// Returns the path of blocks with execution payload from the head block to the provided block. + async fn get_canonical_block_path( &mut self, - canonical_block_root: B256, blobscan_block: &BlobscanBlock, - ) -> Result, ClientError> { + head_block_root: B256, + ) -> Result, ClientError> { let beacon_client = self.context.beacon_client(); - let mut canonical_execution_blocks: Vec = vec![]; + let mut canonical_execution_blocks: Vec = vec![]; - let mut canonical_block = match beacon_client.get_block(canonical_block_root.into()).await? - { + let mut canonical_block = match beacon_client.get_block(head_block_root.into()).await? { Some(block) => block, None => { - return Ok(canonical_execution_blocks); + return Ok(vec![]); } }; @@ -299,21 +361,30 @@ impl SlotsProcessor { } } + let mut current_canonical_block_root = head_block_root; + while canonical_block.message.parent_root != B256::ZERO { + let canonical_block_parent_root = canonical_block.message.parent_root; + if canonical_block.message.slot < blobscan_block.slot { return Ok(vec![]); } - if let Some(execution_payload) = canonical_block.message.body.execution_payload { + if let Some(execution_payload) = &canonical_block.message.body.execution_payload { if execution_payload.block_hash == blobscan_block.hash { return Ok(canonical_execution_blocks); } - canonical_execution_blocks.push(execution_payload.block_hash); + canonical_execution_blocks.push(BlockData { + root: current_canonical_block_root, + parent_root: canonical_block_parent_root, + slot: canonical_block.message.slot, + execution_block_hash: execution_payload.block_hash, + }); } canonical_block = match beacon_client - .get_block(canonical_block.message.parent_root.into()) + .get_block(canonical_block_parent_root.into()) .await? { Some(block) => block, @@ -321,6 +392,8 @@ impl SlotsProcessor { return Ok(canonical_execution_blocks); } }; + + current_canonical_block_root = canonical_block_parent_root; } Ok(vec![]) diff --git a/src/synchronizer/error.rs b/src/synchronizer/error.rs index c9b18d3..2fe1a8d 100644 --- a/src/synchronizer/error.rs +++ b/src/synchronizer/error.rs @@ -1,4 +1,6 @@ -use crate::{clients::beacon::types::BlockId, slots_processor::error::SlotsProcessorError}; +use crate::{ + clients::beacon::types::BlockIdResolutionError, slots_processor::error::SlotsProcessorError, +}; #[derive(Debug, thiserror::Error)] pub enum SynchronizerError { @@ -10,19 +12,14 @@ pub enum SynchronizerError { final_slot: u32, chunk_errors: SlotsChunksErrors, }, - #[error("Failed to resolve block id {block_id} to a slot: {error}")] - FailedBlockIdResolution { - block_id: BlockId, - error: crate::clients::common::ClientError, - }, + #[error(transparent)] + FailedBlockIdResolution(#[from] BlockIdResolutionError), #[error("Failed to save slot checkpoint for slot {slot}: {error}")] FailedSlotCheckpointSave { slot: u32, error: crate::clients::common::ClientError, }, #[error(transparent)] - FailedSlotsProcessing(#[from] SlotsProcessorError), - #[error(transparent)] Other(#[from] anyhow::Error), } diff --git a/src/synchronizer/mod.rs b/src/synchronizer/mod.rs index 7cd5ef5..1f72226 100644 --- a/src/synchronizer/mod.rs +++ b/src/synchronizer/mod.rs @@ -12,9 +12,8 @@ use mockall::automock; use crate::{ clients::{ - beacon::types::{BlockHeader, BlockId}, + beacon::types::{BlockHeader, BlockId, BlockIdResolution}, blobscan::types::BlockchainSyncState, - common::ClientError, }, context::CommonContext, slots_processor::{error::SlotsProcessorError, SlotsProcessor}, @@ -318,31 +317,6 @@ impl Synchronizer { Ok(()) } - async fn resolve_to_slot(&self, block_id: BlockId) -> Result { - let beacon_client = self.context.beacon_client(); - - let resolved_block_id: Result = match block_id { - BlockId::Slot(slot) => Ok(slot), - _ => match beacon_client.get_block_header(block_id.clone()).await { - Ok(None) => { - let err = anyhow!("Block ID {} not found", block_id); - - Err(err.into()) - } - Ok(Some(block_header)) => Ok(block_header.slot), - Err(error) => Err(error), - }, - }; - - match resolved_block_id { - Ok(slot) => Ok(slot), - Err(error) => Err(SynchronizerError::FailedBlockIdResolution { - block_id: block_id.clone(), - error, - }), - } - } - pub fn clear_last_synced_block(&mut self) { self.last_synced_block = None; } @@ -359,7 +333,9 @@ impl CommonSynchronizer for Synchronizer { } async fn sync_block(&mut self, block_id: BlockId) -> Result<(), SynchronizerError> { - let final_slot = self.resolve_to_slot(block_id.clone()).await?; + let final_slot = block_id + .resolve_to_slot(self.context.beacon_client()) + .await?; self.process_slots_by_checkpoints(final_slot, final_slot + 1) .await?; @@ -372,8 +348,12 @@ impl CommonSynchronizer for Synchronizer { initial_block_id: BlockId, final_block_id: BlockId, ) -> Result<(), SynchronizerError> { - let initial_slot = self.resolve_to_slot(initial_block_id).await?; - let mut final_slot = self.resolve_to_slot(final_block_id.clone()).await?; + let initial_slot = initial_block_id + .resolve_to_slot(self.context.beacon_client()) + .await?; + let mut final_slot = final_block_id + .resolve_to_slot(self.context.beacon_client()) + .await?; if initial_slot == final_slot { return Ok(()); @@ -383,7 +363,9 @@ impl CommonSynchronizer for Synchronizer { self.process_slots_by_checkpoints(initial_slot, final_slot) .await?; - let latest_final_slot = self.resolve_to_slot(final_block_id.clone()).await?; + let latest_final_slot = final_block_id + .resolve_to_slot(self.context.beacon_client()) + .await?; if final_slot == latest_final_slot { return Ok(());