From 9ccfacc106c005008fed78dec3fb8206f9d35162 Mon Sep 17 00:00:00 2001 From: PJColombo Date: Sat, 9 Mar 2024 14:51:15 +0100 Subject: [PATCH 01/11] feat: add dencun fork slots for testnets (holesky, goerli and sepolia) + allow to set the network to be indexed --- src/env.rs | 15 +++++++++------ src/indexer/mod.rs | 5 ++++- src/main.rs | 14 ++++++++++++-- src/types.rs | 11 +++++++++++ src/utils/web3.rs | 11 +++++++++++ 5 files changed, 47 insertions(+), 9 deletions(-) create mode 100644 src/types.rs diff --git a/src/env.rs b/src/env.rs index adcb48c..280ef7f 100644 --- a/src/env.rs +++ b/src/env.rs @@ -1,8 +1,12 @@ use envy::Error::MissingValue; use serde::Deserialize; +use crate::types::Network; + #[derive(Deserialize, Debug)] pub struct Environment { + #[serde(default = "default_network")] + pub network_name: Network, #[serde(default = "default_blobscan_api_endpoint")] pub blobscan_api_endpoint: String, #[serde(default = "default_beacon_node_endpoint")] @@ -10,11 +14,14 @@ pub struct Environment { #[serde(default = "default_execution_node_endpoint")] pub execution_node_endpoint: String, pub secret_key: String, - #[serde(default = "default_dencun_fork_slot")] - pub dencun_fork_slot: u32, + pub dencun_fork_slot: Option, pub sentry_dsn: Option, } +fn default_network() -> Network { + Network::Devnet +} + fn default_blobscan_api_endpoint() -> String { "http://localhost:3001".to_string() } @@ -27,10 +34,6 @@ fn default_execution_node_endpoint() -> String { "http://localhost:8545".to_string() } -fn default_dencun_fork_slot() -> u32 { - 0 -} - impl Environment { pub fn from_env() -> Result { match envy::from_env::() { diff --git a/src/indexer/mod.rs b/src/indexer/mod.rs index 613c97c..9234489 100644 --- a/src/indexer/mod.rs +++ b/src/indexer/mod.rs @@ -15,6 +15,7 @@ use crate::{ context::{Config as ContextConfig, Context}, env::Environment, synchronizer::{Synchronizer, SynchronizerBuilder}, + utils::web3::get_network_dencun_fork_slot, }; use self::{ @@ -49,7 +50,9 @@ impl Indexer { .map_err(|err| anyhow!("Failed to get number of available threads: {:?}", err))? 
.get() as u32, }; - let dencun_fork_slot = env.dencun_fork_slot; + let dencun_fork_slot = env + .dencun_fork_slot + .unwrap_or(get_network_dencun_fork_slot(&env.network_name)); Ok(Self { context, diff --git a/src/main.rs b/src/main.rs index ac83c90..c4f8a9e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,6 +6,8 @@ use indexer::Indexer; use url::Url; use utils::telemetry::{get_subscriber, init_subscriber}; +use crate::utils::web3::get_network_dencun_fork_slot; + mod args; mod clients; mod context; @@ -13,6 +15,7 @@ mod env; mod indexer; mod slots_processor; mod synchronizer; +mod types; mod utils; fn remove_credentials_from_url(url_string: &str) -> Option { @@ -35,6 +38,14 @@ pub fn print_banner(args: &Args, env: &Environment) { println!("Blobscan indexer (EIP-4844 blob indexer) - blobscan.com"); println!("======================================================="); + println!("Network: {:?}", env.network_name); + if let Some(dencun_fork_slot) = env.dencun_fork_slot { + println!("Dencun fork slot: {dencun_fork_slot}"); + } else { + let default_dencun_fork_slot = get_network_dencun_fork_slot(&env.network_name); + println!("Dencun fork slot: {default_dencun_fork_slot}"); + } + if let Some(from_slot) = args.from_slot.clone() { println!("Start slot: {}", from_slot); } else { @@ -44,7 +55,7 @@ pub fn print_banner(args: &Args, env: &Environment) { if let Some(num_threads) = args.num_threads { println!("Number of threads: {}", num_threads); } else { - println!("Number of threads: 1"); + println!("Number of threads: auto"); } if let Some(slots_per_save) = args.slots_per_save { @@ -53,7 +64,6 @@ pub fn print_banner(args: &Args, env: &Environment) { println!("Slots checkpoint size: 1000"); } - println!("Dencun fork slot: {}", env.dencun_fork_slot); println!("Blobscan API endpoint: {}", env.blobscan_api_endpoint); println!( "CL endpoint: {:?}", diff --git a/src/types.rs b/src/types.rs new file mode 100644 index 0000000..a6e3993 --- /dev/null +++ b/src/types.rs @@ -0,0 +1,11 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Deserialize, Serialize)] +#[serde(rename_all = "lowercase")] +pub enum Network { + Mainnet, + Goerli, + Sepolia, + Holesky, + Devnet, +} diff --git a/src/utils/web3.rs b/src/utils/web3.rs index bf4b170..eaa53e5 100644 --- a/src/utils/web3.rs +++ b/src/utils/web3.rs @@ -4,6 +4,8 @@ use anyhow::{Context, Result}; use ethers::core::k256::sha2::{Digest, Sha256}; use ethers::{prelude::*, types::H256}; +use crate::types::Network; + const BLOB_COMMITMENT_VERSION_KZG: u8 = 0x01; pub fn sha256(value: &str) -> Result { @@ -64,3 +66,12 @@ pub fn get_tx_versioned_hashes(tx: &Transaction) -> Result>> { None => Ok(None), } } + +pub fn get_network_dencun_fork_slot(network: &Network) -> u32 { + match network { + Network::Goerli => 7413760, + Network::Sepolia => 4243456, + Network::Holesky => 950272, + _ => 0, + } +} From 555addb1aba04606e0dc3410a6c31bed7f1b7c6c Mon Sep 17 00:00:00 2001 From: PJColombo Date: Sat, 9 Mar 2024 15:40:25 +0100 Subject: [PATCH 02/11] feat(indexer): listen and process chain reorg events --- src/clients/beacon/types.rs | 17 ++++++++++++++--- src/indexer/mod.rs | 22 +++++++++++++++++++--- 2 files changed, 33 insertions(+), 6 deletions(-) diff --git a/src/clients/beacon/types.rs b/src/clients/beacon/types.rs index d27dcfe..03c99a8 100644 --- a/src/clients/beacon/types.rs +++ b/src/clients/beacon/types.rs @@ -14,9 +14,11 @@ pub enum BlockId { } #[derive(Serialize, Debug)] +#[serde(rename_all = "snake_case")] pub enum Topic { Head, FinalizedCheckpoint, + 
ChainReorg, } #[derive(Deserialize, Debug)] @@ -85,7 +87,15 @@ pub struct BlockHeaderMessage { } #[derive(Deserialize, Debug)] -pub struct HeadBlockEventData { +pub struct ChainReorgEventData { + pub old_head_block: H256, + pub new_head_block: H256, + #[serde(deserialize_with = "deserialize_number")] + pub slot: u32, +} + +#[derive(Deserialize, Debug)] +pub struct HeadEventData { #[serde(deserialize_with = "deserialize_number")] pub slot: u32, pub block: H256, @@ -136,14 +146,15 @@ impl FromStr for BlockId { impl From<&Topic> for String { fn from(value: &Topic) -> Self { match value { + Topic::ChainReorg => String::from("chain_reorg"), Topic::Head => String::from("head"), Topic::FinalizedCheckpoint => String::from("finalized_checkpoint"), } } } -impl From for BlockData { - fn from(event_data: HeadBlockEventData) -> Self { +impl From for BlockData { + fn from(event_data: HeadEventData) -> Self { Self { root: event_data.block, slot: event_data.slot, diff --git a/src/indexer/mod.rs b/src/indexer/mod.rs index 9234489..095c71b 100644 --- a/src/indexer/mod.rs +++ b/src/indexer/mod.rs @@ -9,7 +9,9 @@ use tracing::{debug, error, info}; use crate::{ args::Args, clients::{ - beacon::types::{BlockId, FinalizedCheckpointEventData, HeadBlockEventData, Topic}, + beacon::types::{ + BlockId, ChainReorgEventData, FinalizedCheckpointEventData, HeadEventData, Topic, + }, blobscan::types::BlockchainSyncState, }, context::{Config as ContextConfig, Context}, @@ -158,7 +160,7 @@ impl Indexer { let blobscan_client = task_context.blobscan_client(); let mut event_source = task_context .beacon_client() - .subscribe_to_events(vec![Topic::Head, Topic::FinalizedCheckpoint])?; + .subscribe_to_events(vec![Topic::ChainReorg, Topic::Head, Topic::FinalizedCheckpoint])?; let mut is_initial_sync_to_head = true; while let Some(event) = event_source.next().await { @@ -168,9 +170,23 @@ impl Indexer { } Ok(Event::Message(event)) => { match event.event.as_str() { + "chain_reorg" => { + let reorg_block_data = + serde_json::from_str::(&event.data)?; + + let reorged_slot = reorg_block_data.slot; + + blobscan_client.handle_reorged_slot(reorged_slot).await?; + blobscan_client.update_sync_state(BlockchainSyncState { + last_finalized_block: None, + last_lower_synced_slot: None, + last_upper_synced_slot: Some(reorged_slot), + }).await?; + + }, "head" => { let head_block_data = - serde_json::from_str::(&event.data)?; + serde_json::from_str::(&event.data)?; let head_block_id = &BlockId::Slot(head_block_data.slot); let initial_block_id = if is_initial_sync_to_head { From 7998addca4aa2ba2453f675dd8aae37f964cca4d Mon Sep 17 00:00:00 2001 From: PJColombo Date: Sun, 10 Mar 2024 01:25:34 +0100 Subject: [PATCH 03/11] feat(macros): allow to define expected returned type on `put` request macro --- src/clients/macros.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/clients/macros.rs b/src/clients/macros.rs index 609034b..662b9b2 100644 --- a/src/clients/macros.rs +++ b/src/clients/macros.rs @@ -95,7 +95,10 @@ macro_rules! json_get { /// Make a PUT request sending JSON. /// if JSON deser fails, emit a `WARN` level tracing event macro_rules! json_put { - ($client:expr, $url:expr, $auth_token:expr, $body:expr) => {{ + ($client:expr, $url:expr, $auth_token:expr, $body:expr) => { + json_put!($client, $url, (), $auth_token, $body) + }; + ($client:expr, $url:expr, $expected:ty, $auth_token:expr, $body:expr) => {{ let url = $url.clone(); let body = format!("{:?}", $body); @@ -123,7 +126,7 @@ macro_rules! 
json_put { }; let text = resp.text().await?; - let result: $crate::clients::common::ClientResponse<_> = text.parse()?; + let result: $crate::clients::common::ClientResponse<$expected> = text.parse()?; if result.is_err() { tracing::warn!( From 88bcfb4311af3c572aa06b68bb81a5e6dd290097 Mon Sep 17 00:00:00 2001 From: PJColombo Date: Sun, 10 Mar 2024 01:27:22 +0100 Subject: [PATCH 04/11] fix: revert reorged slot handling logic on synchronizer --- src/slots_processor/mod.rs | 58 ++++---------------------------------- src/synchronizer/mod.rs | 18 ++++-------- 2 files changed, 10 insertions(+), 66 deletions(-) diff --git a/src/slots_processor/mod.rs b/src/slots_processor/mod.rs index f6cfcd3..c0b436e 100644 --- a/src/slots_processor/mod.rs +++ b/src/slots_processor/mod.rs @@ -19,7 +19,6 @@ mod helpers; pub struct SlotsProcessor { context: Context, - last_block: Option, } #[derive(Debug, Clone)] @@ -38,11 +37,8 @@ impl From for BlockData { } impl SlotsProcessor { - pub fn new(context: Context, last_block: Option) -> SlotsProcessor { - Self { - context, - last_block, - } + pub fn new(context: Context) -> SlotsProcessor { + Self { context } } pub async fn process_slots( @@ -54,7 +50,7 @@ impl SlotsProcessor { if is_reverse_processing { for current_slot in (final_slot..=initial_slot).rev() { - let result = self.process_slot(current_slot, Some(false)).await; + let result = self.process_slot(current_slot).await; if let Err(error) = result { return Err(SlotsProcessorError::FailedSlotsProcessing { @@ -67,7 +63,7 @@ impl SlotsProcessor { } } else { for current_slot in initial_slot..=final_slot { - let result = self.process_slot(current_slot, Some(true)).await; + let result = self.process_slot(current_slot).await; if let Err(error) = result { return Err(SlotsProcessorError::FailedSlotsProcessing { @@ -83,17 +79,7 @@ impl SlotsProcessor { Ok(()) } - pub async fn process_slot( - &mut self, - slot: u32, - enable_reorg_detection: Option, - ) -> Result<(), SlotProcessingError> { - if let Some(enable_reorg_detection) = enable_reorg_detection { - if enable_reorg_detection { - self._detect_and_handle_reorg(slot).await?; - } - } - + pub async fn process_slot(&mut self, slot: u32) -> Result<(), SlotProcessingError> { let beacon_client = self.context.beacon_client(); let blobscan_client = self.context.blobscan_client(); let provider = self.context.provider(); @@ -227,38 +213,4 @@ impl SlotsProcessor { Ok(()) } - - pub fn get_last_block(&self) -> Option { - self.last_block.clone() - } - - async fn _detect_and_handle_reorg(&mut self, slot: u32) -> Result<(), SlotProcessingError> { - let beacon_client = self.context.beacon_client(); - let blobscan_client = self.context.blobscan_client(); - - let beacon_block_header = match beacon_client.get_block_header(&BlockId::Slot(slot)).await? 
- { - Some(block_header) => block_header, - None => { - debug!( - target = "slots_processor", - slot, "Skipping as there is no beacon block header" - ); - - return Ok(()); - } - }; - - if let Some(block) = &self.last_block { - if beacon_block_header.header.message.parent_root != block.root { - info!(target = "slots_processor", slot, "Block reorg detected"); - - blobscan_client.handle_reorged_slot(slot).await?; - } - } - - self.last_block = Some(beacon_block_header.into()); - - Ok(()) - } } diff --git a/src/synchronizer/mod.rs b/src/synchronizer/mod.rs index 1228802..2e71876 100644 --- a/src/synchronizer/mod.rs +++ b/src/synchronizer/mod.rs @@ -6,7 +6,7 @@ use tracing::{debug, debug_span, info, Instrument}; use crate::{ clients::{beacon::types::BlockId, blobscan::types::BlockchainSyncState, common::ClientError}, context::Context, - slots_processor::{error::SlotsProcessorError, BlockData, SlotsProcessor}, + slots_processor::{error::SlotsProcessorError, SlotsProcessor}, }; use self::error::{SlotsChunksErrors, SynchronizerError}; @@ -25,7 +25,6 @@ pub struct Synchronizer { num_threads: u32, min_slots_per_thread: u32, slots_checkpoint: u32, - last_synced_block: Option, } impl Default for SynchronizerBuilder { @@ -60,7 +59,6 @@ impl SynchronizerBuilder { num_threads: self.num_threads, min_slots_per_thread: self.min_slots_per_thread, slots_checkpoint: self.slots_checkpoint, - last_synced_block: None, } } } @@ -98,11 +96,10 @@ impl Synchronizer { let num_threads = std::cmp::max(1, unprocessed_slots / slots_per_thread); let remaining_slots = unprocessed_slots % num_threads; - let mut handles: Vec, SlotsProcessorError>>> = vec![]; + let mut handles: Vec>> = vec![]; for i in 0..num_threads { - let mut slots_processor = - SlotsProcessor::new(self.context.clone(), self.last_synced_block.clone()); + let mut slots_processor = SlotsProcessor::new(self.context.clone()); let thread_total_slots = slots_per_thread + if i == num_threads - 1 { remaining_slots @@ -132,7 +129,7 @@ impl Synchronizer { .process_slots(thread_initial_slot, thread_final_slot) .await?; - Ok(slots_processor.get_last_block()) + Ok(()) } .instrument(synchronizer_thread_span), ); @@ -143,14 +140,11 @@ impl Synchronizer { let handle_outputs = join_all(handles).await; let mut errors = vec![]; - let mut last_synced_block: Option = None; for handle in handle_outputs { match handle { Ok(thread_result) => match thread_result { - Ok(thread_last_block) => { - last_synced_block = thread_last_block; - } + Ok(()) => {} Err(error) => errors.push(error), }, Err(error) => { @@ -162,8 +156,6 @@ impl Synchronizer { } if errors.is_empty() { - self.last_synced_block = last_synced_block; - Ok(()) } else { Err(SynchronizerError::FailedParallelSlotsProcessing { From 08c3c8878ed16e01a7afcca0b7f7a90fa03049b6 Mon Sep 17 00:00:00 2001 From: PJColombo Date: Sun, 10 Mar 2024 01:28:06 +0100 Subject: [PATCH 05/11] fix(indexer): update reorg logic to integrate with updated reorg handling endpoint --- src/clients/beacon/mod.rs | 2 +- src/clients/beacon/types.rs | 2 ++ src/clients/blobscan/mod.rs | 18 ++++++----- src/clients/blobscan/types.rs | 10 ++++-- src/indexer/mod.rs | 58 ++++++++++++++++++++++++++--------- 5 files changed, 65 insertions(+), 25 deletions(-) diff --git a/src/clients/beacon/mod.rs b/src/clients/beacon/mod.rs index ea52ddc..a60a006 100644 --- a/src/clients/beacon/mod.rs +++ b/src/clients/beacon/mod.rs @@ -79,7 +79,7 @@ impl BeaconClient { }) } - pub fn subscribe_to_events(&self, topics: Vec) -> ClientResult { + pub fn subscribe_to_events(&self, 
topics: &Vec) -> ClientResult { let topics = topics .iter() .map(|topic| topic.into()) diff --git a/src/clients/beacon/types.rs b/src/clients/beacon/types.rs index 03c99a8..f9dbcd3 100644 --- a/src/clients/beacon/types.rs +++ b/src/clients/beacon/types.rs @@ -92,6 +92,8 @@ pub struct ChainReorgEventData { pub new_head_block: H256, #[serde(deserialize_with = "deserialize_number")] pub slot: u32, + #[serde(deserialize_with = "deserialize_number")] + pub depth: u32, } #[derive(Deserialize, Debug)] diff --git a/src/clients/blobscan/mod.rs b/src/clients/blobscan/mod.rs index 7b7e5de..a0307b7 100644 --- a/src/clients/blobscan/mod.rs +++ b/src/clients/blobscan/mod.rs @@ -2,13 +2,16 @@ use backoff::ExponentialBackoff; use chrono::TimeDelta; use reqwest::{Client, Url}; -use crate::{clients::common::ClientResult, json_get, json_put}; +use crate::{ + clients::{blobscan::types::ReorgedSlotsResponse, common::ClientResult}, + json_get, json_put, +}; use self::{ jwt_manager::{Config as JWTManagerConfig, JWTManager}, types::{ Blob, Block, BlockchainSyncState, BlockchainSyncStateRequest, BlockchainSyncStateResponse, - IndexRequest, ReorgedSlotRequest, Transaction, + IndexRequest, ReorgedSlotsRequest, Transaction, }, }; @@ -64,14 +67,15 @@ impl BlobscanClient { json_put!(&self.client, url, token, &req).map(|_: Option<()>| ()) } - pub async fn handle_reorged_slot(&self, slot: u32) -> ClientResult<()> { - let url = self.base_url.join("indexer/reorged-slot")?; + pub async fn handle_reorged_slots(&self, slots: Vec) -> ClientResult { + let url = self.base_url.join("indexer/reorged-slots")?; let token = self.jwt_manager.get_token()?; - let req = ReorgedSlotRequest { - new_head_slot: slot, + let req = ReorgedSlotsRequest { + reorged_slots: slots, }; - json_put!(&self.client, url, token, &req).map(|_: Option<()>| ()) + json_put!(&self.client, url, ReorgedSlotsResponse, token, &req) + .map(|res: Option| res.unwrap().total_updated_slots) } pub async fn update_sync_state(&self, sync_state: BlockchainSyncState) -> ClientResult<()> { diff --git a/src/clients/blobscan/types.rs b/src/clients/blobscan/types.rs index aa6c7cc..95ce669 100644 --- a/src/clients/blobscan/types.rs +++ b/src/clients/blobscan/types.rs @@ -87,8 +87,14 @@ pub struct IndexRequest { #[derive(Serialize, Debug)] #[serde(rename_all = "camelCase")] -pub struct ReorgedSlotRequest { - pub new_head_slot: u32, +pub struct ReorgedSlotsRequest { + pub reorged_slots: Vec, +} + +#[derive(Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct ReorgedSlotsResponse { + pub total_updated_slots: u32, } impl fmt::Debug for Blob { diff --git a/src/indexer/mod.rs b/src/indexer/mod.rs index 095c71b..9a6f677 100644 --- a/src/indexer/mod.rs +++ b/src/indexer/mod.rs @@ -4,7 +4,7 @@ use anyhow::{anyhow, Context as AnyhowContext}; use futures::StreamExt; use reqwest_eventsource::Event; use tokio::{sync::mpsc, task::JoinHandle}; -use tracing::{debug, error, info}; +use tracing::{debug, error, info, warn}; use crate::{ args::Args, @@ -151,6 +151,8 @@ impl Indexer { tx: mpsc::Sender, start_block_id: BlockId, ) -> JoinHandle { + let task_name = "realtime_sync".to_string(); + let target = format!("indexer:{task_name}"); let task_context = self.context.clone(); let mut synchronizer = self._create_synchronizer(); @@ -158,31 +160,56 @@ impl Indexer { let result: Result<(), IndexerError> = async { let beacon_client = task_context.beacon_client(); let blobscan_client = task_context.blobscan_client(); + let topics = vec![ + Topic::ChainReorg, + Topic::Head, + 
Topic::FinalizedCheckpoint, + ]; let mut event_source = task_context .beacon_client() - .subscribe_to_events(vec![Topic::ChainReorg, Topic::Head, Topic::FinalizedCheckpoint])?; + .subscribe_to_events(&topics)?; let mut is_initial_sync_to_head = true; while let Some(event) = event_source.next().await { match event { Ok(Event::Open) => { - debug!(target = "indexer", "Listening for head and finalized block events…") + let events = topics + .iter() + .map(|topic| String::from(topic)) + .collect::<Vec<String>>() + .join(", "); + debug!(target, events, "Listening to beacon events…") } Ok(Event::Message(event)) => { - match event.event.as_str() { + let event_name = event.event.as_str(); + + match event_name { "chain_reorg" => { let reorg_block_data = serde_json::from_str::<ChainReorgEventData>(&event.data)?; - - let reorged_slot = reorg_block_data.slot; - - blobscan_client.handle_reorged_slot(reorged_slot).await?; - blobscan_client.update_sync_state(BlockchainSyncState { - last_finalized_block: None, - last_lower_synced_slot: None, - last_upper_synced_slot: Some(reorged_slot), - }).await?; + let slot = reorg_block_data.slot; + let old_head_block = reorg_block_data.old_head_block; + let target_depth = reorg_block_data.depth; + + let mut current_reorged_block = old_head_block; + let mut reorged_slots: Vec<u32> = vec![]; + + for current_depth in 0..target_depth { + let reorged_block_head = match beacon_client.get_block_header(&BlockId::Hash(current_reorged_block)).await? { + Some(block) => block, + None => { + warn!(target, event=event_name, slot=slot, "Found {current_depth} out of {target_depth} reorged blocks only"); + break + } + }; + reorged_slots.push(reorged_block_head.header.message.slot); + current_reorged_block = reorged_block_head.header.message.parent_root; + } + + let total_updated_slots = blobscan_client.handle_reorged_slots(reorged_slots).await?; + + info!(target, event=event_name, slot=slot, "Reorganization of depth {target_depth} detected. Found reorged slots: {current_reorged_block}. 
Total slots marked as reorged: {total_updated_slots}"); }, "head" => { let head_block_data = @@ -212,6 +239,7 @@ impl Indexer { &event.data, )?; let block_hash = finalized_checkpoint_data.block; + let full_block_hash = format!("0x{:x}", block_hash); let last_finalized_block_number = beacon_client .get_block(&BlockId::Hash(block_hash)) @@ -234,7 +262,7 @@ impl Indexer { }) .await?; - info!(target = "indexer", "Finalized block {full_block_hash} detected and stored"); + info!(target, event=event_name, execution_block=last_finalized_block_number, "New finalized block detected"); }, unexpected_event_id => { return Err(IndexerError::UnexpectedEvent { event: unexpected_event_id.to_string() }) @@ -256,7 +284,7 @@ impl Indexer { if let Err(error) = result { // TODO: Find a better way to handle this error tx.send(Err(IndexingTaskError::FailedIndexingTask { - task_name: "realtime_head_block_sync".to_string(), + task_name, error, })) .await From 460163b7ffd740b47ab6f6d3ca020eb7fafe4602 Mon Sep 17 00:00:00 2001 From: PJColombo Date: Sun, 10 Mar 2024 01:35:57 +0100 Subject: [PATCH 06/11] fix(indexer): fallback to head block only if both lower and upper synced slots do not exist --- src/indexer/mod.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/indexer/mod.rs b/src/indexer/mod.rs index 9a6f677..b4f6c9c 100644 --- a/src/indexer/mod.rs +++ b/src/indexer/mod.rs @@ -79,7 +79,10 @@ impl Indexer { None => match &sync_state { Some(state) => match state.last_lower_synced_slot { Some(slot) => BlockId::Slot(slot - 1), - None => BlockId::Head, + None => match state.last_upper_synced_slot { + Some(slot) => BlockId::Slot(slot - 1), + None => BlockId::Head, + } }, None => BlockId::Head, }, @@ -89,7 +92,10 @@ impl Indexer { None => match &sync_state { Some(state) => match state.last_upper_synced_slot { Some(slot) => BlockId::Slot(slot + 1), - None => BlockId::Head, + None => match state.last_lower_synced_slot { + Some(slot) => BlockId::Slot(slot + 1), + None => BlockId::Head, + } }, None => BlockId::Head, }, From efef86daf64d21dcd4f504e0d7c62ee03745f614 Mon Sep 17 00:00:00 2001 From: PJColombo Date: Sun, 10 Mar 2024 01:40:07 +0100 Subject: [PATCH 07/11] style(clippy): resolve lint issues --- src/clients/beacon/mod.rs | 2 +- src/clients/blobscan/mod.rs | 4 ++-- src/indexer/mod.rs | 14 +++++++------- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/src/clients/beacon/mod.rs b/src/clients/beacon/mod.rs index a60a006..b42d415 100644 --- a/src/clients/beacon/mod.rs +++ b/src/clients/beacon/mod.rs @@ -79,7 +79,7 @@ impl BeaconClient { }) } - pub fn subscribe_to_events(&self, topics: &Vec) -> ClientResult { + pub fn subscribe_to_events(&self, topics: &[Topic]) -> ClientResult { let topics = topics .iter() .map(|topic| topic.into()) diff --git a/src/clients/blobscan/mod.rs b/src/clients/blobscan/mod.rs index a0307b7..d695360 100644 --- a/src/clients/blobscan/mod.rs +++ b/src/clients/blobscan/mod.rs @@ -67,11 +67,11 @@ impl BlobscanClient { json_put!(&self.client, url, token, &req).map(|_: Option<()>| ()) } - pub async fn handle_reorged_slots(&self, slots: Vec) -> ClientResult { + pub async fn handle_reorged_slots(&self, slots: &[u32]) -> ClientResult { let url = self.base_url.join("indexer/reorged-slots")?; let token = self.jwt_manager.get_token()?; let req = ReorgedSlotsRequest { - reorged_slots: slots, + reorged_slots: slots.to_owned(), }; json_put!(&self.client, url, ReorgedSlotsResponse, token, &req) diff --git a/src/indexer/mod.rs b/src/indexer/mod.rs index 
b4f6c9c..8655bf1 100644 --- a/src/indexer/mod.rs +++ b/src/indexer/mod.rs @@ -45,6 +45,7 @@ impl Indexer { return Err(error.into()); } }; + let slots_checkpoint = args.slots_per_save; let num_threads = match args.num_threads { Some(num_threads) => num_threads, @@ -82,7 +83,7 @@ impl Indexer { None => match state.last_upper_synced_slot { Some(slot) => BlockId::Slot(slot - 1), None => BlockId::Head, - } + }, }, None => BlockId::Head, }, @@ -95,7 +96,7 @@ impl Indexer { None => match state.last_lower_synced_slot { Some(slot) => BlockId::Slot(slot + 1), None => BlockId::Head, - } + }, }, None => BlockId::Head, }, @@ -181,7 +182,7 @@ impl Indexer { Ok(Event::Open) => { let events = topics .iter() - .map(|topic| String::from(topic)) + .map(|topic| topic.into()) .collect::>() .join(", "); debug!(target, events, "Listening to beacon events…") @@ -200,7 +201,7 @@ impl Indexer { let mut current_reorged_block = old_head_block; let mut reorged_slots: Vec = vec![]; - for current_depth in 0..target_depth { + for current_depth in 1..=target_depth { let reorged_block_head = match beacon_client.get_block_header(&BlockId::Hash(current_reorged_block)).await? { Some(block) => block, None => { @@ -213,9 +214,9 @@ impl Indexer { current_reorged_block = reorged_block_head.header.message.parent_root; } - let total_updated_slots = blobscan_client.handle_reorged_slots(reorged_slots).await?; + let total_updated_slots = blobscan_client.handle_reorged_slots(&reorged_slots).await?; - info!(target, event=event_name, slot=slot, "Reorganization of depth {target_depth} detected. Found reorged slots: {current_reorged_block}. Total slots marked as reorged: {total_updated_slots}"); + info!(target, event=event_name, slot=slot, "Reorganization of depth {target_depth} detected. Found reorged slots: {:#?}. Total slots marked as reorged: {total_updated_slots}", reorged_slots); }, "head" => { let head_block_data = @@ -245,7 +246,6 @@ impl Indexer { &event.data, )?; let block_hash = finalized_checkpoint_data.block; - let full_block_hash = format!("0x{:x}", block_hash); let last_finalized_block_number = beacon_client .get_block(&BlockId::Hash(block_hash)) From d7059919fb705747d0e1ead2ddb8c9322e4c81ed Mon Sep 17 00:00:00 2001 From: PJColombo Date: Sun, 10 Mar 2024 03:54:00 +0100 Subject: [PATCH 08/11] style: add cosmetic changes --- src/indexer/mod.rs | 3 ++- src/main.rs | 4 +--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/src/indexer/mod.rs b/src/indexer/mod.rs index 8655bf1..750d0cb 100644 --- a/src/indexer/mod.rs +++ b/src/indexer/mod.rs @@ -1,6 +1,7 @@ use std::thread; use anyhow::{anyhow, Context as AnyhowContext}; + use futures::StreamExt; use reqwest_eventsource::Event; use tokio::{sync::mpsc, task::JoinHandle}; @@ -216,7 +217,7 @@ impl Indexer { let total_updated_slots = blobscan_client.handle_reorged_slots(&reorged_slots).await?; - info!(target, event=event_name, slot=slot, "Reorganization of depth {target_depth} detected. Found reorged slots: {:#?}. Total slots marked as reorged: {total_updated_slots}", reorged_slots); + info!(target, event=event_name, slot=slot, "Reorganization of depth {target_depth} detected. Found the following reorged slots: {:#?}. 
Total slots marked as reorged: {total_updated_slots}", reorged_slots); }, "head" => { let head_block_data = diff --git a/src/main.rs b/src/main.rs index c4f8a9e..5e2d12b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -47,9 +47,7 @@ pub fn print_banner(args: &Args, env: &Environment) { } if let Some(from_slot) = args.from_slot.clone() { - println!("Start slot: {}", from_slot); - } else { - println!("Start slot: 0"); + println!("Custom start slot: {}", from_slot.to_detailed_string()); } if let Some(num_threads) = args.num_threads { From b591dd9af9980c5cbd62c9da575a15d81935ad79 Mon Sep 17 00:00:00 2001 From: PJColombo Date: Sun, 10 Mar 2024 03:54:16 +0100 Subject: [PATCH 09/11] fix(beacon_client): abstract full format display for block ids into a function --- src/clients/beacon/mod.rs | 14 +++++--------- src/clients/beacon/types.rs | 22 +++++++++++++++++++++- 2 files changed, 26 insertions(+), 10 deletions(-) diff --git a/src/clients/beacon/mod.rs b/src/clients/beacon/mod.rs index b42d415..28c0a3e 100644 --- a/src/clients/beacon/mod.rs +++ b/src/clients/beacon/mod.rs @@ -38,13 +38,7 @@ impl BeaconClient { } pub async fn get_block(&self, block_id: &BlockId) -> ClientResult> { - let block_id = match block_id { - BlockId::Hash(hash) => format!("0x{:x}", hash), - BlockId::Slot(slot) => slot.to_string(), - block_id => block_id.to_string(), - }; - - let path = format!("v2/beacon/blocks/{block_id}"); + let path = format!("v2/beacon/blocks/{}", { block_id.to_detailed_string() }); let url = self.base_url.join(path.as_str())?; json_get!(&self.client, url, BlockResponse, self.exp_backoff.clone()).map(|res| match res { @@ -54,7 +48,7 @@ impl BeaconClient { } pub async fn get_block_header(&self, block_id: &BlockId) -> ClientResult> { - let path = format!("v1/beacon/headers/{block_id}"); + let path = format!("v1/beacon/headers/{}", { block_id.to_detailed_string() }); let url = self.base_url.join(path.as_str())?; json_get!( @@ -70,7 +64,9 @@ impl BeaconClient { } pub async fn get_blobs(&self, block_id: &BlockId) -> ClientResult>> { - let path = format!("v1/beacon/blob_sidecars/{block_id}"); + let path = format!("v1/beacon/blob_sidecars/{}", { + block_id.to_detailed_string() + }); let url = self.base_url.join(path.as_str())?; json_get!(&self.client, url, BlobsResponse, self.exp_backoff.clone()).map(|res| match res { diff --git a/src/clients/beacon/types.rs b/src/clients/beacon/types.rs index f9dbcd3..c165978 100644 --- a/src/clients/beacon/types.rs +++ b/src/clients/beacon/types.rs @@ -117,6 +117,17 @@ where value.parse::().map_err(serde::de::Error::custom) } +impl BlockId { + pub fn to_detailed_string(&self) -> String { + match self { + BlockId::Head => String::from("head"), + BlockId::Finalized => String::from("finalized"), + BlockId::Slot(slot) => slot.to_string(), + BlockId::Hash(hash) => format!("0x{:x}", hash), + } + } +} + impl fmt::Display for BlockId { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { @@ -138,7 +149,16 @@ impl FromStr for BlockId { _ => match s.parse::() { Ok(num) => Ok(BlockId::Slot(num)), Err(_) => { - Err("Invalid block ID. Expected 'head', 'finalized' or a number.".to_string()) + if s.starts_with("0x") { + match H256::from_str(s) { + Ok(hash) => Ok(BlockId::Hash(hash)), + Err(_) => Err(format!("Invalid block ID hash: {s}")), + } + } else { + Err( + format!("Invalid block ID: {s}. 
Expected 'head', 'finalized', a hash or a number."), + ) + } } }, } From 4af12bc16da7fbee572f8ee9760062664548ac8a Mon Sep 17 00:00:00 2001 From: "elessar.eth" Date: Sun, 10 Mar 2024 14:07:33 +0100 Subject: [PATCH 10/11] feat: add mainnet dencun fork slot Co-authored-by: Gabi --- src/utils/web3.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/utils/web3.rs b/src/utils/web3.rs index eaa53e5..d63c031 100644 --- a/src/utils/web3.rs +++ b/src/utils/web3.rs @@ -69,6 +69,7 @@ pub fn get_tx_versioned_hashes(tx: &Transaction) -> Result>> { pub fn get_network_dencun_fork_slot(network: &Network) -> u32 { match network { + Network::Mainnet => 8626176, Network::Goerli => 7413760, Network::Sepolia => 4243456, Network::Holesky => 950272, From 6a5cd30a2005e7a6b10ea9320eb9b541846433db Mon Sep 17 00:00:00 2001 From: PJColombo Date: Sun, 10 Mar 2024 14:28:46 +0100 Subject: [PATCH 11/11] refactor(network): integrate dencun fork slot getter function into the `Network` struct --- src/env.rs | 2 +- src/indexer/mod.rs | 5 +++-- src/main.rs | 7 ++----- src/network.rs | 23 +++++++++++++++++++++++ src/types.rs | 11 ----------- src/utils/web3.rs | 12 ------------ 6 files changed, 29 insertions(+), 31 deletions(-) create mode 100644 src/network.rs delete mode 100644 src/types.rs diff --git a/src/env.rs b/src/env.rs index 280ef7f..4a5237f 100644 --- a/src/env.rs +++ b/src/env.rs @@ -1,7 +1,7 @@ use envy::Error::MissingValue; use serde::Deserialize; -use crate::types::Network; +use crate::network::Network; #[derive(Deserialize, Debug)] pub struct Environment { diff --git a/src/indexer/mod.rs b/src/indexer/mod.rs index 750d0cb..6bc15a4 100644 --- a/src/indexer/mod.rs +++ b/src/indexer/mod.rs @@ -18,7 +18,6 @@ use crate::{ context::{Config as ContextConfig, Context}, env::Environment, synchronizer::{Synchronizer, SynchronizerBuilder}, - utils::web3::get_network_dencun_fork_slot, }; use self::{ @@ -56,7 +55,7 @@ impl Indexer { }; let dencun_fork_slot = env .dencun_fork_slot - .unwrap_or(get_network_dencun_fork_slot(&env.network_name)); + .unwrap_or(env.network_name.dencun_fork_slot()); Ok(Self { context, @@ -218,6 +217,8 @@ impl Indexer { let total_updated_slots = blobscan_client.handle_reorged_slots(&reorged_slots).await?; info!(target, event=event_name, slot=slot, "Reorganization of depth {target_depth} detected. Found the following reorged slots: {:#?}. 
Total slots marked as reorged: {total_updated_slots}", reorged_slots); + + panic!("Chain reorg detected"); }, "head" => { let head_block_data = diff --git a/src/main.rs b/src/main.rs index 5e2d12b..cf3d8cd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,16 +6,14 @@ use indexer::Indexer; use url::Url; use utils::telemetry::{get_subscriber, init_subscriber}; -use crate::utils::web3::get_network_dencun_fork_slot; - mod args; mod clients; mod context; mod env; mod indexer; +mod network; mod slots_processor; mod synchronizer; -mod types; mod utils; fn remove_credentials_from_url(url_string: &str) -> Option { @@ -42,8 +40,7 @@ pub fn print_banner(args: &Args, env: &Environment) { if let Some(dencun_fork_slot) = env.dencun_fork_slot { println!("Dencun fork slot: {dencun_fork_slot}"); } else { - let default_dencun_fork_slot = get_network_dencun_fork_slot(&env.network_name); - println!("Dencun fork slot: {default_dencun_fork_slot}"); + println!("Dencun fork slot: {}", env.network_name.dencun_fork_slot()); } if let Some(from_slot) = args.from_slot.clone() { diff --git a/src/network.rs b/src/network.rs new file mode 100644 index 0000000..b1046eb --- /dev/null +++ b/src/network.rs @@ -0,0 +1,23 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Deserialize, Serialize)] +#[serde(rename_all = "lowercase")] +pub enum Network { + Mainnet, + Goerli, + Sepolia, + Holesky, + Devnet, +} + +impl Network { + pub fn dencun_fork_slot(&self) -> u32 { + match self { + Network::Mainnet => 8626176, + Network::Goerli => 7413760, + Network::Sepolia => 4243456, + Network::Holesky => 950272, + Network::Devnet => 0, + } + } +} diff --git a/src/types.rs b/src/types.rs deleted file mode 100644 index a6e3993..0000000 --- a/src/types.rs +++ /dev/null @@ -1,11 +0,0 @@ -use serde::{Deserialize, Serialize}; - -#[derive(Debug, Clone, Deserialize, Serialize)] -#[serde(rename_all = "lowercase")] -pub enum Network { - Mainnet, - Goerli, - Sepolia, - Holesky, - Devnet, -} diff --git a/src/utils/web3.rs b/src/utils/web3.rs index d63c031..bf4b170 100644 --- a/src/utils/web3.rs +++ b/src/utils/web3.rs @@ -4,8 +4,6 @@ use anyhow::{Context, Result}; use ethers::core::k256::sha2::{Digest, Sha256}; use ethers::{prelude::*, types::H256}; -use crate::types::Network; - const BLOB_COMMITMENT_VERSION_KZG: u8 = 0x01; pub fn sha256(value: &str) -> Result { @@ -66,13 +64,3 @@ pub fn get_tx_versioned_hashes(tx: &Transaction) -> Result>> { None => Ok(None), } } - -pub fn get_network_dencun_fork_slot(network: &Network) -> u32 { - match network { - Network::Mainnet => 8626176, - Network::Goerli => 7413760, - Network::Sepolia => 4243456, - Network::Holesky => 950272, - _ => 0, - } -}