Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add disabling args #68

Merged
merged 4 commits into from
May 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion src/args.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use clap::Parser;
use clap::{ArgAction, Parser};

use crate::clients::beacon::types::BlockId;

Expand All @@ -17,4 +17,12 @@ pub struct Args {
/// Amount of slots to be processed before saving latest slot in the database
#[arg(short, long)]
pub slots_per_save: Option<u32>,

/// Disable slot checkpoint saving when syncing
#[arg(short = 'c', long, action = ArgAction::SetTrue)]
pub disable_sync_checkpoint_save: bool,

/// Disable historical synchronization
#[arg(short = 'd', long, action = ArgAction::SetTrue)]
pub disable_sync_historical: bool,
}
27 changes: 25 additions & 2 deletions src/indexer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ pub struct Indexer {
dencun_fork_slot: u32,
num_threads: u32,
slots_checkpoint: Option<u32>,
disable_sync_checkpoint_save: bool,
}

#[derive(Debug, Default)]
pub struct RunOptions {
pub disable_sync_historical: bool,
}

impl Indexer {
Expand All @@ -53,6 +59,8 @@ impl Indexer {
.map_err(|err| anyhow!("Failed to get number of available threads: {:?}", err))?
.get() as u32,
};
let disable_sync_checkpoint_save = args.disable_sync_checkpoint_save;

let dencun_fork_slot = env
.dencun_fork_slot
.unwrap_or(env.network_name.dencun_fork_slot());
Expand All @@ -62,10 +70,20 @@ impl Indexer {
num_threads,
slots_checkpoint,
dencun_fork_slot,
disable_sync_checkpoint_save,
})
}

