diff --git a/src/args.rs b/src/args.rs index 9713383..6fb19db 100644 --- a/src/args.rs +++ b/src/args.rs @@ -10,6 +10,10 @@ pub struct Args { #[arg(short, long)] pub from_slot: Option, + /// Slot to stop indexing at + #[arg(short, long)] + pub to_slot: Option, + /// Number of threads used for parallel indexing #[arg(short, long)] pub num_threads: Option, diff --git a/src/indexer/mod.rs b/src/indexer/mod.rs index 72ff22d..2a65512 100644 --- a/src/indexer/mod.rs +++ b/src/indexer/mod.rs @@ -34,11 +34,7 @@ pub struct Indexer { num_threads: u32, slots_checkpoint: Option, disable_sync_checkpoint_save: bool, -} - -#[derive(Debug, Default)] -pub struct RunOptions { - pub disable_sync_historical: bool, + disable_sync_historical: bool, } impl Indexer { @@ -60,6 +56,7 @@ impl Indexer { .get() as u32, }; let disable_sync_checkpoint_save = args.disable_sync_checkpoint_save; + let disable_sync_historical = args.disable_sync_historical; let dencun_fork_slot = env .dencun_fork_slot @@ -71,19 +68,15 @@ impl Indexer { slots_checkpoint, dencun_fork_slot, disable_sync_checkpoint_save, + disable_sync_historical, }) } pub async fn run( &mut self, - custom_start_block_id: Option, - opts: Option, + start_block_id: Option, + end_block_id: Option, ) -> IndexerResult<()> { - let opts = match opts { - Some(opts) => opts, - None => RunOptions::default(), - }; - let sync_state = match self.context.blobscan_client().get_sync_state().await { Ok(state) => state, Err(error) => { @@ -93,7 +86,7 @@ impl Indexer { } }; - let current_lower_block_id = match custom_start_block_id.clone() { + let current_lower_block_id = match start_block_id.clone() { Some(block_id) => block_id, None => match &sync_state { Some(state) => match state.last_lower_synced_slot { @@ -106,7 +99,7 @@ impl Indexer { None => BlockId::Head, }, }; - let current_upper_block_id = match custom_start_block_id { + let current_upper_block_id = match start_block_id { Some(block_id) => block_id, None => match &sync_state { Some(state) => match state.last_upper_synced_slot { @@ -129,18 +122,47 @@ impl Indexer { let (tx, mut rx) = mpsc::channel(32); let tx1 = tx.clone(); + let mut total_tasks = 0; - if !opts.disable_sync_historical { - self._start_historical_sync_task(tx1, current_lower_block_id); + if end_block_id.is_none() { + self._start_realtime_sync_task(tx, current_upper_block_id); + total_tasks += 1; } - self._start_realtime_sync_task(tx, current_upper_block_id); + if !self.disable_sync_historical { + let historical_start_block_id = current_lower_block_id; + let historical_end_block_id = match end_block_id { + Some(block_id) => block_id, + None => BlockId::Slot(self.dencun_fork_slot), + }; + + self._start_historical_sync_task( + tx1, + historical_start_block_id, + historical_end_block_id, + ); + + total_tasks += 1; + } + + let mut completed_tasks = 0; while let Some(message) = rx.recv().await { - if let Err(error) = message { - error!(target = "indexer", ?error, "Indexer error occurred"); + match message { + IndexerTaskResult::Done(task_name) => { + info!(target = "indexer", task = task_name, "Task completed."); - return Err(error.into()); + completed_tasks += 1; + + if completed_tasks == total_tasks { + return Ok(()); + } + } + IndexerTaskResult::Error(error) => { + error!(target = "indexer", ?error, "Indexer error occurred"); + + return Err(error.into()); + } } } @@ -151,35 +173,27 @@ impl Indexer { &self, tx: mpsc::Sender, start_block_id: BlockId, - ) -> JoinHandle { + end_block_id: BlockId, + ) -> JoinHandle> { + let task_name = "historical_sync".to_string(); let mut synchronizer = self._create_synchronizer(); - let target_lowest_slot = self.dencun_fork_slot; tokio::spawn(async move { - if let BlockId::Slot(slot) = start_block_id { - if slot <= target_lowest_slot { - debug!( - target = "indexer:historical_sync", - "Skip sync. Dencun fork slot reached" - ); - - return Ok(()); - } - } - - let result = synchronizer - .run(&start_block_id, &BlockId::Slot(target_lowest_slot)) - .await; + let result = synchronizer.run(&start_block_id, &end_block_id).await; if let Err(error) = result { // TODO: Find a better way to handle this error - tx.send(Err(IndexingTaskError::FailedIndexingTask { - task_name: "historical_sync".to_string(), - error: error.into(), - })) + tx.send(IndexerTaskResult::Error( + IndexingTaskError::FailedIndexingTask { + task_name, + error: error.into(), + }, + )) .await .unwrap(); - }; + } else { + tx.send(IndexerTaskResult::Done(task_name)).await.unwrap(); + } Ok(()) }) @@ -189,7 +203,7 @@ impl Indexer { &self, tx: mpsc::Sender, start_block_id: BlockId, - ) -> JoinHandle { + ) -> JoinHandle> { let task_name = "realtime_sync".to_string(); let target = format!("indexer:{task_name}"); let task_context = self.context.clone(); @@ -263,14 +277,6 @@ impl Indexer { }; synchronizer.run(initial_block_id, head_block_id).await?; - - blobscan_client - .update_sync_state(BlockchainSyncState { - last_finalized_block: None, - last_lower_synced_slot: None, - last_upper_synced_slot: Some(head_block_data.slot), - }) - .await?; } "finalized_checkpoint" => { let finalized_checkpoint_data = @@ -321,13 +327,14 @@ impl Indexer { if let Err(error) = result { // TODO: Find a better way to handle this error - tx.send(Err(IndexingTaskError::FailedIndexingTask { - task_name, - error, - })) + tx.send(IndexerTaskResult::Error( + IndexingTaskError::FailedIndexingTask { task_name, error }, + )) .await .unwrap(); - }; + } else { + tx.send(IndexerTaskResult::Done(task_name)).await.unwrap(); + } Ok(()) }) diff --git a/src/indexer/types.rs b/src/indexer/types.rs index 8c975e3..78facbe 100644 --- a/src/indexer/types.rs +++ b/src/indexer/types.rs @@ -2,4 +2,7 @@ use super::error::{IndexerError, IndexingTaskError}; pub type IndexerResult = Result; -pub type IndexerTaskResult = Result<(), IndexingTaskError>; +pub enum IndexerTaskResult { + Done(String), + Error(IndexingTaskError), +} diff --git a/src/main.rs b/src/main.rs index 6889367..c7048a1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,9 +2,11 @@ use anyhow::{anyhow, Result as AnyhowResult}; use args::Args; use clap::Parser; use env::Environment; -use indexer::{Indexer, RunOptions}; -use url::Url; -use utils::telemetry::{get_subscriber, init_subscriber}; +use indexer::Indexer; +use utils::{ + banner::print_banner, + telemetry::{get_subscriber, init_subscriber}, +}; mod args; mod clients; @@ -16,84 +18,6 @@ mod slots_processor; mod synchronizer; mod utils; -fn remove_credentials_from_url(url_string: &str) -> Option { - match Url::parse(url_string) { - Ok(mut url) => { - url.set_username("******").unwrap(); - url.set_password(None).unwrap(); - Some(url.into()) - } - Err(_) => None, - } -} - -pub fn print_banner(args: &Args, env: &Environment) { - println!("____ _ _ "); - println!("| __ )| | ___ | |__ ___ ___ __ _ _ __ "); - println!("| _ \\| |/ _ \\| '_ \\/ __|/ __/ _` | '_ \\ "); - println!("| |_) | | (_) | |_) \\__ \\ (_| (_| | | | |"); - println!("|____/|_|\\___/|_.__/|___/\\___\\__,_|_| |_|\n"); - println!("Blobscan indexer (EIP-4844 blob indexer) - blobscan.com"); - println!("======================================================="); - - println!("Network: {:?}", env.network_name); - if let Some(dencun_fork_slot) = env.dencun_fork_slot { - println!("Dencun fork slot: {dencun_fork_slot}"); - } else { - println!("Dencun fork slot: {}", env.network_name.dencun_fork_slot()); - } - - if let Some(from_slot) = args.from_slot.clone() { - println!("Custom start slot: {}", from_slot.to_detailed_string()); - } - - if let Some(num_threads) = args.num_threads { - println!("Number of threads: {}", num_threads); - } else { - println!("Number of threads: auto"); - } - - if let Some(slots_per_save) = args.slots_per_save { - println!("Slots checkpoint size: {}", slots_per_save); - } else { - println!("Slots checkpoint size: 1000"); - } - - println!( - "Disable sync checkpoint saving: {}", - if args.disable_sync_checkpoint_save { - "yes" - } else { - "no" - } - ); - - println!( - "Disable historical sync: {}", - if args.disable_sync_historical { - "yes" - } else { - "no" - } - ); - - println!("Blobscan API endpoint: {}", env.blobscan_api_endpoint); - println!( - "CL endpoint: {:?}", - remove_credentials_from_url(env.beacon_node_endpoint.as_str()) - ); - println!( - "EL endpoint: {:?}", - remove_credentials_from_url(env.execution_node_endpoint.as_str()) - ); - - if let Some(sentry_dsn) = env.sentry_dsn.clone() { - println!("Sentry DSN: {}", sentry_dsn); - } - - println!("\n"); -} - async fn run() -> AnyhowResult<()> { dotenv::dotenv().ok(); let env = match Environment::from_env() { @@ -117,14 +41,11 @@ async fn run() -> AnyhowResult<()> { init_subscriber(subscriber); let args = Args::parse(); - let run_opts = Some(RunOptions { - disable_sync_historical: args.disable_sync_historical, - }); print_banner(&args, &env); Indexer::try_new(&env, &args)? - .run(args.from_slot, run_opts) + .run(args.from_slot, args.to_slot) .await .map_err(|err| anyhow!(err)) } diff --git a/src/synchronizer/mod.rs b/src/synchronizer/mod.rs index bbf693e..31f8040 100644 --- a/src/synchronizer/mod.rs +++ b/src/synchronizer/mod.rs @@ -99,10 +99,9 @@ impl Synchronizer { async fn _sync_slots(&mut self, from_slot: u32, to_slot: u32) -> Result<(), SynchronizerError> { let is_reverse_sync = to_slot < from_slot; let unprocessed_slots = to_slot.abs_diff(from_slot) + 1; - let slots_per_thread = std::cmp::max( - self.min_slots_per_thread, - unprocessed_slots / self.num_threads, - ); + let min_slots_per_thread = std::cmp::min(unprocessed_slots, self.min_slots_per_thread); + let slots_per_thread = + std::cmp::max(min_slots_per_thread, unprocessed_slots / self.num_threads); let num_threads = std::cmp::max(1, unprocessed_slots / slots_per_thread); let remaining_slots = unprocessed_slots % num_threads; diff --git a/src/utils/banner.rs b/src/utils/banner.rs new file mode 100644 index 0000000..a4243df --- /dev/null +++ b/src/utils/banner.rs @@ -0,0 +1,127 @@ +use url::Url; + +use crate::{args::Args, env::Environment}; + +fn mask_quik_node_url(url_string: &str) -> Option { + match Url::parse(url_string) { + Ok(mut url) => { + // Get the path segments as a vector of strings. + let mut segments: Vec<&str> = url.path_segments().map_or(vec![], |c| c.collect()); + + if !segments.is_empty() { + *segments.last_mut().unwrap() = "*******"; + + url.set_path(&segments.join("/")); + } + + if let Some(host) = url.host_str() { + // Split the host by '.' to isolate the subdomain part. + let parts: Vec<&str> = host.split('.').collect(); + if parts.len() > 2 { + // Join the parts back into a string with the subdomain masked. + let masked_host = format!( + "{}.{}.{}{}{}", + "*******", + parts[1], + parts[2], + if parts.len() > 3 { "." } else { "" }, + parts[3..].join(".") + ); + // Set the new host. + url.set_host(Some(&masked_host)).ok()?; + } + } + + // Return the modified URL as a string. + Some(url.to_string()) + } + // Return None if the URL parsing fails. + Err(_) => None, + } +} +pub fn remove_credentials_from_url(url_string: &str) -> Option { + match Url::parse(url_string) { + Ok(mut url) => { + if let Some(host) = url.host_str() { + if host.contains("quiknode.pro") { + return mask_quik_node_url(url_string); + } + } + url.set_username("******").unwrap(); + url.set_password(None).unwrap(); + Some(url.into()) + } + Err(_) => None, + } +} + +pub fn print_banner(args: &Args, env: &Environment) { + println!("____ _ _ "); + println!("| __ )| | ___ | |__ ___ ___ __ _ _ __ "); + println!("| _ \\| |/ _ \\| '_ \\/ __|/ __/ _` | '_ \\ "); + println!("| |_) | | (_) | |_) \\__ \\ (_| (_| | | | |"); + println!("|____/|_|\\___/|_.__/|___/\\___\\__,_|_| |_|\n"); + println!("Blobscan indexer (EIP-4844 blob indexer) - blobscan.com"); + println!("======================================================="); + + println!("Network: {:?}", env.network_name); + if let Some(dencun_fork_slot) = env.dencun_fork_slot { + println!("Dencun fork slot: {dencun_fork_slot}"); + } else { + println!("Dencun fork slot: {}", env.network_name.dencun_fork_slot()); + } + + if let Some(from_slot) = args.from_slot.clone() { + println!("Custom start slot: {}", from_slot.to_detailed_string()); + } + + if let Some(to_slot) = args.to_slot.clone() { + println!("Custom end slot: {}", to_slot.to_detailed_string()); + } + + if let Some(num_threads) = args.num_threads { + println!("Number of threads: {}", num_threads); + } else { + println!("Number of threads: auto"); + } + + if let Some(slots_per_save) = args.slots_per_save { + println!("Slots checkpoint size: {}", slots_per_save); + } else { + println!("Slots checkpoint size: 1000"); + } + + println!( + "Disable sync checkpoint saving: {}", + if args.disable_sync_checkpoint_save { + "yes" + } else { + "no" + } + ); + + println!( + "Disable historical sync: {}", + if args.disable_sync_historical { + "yes" + } else { + "no" + } + ); + + println!("Blobscan API endpoint: {}", env.blobscan_api_endpoint); + println!( + "CL endpoint: {:?}", + remove_credentials_from_url(env.beacon_node_endpoint.as_str()) + ); + println!( + "EL endpoint: {:?}", + remove_credentials_from_url(env.execution_node_endpoint.as_str()) + ); + + if let Some(sentry_dsn) = env.sentry_dsn.clone() { + println!("Sentry DSN: {}", sentry_dsn); + } + + println!("\n"); +} diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 111d209..7765f5a 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -1,2 +1,3 @@ +pub mod banner; pub mod telemetry; pub mod web3;