diff --git a/src/args.rs b/src/args.rs index 1add8d4..9713383 100644 --- a/src/args.rs +++ b/src/args.rs @@ -1,4 +1,4 @@ -use clap::Parser; +use clap::{ArgAction, Parser}; use crate::clients::beacon::types::BlockId; @@ -17,4 +17,12 @@ pub struct Args { /// Amount of slots to be processed before saving latest slot in the database #[arg(short, long)] pub slots_per_save: Option, + + /// Disable slot checkpoint saving when syncing + #[arg(short = 'c', long, action = ArgAction::SetTrue)] + pub disable_sync_checkpoint_save: bool, + + /// Disable historical synchronization + #[arg(short = 'd', long, action = ArgAction::SetTrue)] + pub disable_sync_historical: bool, } diff --git a/src/indexer/mod.rs b/src/indexer/mod.rs index 46c88aa..72ff22d 100644 --- a/src/indexer/mod.rs +++ b/src/indexer/mod.rs @@ -33,6 +33,12 @@ pub struct Indexer { dencun_fork_slot: u32, num_threads: u32, slots_checkpoint: Option, + disable_sync_checkpoint_save: bool, +} + +#[derive(Debug, Default)] +pub struct RunOptions { + pub disable_sync_historical: bool, } impl Indexer { @@ -53,6 +59,8 @@ impl Indexer { .map_err(|err| anyhow!("Failed to get number of available threads: {:?}", err))? .get() as u32, }; + let disable_sync_checkpoint_save = args.disable_sync_checkpoint_save; + let dencun_fork_slot = env .dencun_fork_slot .unwrap_or(env.network_name.dencun_fork_slot()); @@ -62,10 +70,20 @@ impl Indexer { num_threads, slots_checkpoint, dencun_fork_slot, + disable_sync_checkpoint_save, }) } - pub async fn run(&mut self, custom_start_block_id: Option) -> IndexerResult<()> { + pub async fn run( + &mut self, + custom_start_block_id: Option, + opts: Option, + ) -> IndexerResult<()> { + let opts = match opts { + Some(opts) => opts, + None => RunOptions::default(), + }; + let sync_state = match self.context.blobscan_client().get_sync_state().await { Ok(state) => state, Err(error) => { @@ -112,7 +130,10 @@ impl Indexer { let (tx, mut rx) = mpsc::channel(32); let tx1 = tx.clone(); - self._start_historical_sync_task(tx1, current_lower_block_id); + if !opts.disable_sync_historical { + self._start_historical_sync_task(tx1, current_lower_block_id); + } + self._start_realtime_sync_task(tx, current_upper_block_id); while let Some(message) = rx.recv().await { @@ -315,6 +336,8 @@ impl Indexer { fn _create_synchronizer(&self) -> Synchronizer { let mut synchronizer_builder = SynchronizerBuilder::new(); + synchronizer_builder.with_disable_checkpoint_save(self.disable_sync_checkpoint_save); + synchronizer_builder.with_num_threads(self.num_threads); if let Some(slots_checkpoint) = self.slots_checkpoint { diff --git a/src/main.rs b/src/main.rs index cf3d8cd..6889367 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,7 +2,7 @@ use anyhow::{anyhow, Result as AnyhowResult}; use args::Args; use clap::Parser; use env::Environment; -use indexer::Indexer; +use indexer::{Indexer, RunOptions}; use url::Url; use utils::telemetry::{get_subscriber, init_subscriber}; @@ -59,6 +59,24 @@ pub fn print_banner(args: &Args, env: &Environment) { println!("Slots checkpoint size: 1000"); } + println!( + "Disable sync checkpoint saving: {}", + if args.disable_sync_checkpoint_save { + "yes" + } else { + "no" + } + ); + + println!( + "Disable historical sync: {}", + if args.disable_sync_historical { + "yes" + } else { + "no" + } + ); + println!("Blobscan API endpoint: {}", env.blobscan_api_endpoint); println!( "CL endpoint: {:?}", @@ -99,11 +117,14 @@ async fn run() -> AnyhowResult<()> { init_subscriber(subscriber); let args = Args::parse(); + let run_opts = Some(RunOptions { + disable_sync_historical: args.disable_sync_historical, + }); print_banner(&args, &env); Indexer::try_new(&env, &args)? - .run(args.from_slot) + .run(args.from_slot, run_opts) .await .map_err(|err| anyhow!(err)) } diff --git a/src/synchronizer/mod.rs b/src/synchronizer/mod.rs index 8eea12b..bbf693e 100644 --- a/src/synchronizer/mod.rs +++ b/src/synchronizer/mod.rs @@ -18,6 +18,7 @@ pub struct SynchronizerBuilder { num_threads: u32, min_slots_per_thread: u32, slots_checkpoint: u32, + disable_checkpoint_save: bool, } pub struct Synchronizer { @@ -25,6 +26,7 @@ pub struct Synchronizer { num_threads: u32, min_slots_per_thread: u32, slots_checkpoint: u32, + disable_checkpoint_save: bool, } impl Default for SynchronizerBuilder { @@ -33,6 +35,7 @@ impl Default for SynchronizerBuilder { num_threads: 1, min_slots_per_thread: 50, slots_checkpoint: 1000, + disable_checkpoint_save: false, } } } @@ -42,6 +45,12 @@ impl SynchronizerBuilder { SynchronizerBuilder::default() } + pub fn with_disable_checkpoint_save(&mut self, disable_checkpoint_save: bool) -> &mut Self { + self.disable_checkpoint_save = disable_checkpoint_save; + + self + } + pub fn with_num_threads(&mut self, num_threads: u32) -> &mut Self { self.num_threads = num_threads; @@ -59,6 +68,7 @@ impl SynchronizerBuilder { num_threads: self.num_threads, min_slots_per_thread: self.min_slots_per_thread, slots_checkpoint: self.slots_checkpoint, + disable_checkpoint_save: self.disable_checkpoint_save, } } } @@ -206,41 +216,43 @@ impl Synchronizer { let last_lower_synced_slot = if is_reverse_sync { last_slot } else { None }; let last_upper_synced_slot = if is_reverse_sync { None } else { last_slot }; - if let Err(error) = self - .context - .blobscan_client() - .update_sync_state(BlockchainSyncState { - last_finalized_block: None, - last_lower_synced_slot, - last_upper_synced_slot, - }) - .await - { - let new_synced_slot = match last_lower_synced_slot { - Some(slot) => slot, - None => match last_upper_synced_slot { + if !self.disable_checkpoint_save { + if let Err(error) = self + .context + .blobscan_client() + .update_sync_state(BlockchainSyncState { + last_finalized_block: None, + last_lower_synced_slot, + last_upper_synced_slot, + }) + .await + { + let new_synced_slot = match last_lower_synced_slot { Some(slot) => slot, - None => { - return Err(SynchronizerError::Other(anyhow!( - "Failed to get new last synced slot: last_lower_synced_slot and last_upper_synced_slot are both None" - ))) - } - }, - }; - - return Err(SynchronizerError::FailedSlotCheckpointSave { - slot: new_synced_slot, - error, - }); - } + None => match last_upper_synced_slot { + Some(slot) => slot, + None => { + return Err(SynchronizerError::Other(anyhow!( + "Failed to get new last synced slot: last_lower_synced_slot and last_upper_synced_slot are both None" + ))) + } + }, + }; + + return Err(SynchronizerError::FailedSlotCheckpointSave { + slot: new_synced_slot, + error, + }); + } - if unprocessed_slots >= self.slots_checkpoint { - debug!( - target = "synchronizer", - new_last_lower_synced_slot = last_lower_synced_slot, - new_last_upper_synced_slot = last_upper_synced_slot, - "Checkpoint reached. Last synced slot saved…" - ); + if unprocessed_slots >= self.slots_checkpoint { + debug!( + target = "synchronizer", + new_last_lower_synced_slot = last_lower_synced_slot, + new_last_upper_synced_slot = last_upper_synced_slot, + "Checkpoint reached. Last synced slot saved…" + ); + } } current_slot = if is_reverse_sync {