pub async fn run(&mut self, custom_start_block_id: Option<BlockId>) -> IndexerResult<()> {
pub async fn run(
&mut self,
custom_start_block_id: Option<BlockId>,
opts: Option<RunOptions>,
) -> IndexerResult<()> {
let opts = match opts {
Some(opts) => opts,
None => RunOptions::default(),
};

let sync_state = match self.context.blobscan_client().get_sync_state().await {
Ok(state) => state,
Err(error) => {
Expand Down Expand Up @@ -112,7 +130,10 @@ impl Indexer {
let (tx, mut rx) = mpsc::channel(32);
let tx1 = tx.clone();

self._start_historical_sync_task(tx1, current_lower_block_id);
if !opts.disable_sync_historical {
self._start_historical_sync_task(tx1, current_lower_block_id);
}

self._start_realtime_sync_task(tx, current_upper_block_id);

while let Some(message) = rx.recv().await {
Expand Down Expand Up @@ -315,6 +336,8 @@ impl Indexer {
fn _create_synchronizer(&self) -> Synchronizer {
let mut synchronizer_builder = SynchronizerBuilder::new();

synchronizer_builder.with_disable_checkpoint_save(self.disable_sync_checkpoint_save);

synchronizer_builder.with_num_threads(self.num_threads);

if let Some(slots_checkpoint) = self.slots_checkpoint {
Expand Down
25 changes: 23 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use anyhow::{anyhow, Result as AnyhowResult};
use args::Args;
use clap::Parser;
use env::Environment;
use indexer::Indexer;
use indexer::{Indexer, RunOptions};
use url::Url;
use utils::telemetry::{get_subscriber, init_subscriber};

Expand Down Expand Up @@ -59,6 +59,24 @@ pub fn print_banner(args: &Args, env: &Environment) {
println!("Slots checkpoint size: 1000");
}

println!(
"Disable sync checkpoint saving: {}",
if args.disable_sync_checkpoint_save {
"yes"
} else {
"no"
}
);

println!(
"Disable historical sync: {}",
if args.disable_sync_historical {
"yes"
} else {
"no"
}
);

println!("Blobscan API endpoint: {}", env.blobscan_api_endpoint);
println!(
"CL endpoint: {:?}",
Expand Down Expand Up @@ -99,11 +117,14 @@ async fn run() -> AnyhowResult<()> {
init_subscriber(subscriber);

let args = Args::parse();
let run_opts = Some(RunOptions {
disable_sync_historical: args.disable_sync_historical,
});

print_banner(&args, &env);

Indexer::try_new(&env, &args)?
.run(args.from_slot)
.run(args.from_slot, run_opts)
.await
.map_err(|err| anyhow!(err))
}
Expand Down
78 changes: 45 additions & 33 deletions src/synchronizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ pub struct SynchronizerBuilder {
num_threads: u32,
min_slots_per_thread: u32,
slots_checkpoint: u32,
disable_checkpoint_save: bool,
}

pub struct Synchronizer {
context: Context,
num_threads: u32,
min_slots_per_thread: u32,
slots_checkpoint: u32,
disable_checkpoint_save: bool,
}

impl Default for SynchronizerBuilder {
Expand All @@ -33,6 +35,7 @@ impl Default for SynchronizerBuilder {
num_threads: 1,
min_slots_per_thread: 50,
slots_checkpoint: 1000,
disable_checkpoint_save: false,
}
}
}
Expand All @@ -42,6 +45,12 @@ impl SynchronizerBuilder {
SynchronizerBuilder::default()
}

pub fn with_disable_checkpoint_save(&mut self, disable_checkpoint_save: bool) -> &mut Self {
self.disable_checkpoint_save = disable_checkpoint_save;

self
}

pub fn with_num_threads(&mut self, num_threads: u32) -> &mut Self {
self.num_threads = num_threads;

Expand All @@ -59,6 +68,7 @@ impl SynchronizerBuilder {
num_threads: self.num_threads,
min_slots_per_thread: self.min_slots_per_thread,
slots_checkpoint: self.slots_checkpoint,
disable_checkpoint_save: self.disable_checkpoint_save,
}
}
}
Expand Down Expand Up @@ -206,41 +216,43 @@ impl Synchronizer {
let last_lower_synced_slot = if is_reverse_sync { last_slot } else { None };
let last_upper_synced_slot = if is_reverse_sync { None } else { last_slot };

if let Err(error) = self
.context
.blobscan_client()
.update_sync_state(BlockchainSyncState {
last_finalized_block: None,
last_lower_synced_slot,
last_upper_synced_slot,
})
.await
{
let new_synced_slot = match last_lower_synced_slot {
Some(slot) => slot,
None => match last_upper_synced_slot {
if !self.disable_checkpoint_save {
if let Err(error) = self
.context
.blobscan_client()
.update_sync_state(BlockchainSyncState {
last_finalized_block: None,
last_lower_synced_slot,
last_upper_synced_slot,
})
.await
{
let new_synced_slot = match last_lower_synced_slot {
Some(slot) => slot,
None => {
return Err(SynchronizerError::Other(anyhow!(
"Failed to get new last synced slot: last_lower_synced_slot and last_upper_synced_slot are both None"
)))
}
},
};

return Err(SynchronizerError::FailedSlotCheckpointSave {
slot: new_synced_slot,
error,
});
}
None => match last_upper_synced_slot {
Some(slot) => slot,
None => {
return Err(SynchronizerError::Other(anyhow!(
"Failed to get new last synced slot: last_lower_synced_slot and last_upper_synced_slot are both None"
)))
}
},
};

return Err(SynchronizerError::FailedSlotCheckpointSave {
slot: new_synced_slot,
error,
});
}

if unprocessed_slots >= self.slots_checkpoint {
debug!(
target = "synchronizer",
new_last_lower_synced_slot = last_lower_synced_slot,
new_last_upper_synced_slot = last_upper_synced_slot,
"Checkpoint reached. Last synced slot saved…"
);
if unprocessed_slots >= self.slots_checkpoint {
debug!(
target = "synchronizer",
new_last_lower_synced_slot = last_lower_synced_slot,
new_last_upper_synced_slot = last_upper_synced_slot,
"Checkpoint reached. Last synced slot saved…"
);
}
}

current_slot = if is_reverse_sync {
Expand Down
Loading