From 6784165593752b3c5ef91f5f360bb999e8e0a6f4 Mon Sep 17 00:00:00 2001 From: PJColombo Date: Mon, 6 May 2024 10:45:11 +0200 Subject: [PATCH 1/4] feat: allow to parametrize sync checkpoints enabling --- src/args.rs | 4 ++++ src/indexer/mod.rs | 8 ++++++++ src/synchronizer/mod.rs | 18 ++++++++++++++++-- 3 files changed, 28 insertions(+), 2 deletions(-) diff --git a/src/args.rs b/src/args.rs index 1add8d4..92084c8 100644 --- a/src/args.rs +++ b/src/args.rs @@ -17,4 +17,8 @@ pub struct Args { /// Amount of slots to be processed before saving latest slot in the database #[arg(short, long)] pub slots_per_save: Option, + + /// Disable slot checkpoint saving + #[arg(short, long)] + pub disable_checkpoints: Option, } diff --git a/src/indexer/mod.rs b/src/indexer/mod.rs index 46c88aa..83050b9 100644 --- a/src/indexer/mod.rs +++ b/src/indexer/mod.rs @@ -33,6 +33,7 @@ pub struct Indexer { dencun_fork_slot: u32, num_threads: u32, slots_checkpoint: Option, + disable_checkpoints: Option, } impl Indexer { @@ -53,6 +54,8 @@ impl Indexer { .map_err(|err| anyhow!("Failed to get number of available threads: {:?}", err))? .get() as u32, }; + let disable_checkpoints = args.disable_checkpoints; + let dencun_fork_slot = env .dencun_fork_slot .unwrap_or(env.network_name.dencun_fork_slot()); @@ -62,6 +65,7 @@ impl Indexer { num_threads, slots_checkpoint, dencun_fork_slot, + disable_checkpoints, }) } @@ -315,6 +319,10 @@ impl Indexer { fn _create_synchronizer(&self) -> Synchronizer { let mut synchronizer_builder = SynchronizerBuilder::new(); + if let Some(disable_checkpoints) = self.disable_checkpoints { + synchronizer_builder.with_disable_checkpoints(disable_checkpoints); + } + synchronizer_builder.with_num_threads(self.num_threads); if let Some(slots_checkpoint) = self.slots_checkpoint { diff --git a/src/synchronizer/mod.rs b/src/synchronizer/mod.rs index 8eea12b..4c5d4e8 100644 --- a/src/synchronizer/mod.rs +++ b/src/synchronizer/mod.rs @@ -18,6 +18,7 @@ pub struct SynchronizerBuilder { num_threads: u32, min_slots_per_thread: u32, slots_checkpoint: u32, + disable_checkpoints: bool, } pub struct Synchronizer { @@ -25,6 +26,7 @@ pub struct Synchronizer { num_threads: u32, min_slots_per_thread: u32, slots_checkpoint: u32, + disable_checkpoints: bool, } impl Default for SynchronizerBuilder { @@ -33,6 +35,7 @@ impl Default for SynchronizerBuilder { num_threads: 1, min_slots_per_thread: 50, slots_checkpoint: 1000, + disable_checkpoints: false, } } } @@ -42,6 +45,12 @@ impl SynchronizerBuilder { SynchronizerBuilder::default() } + pub fn with_disable_checkpoints(&mut self, disable_checkpoints: bool) -> &mut Self { + self.disable_checkpoints = disable_checkpoints; + + self + } + pub fn with_num_threads(&mut self, num_threads: u32) -> &mut Self { self.num_threads = num_threads; @@ -59,6 +68,7 @@ impl SynchronizerBuilder { num_threads: self.num_threads, min_slots_per_thread: self.min_slots_per_thread, slots_checkpoint: self.slots_checkpoint, + disable_checkpoints: self.disable_checkpoints, } } } @@ -73,8 +83,12 @@ impl Synchronizer { let mut final_slot = self._resolve_to_slot(final_block_id).await?; loop { - self._sync_slots_by_checkpoints(initial_slot, final_slot) - .await?; + if self.disable_checkpoints { + self._sync_slots(initial_slot, final_slot).await?; + } else { + self._sync_slots_by_checkpoints(initial_slot, final_slot) + .await?; + } let latest_final_slot = self._resolve_to_slot(final_block_id).await?; From 7e919013d47bed6615d53e1c97a1c7880a4be2d9 Mon Sep 17 00:00:00 2001 From: PJColombo Date: Mon, 6 May 2024 12:34:36 +0200 Subject: [PATCH 2/4] feat(indexer): add support for disabling historical sync --- src/args.rs | 4 ++++ src/indexer/mod.rs | 28 ++++++++++++++++++++++++++-- src/main.rs | 9 +++++++-- 3 files changed, 37 insertions(+), 4 deletions(-) diff --git a/src/args.rs b/src/args.rs index 92084c8..44acbef 100644 --- a/src/args.rs +++ b/src/args.rs @@ -21,4 +21,8 @@ pub struct Args { /// Disable slot checkpoint saving #[arg(short, long)] pub disable_checkpoints: Option, + + /// Disable histotircal synchronization + #[arg(short, long)] + pub disable_historical_sync: Option, } diff --git a/src/indexer/mod.rs b/src/indexer/mod.rs index 83050b9..afbad82 100644 --- a/src/indexer/mod.rs +++ b/src/indexer/mod.rs @@ -36,6 +36,18 @@ pub struct Indexer { disable_checkpoints: Option, } +pub struct RunOptions { + pub disable_historical_sync: Option, +} + +impl Default for RunOptions { + fn default() -> Self { + Self { + disable_historical_sync: Some(true), + } + } +} + impl Indexer { pub fn try_new(env: &Environment, args: &Args) -> IndexerResult { let context = match Context::try_new(ContextConfig::from(env)) { @@ -69,7 +81,16 @@ impl Indexer { }) } - pub async fn run(&mut self, custom_start_block_id: Option) -> IndexerResult<()> { + pub async fn run( + &mut self, + custom_start_block_id: Option, + opts: Option, + ) -> IndexerResult<()> { + let opts = match opts { + Some(opts) => opts, + None => RunOptions::default(), + }; + let sync_state = match self.context.blobscan_client().get_sync_state().await { Ok(state) => state, Err(error) => { @@ -116,7 +137,10 @@ impl Indexer { let (tx, mut rx) = mpsc::channel(32); let tx1 = tx.clone(); - self._start_historical_sync_task(tx1, current_lower_block_id); + if opts.disable_historical_sync.is_some_and(|disable| !disable) { + self._start_historical_sync_task(tx1, current_lower_block_id); + } + self._start_realtime_sync_task(tx, current_upper_block_id); while let Some(message) = rx.recv().await { diff --git a/src/main.rs b/src/main.rs index cf3d8cd..42d1a17 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,7 +2,7 @@ use anyhow::{anyhow, Result as AnyhowResult}; use args::Args; use clap::Parser; use env::Environment; -use indexer::Indexer; +use indexer::{Indexer, RunOptions}; use url::Url; use utils::telemetry::{get_subscriber, init_subscriber}; @@ -99,11 +99,16 @@ async fn run() -> AnyhowResult<()> { init_subscriber(subscriber); let args = Args::parse(); + let run_opts = args + .disable_historical_sync + .map(|disable_historical_sync| RunOptions { + disable_historical_sync: Some(disable_historical_sync), + }); print_banner(&args, &env); Indexer::try_new(&env, &args)? - .run(args.from_slot) + .run(args.from_slot, run_opts) .await .map_err(|err| anyhow!(err)) } From 6c3130bbf367ec9ceb2e41aab67c53905ae229f9 Mon Sep 17 00:00:00 2001 From: PJColombo Date: Mon, 6 May 2024 18:28:25 +0200 Subject: [PATCH 3/4] fix: disable slot checkpoint save instead of syncing by checkpoints + rename disable args --- src/args.rs | 14 +++---- src/indexer/mod.rs | 23 ++++------- src/main.rs | 8 ++-- src/synchronizer/mod.rs | 88 ++++++++++++++++++++--------------------- 4 files changed, 60 insertions(+), 73 deletions(-) diff --git a/src/args.rs b/src/args.rs index 44acbef..9713383 100644 --- a/src/args.rs +++ b/src/args.rs @@ -1,4 +1,4 @@ -use clap::Parser; +use clap::{ArgAction, Parser}; use crate::clients::beacon::types::BlockId; @@ -18,11 +18,11 @@ pub struct Args { #[arg(short, long)] pub slots_per_save: Option, - /// Disable slot checkpoint saving - #[arg(short, long)] - pub disable_checkpoints: Option, + /// Disable slot checkpoint saving when syncing + #[arg(short = 'c', long, action = ArgAction::SetTrue)] + pub disable_sync_checkpoint_save: bool, - /// Disable histotircal synchronization - #[arg(short, long)] - pub disable_historical_sync: Option, + /// Disable historical synchronization + #[arg(short = 'd', long, action = ArgAction::SetTrue)] + pub disable_sync_historical: bool, } diff --git a/src/indexer/mod.rs b/src/indexer/mod.rs index afbad82..72ff22d 100644 --- a/src/indexer/mod.rs +++ b/src/indexer/mod.rs @@ -33,19 +33,12 @@ pub struct Indexer { dencun_fork_slot: u32, num_threads: u32, slots_checkpoint: Option, - disable_checkpoints: Option, + disable_sync_checkpoint_save: bool, } +#[derive(Debug, Default)] pub struct RunOptions { - pub disable_historical_sync: Option, -} - -impl Default for RunOptions { - fn default() -> Self { - Self { - disable_historical_sync: Some(true), - } - } + pub disable_sync_historical: bool, } impl Indexer { @@ -66,7 +59,7 @@ impl Indexer { .map_err(|err| anyhow!("Failed to get number of available threads: {:?}", err))? .get() as u32, }; - let disable_checkpoints = args.disable_checkpoints; + let disable_sync_checkpoint_save = args.disable_sync_checkpoint_save; let dencun_fork_slot = env .dencun_fork_slot @@ -77,7 +70,7 @@ impl Indexer { num_threads, slots_checkpoint, dencun_fork_slot, - disable_checkpoints, + disable_sync_checkpoint_save, }) } @@ -137,7 +130,7 @@ impl Indexer { let (tx, mut rx) = mpsc::channel(32); let tx1 = tx.clone(); - if opts.disable_historical_sync.is_some_and(|disable| !disable) { + if !opts.disable_sync_historical { self._start_historical_sync_task(tx1, current_lower_block_id); } @@ -343,9 +336,7 @@ impl Indexer { fn _create_synchronizer(&self) -> Synchronizer { let mut synchronizer_builder = SynchronizerBuilder::new(); - if let Some(disable_checkpoints) = self.disable_checkpoints { - synchronizer_builder.with_disable_checkpoints(disable_checkpoints); - } + synchronizer_builder.with_disable_checkpoint_save(self.disable_sync_checkpoint_save); synchronizer_builder.with_num_threads(self.num_threads); diff --git a/src/main.rs b/src/main.rs index 42d1a17..4368a71 100644 --- a/src/main.rs +++ b/src/main.rs @@ -99,11 +99,9 @@ async fn run() -> AnyhowResult<()> { init_subscriber(subscriber); let args = Args::parse(); - let run_opts = args - .disable_historical_sync - .map(|disable_historical_sync| RunOptions { - disable_historical_sync: Some(disable_historical_sync), - }); + let run_opts = Some(RunOptions { + disable_sync_historical: args.disable_sync_historical, + }); print_banner(&args, &env); diff --git a/src/synchronizer/mod.rs b/src/synchronizer/mod.rs index 4c5d4e8..bbf693e 100644 --- a/src/synchronizer/mod.rs +++ b/src/synchronizer/mod.rs @@ -18,7 +18,7 @@ pub struct SynchronizerBuilder { num_threads: u32, min_slots_per_thread: u32, slots_checkpoint: u32, - disable_checkpoints: bool, + disable_checkpoint_save: bool, } pub struct Synchronizer { @@ -26,7 +26,7 @@ pub struct Synchronizer { num_threads: u32, min_slots_per_thread: u32, slots_checkpoint: u32, - disable_checkpoints: bool, + disable_checkpoint_save: bool, } impl Default for SynchronizerBuilder { @@ -35,7 +35,7 @@ impl Default for SynchronizerBuilder { num_threads: 1, min_slots_per_thread: 50, slots_checkpoint: 1000, - disable_checkpoints: false, + disable_checkpoint_save: false, } } } @@ -45,8 +45,8 @@ impl SynchronizerBuilder { SynchronizerBuilder::default() } - pub fn with_disable_checkpoints(&mut self, disable_checkpoints: bool) -> &mut Self { - self.disable_checkpoints = disable_checkpoints; + pub fn with_disable_checkpoint_save(&mut self, disable_checkpoint_save: bool) -> &mut Self { + self.disable_checkpoint_save = disable_checkpoint_save; self } @@ -68,7 +68,7 @@ impl SynchronizerBuilder { num_threads: self.num_threads, min_slots_per_thread: self.min_slots_per_thread, slots_checkpoint: self.slots_checkpoint, - disable_checkpoints: self.disable_checkpoints, + disable_checkpoint_save: self.disable_checkpoint_save, } } } @@ -83,12 +83,8 @@ impl Synchronizer { let mut final_slot = self._resolve_to_slot(final_block_id).await?; loop { - if self.disable_checkpoints { - self._sync_slots(initial_slot, final_slot).await?; - } else { - self._sync_slots_by_checkpoints(initial_slot, final_slot) - .await?; - } + self._sync_slots_by_checkpoints(initial_slot, final_slot) + .await?; let latest_final_slot = self._resolve_to_slot(final_block_id).await?; @@ -220,41 +216,43 @@ impl Synchronizer { let last_lower_synced_slot = if is_reverse_sync { last_slot } else { None }; let last_upper_synced_slot = if is_reverse_sync { None } else { last_slot }; - if let Err(error) = self - .context - .blobscan_client() - .update_sync_state(BlockchainSyncState { - last_finalized_block: None, - last_lower_synced_slot, - last_upper_synced_slot, - }) - .await - { - let new_synced_slot = match last_lower_synced_slot { - Some(slot) => slot, - None => match last_upper_synced_slot { + if !self.disable_checkpoint_save { + if let Err(error) = self + .context + .blobscan_client() + .update_sync_state(BlockchainSyncState { + last_finalized_block: None, + last_lower_synced_slot, + last_upper_synced_slot, + }) + .await + { + let new_synced_slot = match last_lower_synced_slot { Some(slot) => slot, - None => { - return Err(SynchronizerError::Other(anyhow!( - "Failed to get new last synced slot: last_lower_synced_slot and last_upper_synced_slot are both None" - ))) - } - }, - }; - - return Err(SynchronizerError::FailedSlotCheckpointSave { - slot: new_synced_slot, - error, - }); - } + None => match last_upper_synced_slot { + Some(slot) => slot, + None => { + return Err(SynchronizerError::Other(anyhow!( + "Failed to get new last synced slot: last_lower_synced_slot and last_upper_synced_slot are both None" + ))) + } + }, + }; + + return Err(SynchronizerError::FailedSlotCheckpointSave { + slot: new_synced_slot, + error, + }); + } - if unprocessed_slots >= self.slots_checkpoint { - debug!( - target = "synchronizer", - new_last_lower_synced_slot = last_lower_synced_slot, - new_last_upper_synced_slot = last_upper_synced_slot, - "Checkpoint reached. Last synced slot saved…" - ); + if unprocessed_slots >= self.slots_checkpoint { + debug!( + target = "synchronizer", + new_last_lower_synced_slot = last_lower_synced_slot, + new_last_upper_synced_slot = last_upper_synced_slot, + "Checkpoint reached. Last synced slot saved…" + ); + } } current_slot = if is_reverse_sync { From 7a37a15490eeaa498a6d46324e0a26f855f06d1c Mon Sep 17 00:00:00 2001 From: PJColombo Date: Mon, 6 May 2024 18:40:28 +0200 Subject: [PATCH 4/4] chore: add new args to banner --- src/main.rs | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/src/main.rs b/src/main.rs index 4368a71..6889367 100644 --- a/src/main.rs +++ b/src/main.rs @@ -59,6 +59,24 @@ pub fn print_banner(args: &Args, env: &Environment) { println!("Slots checkpoint size: 1000"); } + println!( + "Disable sync checkpoint saving: {}", + if args.disable_sync_checkpoint_save { + "yes" + } else { + "no" + } + ); + + println!( + "Disable historical sync: {}", + if args.disable_sync_historical { + "yes" + } else { + "no" + } + ); + println!("Blobscan API endpoint: {}", env.blobscan_api_endpoint); println!( "CL endpoint: {:?}",