feat(indexer): add --to-slot arg #69

Merged · 5 commits · May 11, 2024

src/args.rs: 4 additions & 0 deletions

@@ -10,6 +10,10 @@ pub struct Args {
     #[arg(short, long)]
     pub from_slot: Option<BlockId>,

+    /// Slot to stop indexing at
+    #[arg(short, long)]
+    pub to_slot: Option<BlockId>,
+
     /// Number of threads used for parallel indexing
     #[arg(short, long)]
     pub num_threads: Option<u32>,
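The new flag pairs with the existing --from-slot to bound a run on both ends. A minimal sketch of how it surfaces on the CLI, assuming clap's derive API and a plain String in place of the crate's BlockId type (the binary name `indexer` below is hypothetical):

// Sketch only: String stands in for the crate's BlockId type.
use clap::Parser;

#[derive(Parser, Debug)]
struct Args {
    /// Slot to start indexing from
    #[arg(short, long)]
    from_slot: Option<String>,

    /// Slot to stop indexing at
    #[arg(short, long)]
    to_slot: Option<String>,
}

fn main() {
    // e.g. `indexer --from-slot 8626176 --to-slot 8630000`
    let args = Args::parse();
    println!("from={:?} to={:?}", args.from_slot, args.to_slot);
}

With #[arg(short, long)], clap derives the short flags from the first letter of each field, so the pair is -f/--from-slot and -t/--to-slot.
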
src/indexer/mod.rs: 62 additions & 55 deletions

@@ -34,11 +34,7 @@ pub struct Indexer {
     num_threads: u32,
     slots_checkpoint: Option<u32>,
     disable_sync_checkpoint_save: bool,
-}
-
-#[derive(Debug, Default)]
-pub struct RunOptions {
-    pub disable_sync_historical: bool,
+    disable_sync_historical: bool,
 }

 impl Indexer {

@@ -60,6 +56,7 @@ impl Indexer {
             .get() as u32,
         };
         let disable_sync_checkpoint_save = args.disable_sync_checkpoint_save;
+        let disable_sync_historical = args.disable_sync_historical;

         let dencun_fork_slot = env
             .dencun_fork_slot

@@ -71,19 +68,15 @@ impl Indexer {
             slots_checkpoint,
             dencun_fork_slot,
             disable_sync_checkpoint_save,
+            disable_sync_historical,
         })
     }

     pub async fn run(
         &mut self,
-        custom_start_block_id: Option<BlockId>,
-        opts: Option<RunOptions>,
+        start_block_id: Option<BlockId>,
+        end_block_id: Option<BlockId>,
     ) -> IndexerResult<()> {
-        let opts = match opts {
-            Some(opts) => opts,
-            None => RunOptions::default(),
-        };
-
         let sync_state = match self.context.blobscan_client().get_sync_state().await {
             Ok(state) => state,
             Err(error) => {

@@ -93,7 +86,7 @@ impl Indexer {
             }
         };

-        let current_lower_block_id = match custom_start_block_id.clone() {
+        let current_lower_block_id = match start_block_id.clone() {
             Some(block_id) => block_id,
             None => match &sync_state {
                 Some(state) => match state.last_lower_synced_slot {

@@ -106,7 +99,7 @@ impl Indexer {
                 None => BlockId::Head,
             },
         };
-        let current_upper_block_id = match custom_start_block_id {
+        let current_upper_block_id = match start_block_id {
             Some(block_id) => block_id,
             None => match &sync_state {
                 Some(state) => match state.last_upper_synced_slot {

@@ -129,18 +122,47 @@ impl Indexer {

         let (tx, mut rx) = mpsc::channel(32);
         let tx1 = tx.clone();
+        let mut total_tasks = 0;

-        if !opts.disable_sync_historical {
-            self._start_historical_sync_task(tx1, current_lower_block_id);
+        if end_block_id.is_none() {
+            self._start_realtime_sync_task(tx, current_upper_block_id);
+            total_tasks += 1;
         }

-        self._start_realtime_sync_task(tx, current_upper_block_id);
+        if !self.disable_sync_historical {
+            let historical_start_block_id = current_lower_block_id;
+            let historical_end_block_id = match end_block_id {
+                Some(block_id) => block_id,
+                None => BlockId::Slot(self.dencun_fork_slot),
+            };
+
+            self._start_historical_sync_task(
+                tx1,
+                historical_start_block_id,
+                historical_end_block_id,
+            );
+
+            total_tasks += 1;
+        }
+
+        let mut completed_tasks = 0;

         while let Some(message) = rx.recv().await {
-            if let Err(error) = message {
-                error!(target = "indexer", ?error, "Indexer error occurred");
+            match message {
+                IndexerTaskResult::Done(task_name) => {
+                    info!(target = "indexer", task = task_name, "Task completed.");

-                return Err(error.into());
+                    completed_tasks += 1;
+
+                    if completed_tasks == total_tasks {
+                        return Ok(());
+                    }
+                }
+                IndexerTaskResult::Error(error) => {
+                    error!(target = "indexer", ?error, "Indexer error occurred");
+
+                    return Err(error.into());
+                }
             }
         }

@@ -151,35 +173,27 @@ impl Indexer {
         &self,
         tx: mpsc::Sender<IndexerTaskResult>,
         start_block_id: BlockId,
-    ) -> JoinHandle<IndexerTaskResult> {
+        end_block_id: BlockId,
+    ) -> JoinHandle<IndexerResult<()>> {
+        let task_name = "historical_sync".to_string();
         let mut synchronizer = self._create_synchronizer();
-        let target_lowest_slot = self.dencun_fork_slot;

         tokio::spawn(async move {
-            if let BlockId::Slot(slot) = start_block_id {
-                if slot <= target_lowest_slot {
-                    debug!(
-                        target = "indexer:historical_sync",
-                        "Skip sync. Dencun fork slot reached"
-                    );
-
-                    return Ok(());
-                }
-            }
-
-            let result = synchronizer
-                .run(&start_block_id, &BlockId::Slot(target_lowest_slot))
-                .await;
+            let result = synchronizer.run(&start_block_id, &end_block_id).await;

             if let Err(error) = result {
                 // TODO: Find a better way to handle this error
-                tx.send(Err(IndexingTaskError::FailedIndexingTask {
-                    task_name: "historical_sync".to_string(),
-                    error: error.into(),
-                }))
+                tx.send(IndexerTaskResult::Error(
+                    IndexingTaskError::FailedIndexingTask {
+                        task_name,
+                        error: error.into(),
+                    },
+                ))
                 .await
                 .unwrap();
-            };
+            } else {
+                tx.send(IndexerTaskResult::Done(task_name)).await.unwrap();
+            }

             Ok(())
         })

@@ -189,7 +203,7 @@ impl Indexer {
         &self,
         tx: mpsc::Sender<IndexerTaskResult>,
         start_block_id: BlockId,
-    ) -> JoinHandle<IndexerTaskResult> {
+    ) -> JoinHandle<IndexerResult<()>> {
         let task_name = "realtime_sync".to_string();
         let target = format!("indexer:{task_name}");
         let task_context = self.context.clone();

@@ -263,14 +277,6 @@ impl Indexer {
                         };

                         synchronizer.run(initial_block_id, head_block_id).await?;
-
-                        blobscan_client
-                            .update_sync_state(BlockchainSyncState {
-                                last_finalized_block: None,
-                                last_lower_synced_slot: None,
-                                last_upper_synced_slot: Some(head_block_data.slot),
-                            })
-                            .await?;
                     }
                     "finalized_checkpoint" => {
                         let finalized_checkpoint_data =

@@ -321,13 +327,14 @@ impl Indexer {

             if let Err(error) = result {
                 // TODO: Find a better way to handle this error
-                tx.send(Err(IndexingTaskError::FailedIndexingTask {
-                    task_name,
-                    error,
-                }))
+                tx.send(IndexerTaskResult::Error(
+                    IndexingTaskError::FailedIndexingTask { task_name, error },
+                ))
                 .await
                 .unwrap();
-            };
+            } else {
+                tx.send(IndexerTaskResult::Done(task_name)).await.unwrap();
+            }

             Ok(())
         })

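Taken together, the run() changes above reduce to a small rule: spawn the head-following realtime task only for open-ended runs, and bound the historical task by --to-slot when given, else by the Dencun fork slot. A sketch of that rule with a simplified BlockId; plan_tasks is a hypothetical helper for illustration, and 8626176 (the mainnet Dencun fork slot) is used only as an example value:

// Illustrative only: mirrors the branching in run(), not the crate's API.
#[derive(Clone, Debug)]
enum BlockId {
    Slot(u32),
    Head,
}

fn plan_tasks(
    end_block_id: Option<BlockId>,
    disable_sync_historical: bool,
    dencun_fork_slot: u32,
) -> Vec<String> {
    let mut tasks = vec![];

    // A realtime task only makes sense when there is no fixed end slot.
    if end_block_id.is_none() {
        tasks.push(format!("realtime_sync (follows {:?})", BlockId::Head));
    }

    // Historical sync stops at --to-slot when given, else at the fork slot.
    if !disable_sync_historical {
        let end = end_block_id.unwrap_or(BlockId::Slot(dencun_fork_slot));
        tasks.push(format!("historical_sync (down to {end:?})"));
    }

    tasks
}

fn main() {
    // Open-ended run: both tasks. Bounded run with --to-slot: historical only.
    println!("{:?}", plan_tasks(None, false, 8626176));
    println!("{:?}", plan_tasks(Some(BlockId::Slot(8630000)), false, 8626176));
}
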
src/indexer/types.rs: 4 additions & 1 deletion

@@ -2,4 +2,7 @@ use super::error::{IndexerError, IndexingTaskError};

 pub type IndexerResult<T> = Result<T, IndexerError>;

-pub type IndexerTaskResult = Result<(), IndexingTaskError>;
+pub enum IndexerTaskResult {
+    Done(String),
+    Error(IndexingTaskError),
+}
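Replacing the Result alias with an enum makes completion an explicit message rather than the mere absence of an error; that is what lets run() count finished tasks and return Ok(()) once every spawned task has reported in. A self-contained sketch of the fan-in pattern, assuming tokio's mpsc channel and a String stand-in for IndexingTaskError:

// Sketch only: String stands in for IndexingTaskError.
use tokio::sync::mpsc;

#[allow(dead_code)] // Error is matched but never constructed in this demo
enum IndexerTaskResult {
    Done(String),
    Error(String),
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(32);
    let total_tasks = 2;

    for name in ["historical_sync", "realtime_sync"] {
        let tx = tx.clone();
        tokio::spawn(async move {
            // ... the task's actual work would happen here ...
            tx.send(IndexerTaskResult::Done(name.to_string()))
                .await
                .unwrap();
        });
    }
    drop(tx); // only the senders held by tasks remain

    let mut completed_tasks = 0;
    while let Some(message) = rx.recv().await {
        match message {
            IndexerTaskResult::Done(task_name) => {
                println!("task {task_name} completed");
                completed_tasks += 1;
                if completed_tasks == total_tasks {
                    return; // every task reported Done
                }
            }
            IndexerTaskResult::Error(error) => {
                eprintln!("task failed: {error}");
                return;
            }
        }
    }
}
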
src/main.rs: 6 additions & 85 deletions

@@ -2,9 +2,11 @@ use anyhow::{anyhow, Result as AnyhowResult};
 use args::Args;
 use clap::Parser;
 use env::Environment;
-use indexer::{Indexer, RunOptions};
-use url::Url;
-use utils::telemetry::{get_subscriber, init_subscriber};
+use indexer::Indexer;
+use utils::{
+    banner::print_banner,
+    telemetry::{get_subscriber, init_subscriber},
+};

 mod args;
 mod clients;

@@ -16,84 +18,6 @@ mod slots_processor;
 mod synchronizer;
 mod utils;

-fn remove_credentials_from_url(url_string: &str) -> Option<String> {
-    match Url::parse(url_string) {
-        Ok(mut url) => {
-            url.set_username("******").unwrap();
-            url.set_password(None).unwrap();
-            Some(url.into())
-        }
-        Err(_) => None,
-    }
-}
-
-pub fn print_banner(args: &Args, env: &Environment) {
-    println!("____ _ _ ");
-    println!("| __ )| | ___ | |__ ___ ___ __ _ _ __ ");
-    println!("| _ \\| |/ _ \\| '_ \\/ __|/ __/ _` | '_ \\ ");
-    println!("| |_) | | (_) | |_) \\__ \\ (_| (_| | | | |");
-    println!("|____/|_|\\___/|_.__/|___/\\___\\__,_|_| |_|\n");
-    println!("Blobscan indexer (EIP-4844 blob indexer) - blobscan.com");
-    println!("=======================================================");
-
-    println!("Network: {:?}", env.network_name);
-    if let Some(dencun_fork_slot) = env.dencun_fork_slot {
-        println!("Dencun fork slot: {dencun_fork_slot}");
-    } else {
-        println!("Dencun fork slot: {}", env.network_name.dencun_fork_slot());
-    }
-
-    if let Some(from_slot) = args.from_slot.clone() {
-        println!("Custom start slot: {}", from_slot.to_detailed_string());
-    }
-
-    if let Some(num_threads) = args.num_threads {
-        println!("Number of threads: {}", num_threads);
-    } else {
-        println!("Number of threads: auto");
-    }
-
-    if let Some(slots_per_save) = args.slots_per_save {
-        println!("Slots checkpoint size: {}", slots_per_save);
-    } else {
-        println!("Slots checkpoint size: 1000");
-    }
-
-    println!(
-        "Disable sync checkpoint saving: {}",
-        if args.disable_sync_checkpoint_save {
-            "yes"
-        } else {
-            "no"
-        }
-    );
-
-    println!(
-        "Disable historical sync: {}",
-        if args.disable_sync_historical {
-            "yes"
-        } else {
-            "no"
-        }
-    );
-
-    println!("Blobscan API endpoint: {}", env.blobscan_api_endpoint);
-    println!(
-        "CL endpoint: {:?}",
-        remove_credentials_from_url(env.beacon_node_endpoint.as_str())
-    );
-    println!(
-        "EL endpoint: {:?}",
-        remove_credentials_from_url(env.execution_node_endpoint.as_str())
-    );
-
-    if let Some(sentry_dsn) = env.sentry_dsn.clone() {
-        println!("Sentry DSN: {}", sentry_dsn);
-    }
-
-    println!("\n");
-}
-
 async fn run() -> AnyhowResult<()> {
     dotenv::dotenv().ok();
     let env = match Environment::from_env() {

@@ -117,14 +41,11 @@ async fn run() -> AnyhowResult<()> {
     init_subscriber(subscriber);

     let args = Args::parse();
-    let run_opts = Some(RunOptions {
-        disable_sync_historical: args.disable_sync_historical,
-    });

     print_banner(&args, &env);

     Indexer::try_new(&env, &args)?
-        .run(args.from_slot, run_opts)
+        .run(args.from_slot, args.to_slot)
         .await
         .map_err(|err| anyhow!(err))
 }

src/synchronizer/mod.rs: 3 additions & 4 deletions

@@ -99,10 +99,9 @@ impl Synchronizer {
     async fn _sync_slots(&mut self, from_slot: u32, to_slot: u32) -> Result<(), SynchronizerError> {
         let is_reverse_sync = to_slot < from_slot;
         let unprocessed_slots = to_slot.abs_diff(from_slot) + 1;
-        let slots_per_thread = std::cmp::max(
-            self.min_slots_per_thread,
-            unprocessed_slots / self.num_threads,
-        );
+        let min_slots_per_thread = std::cmp::min(unprocessed_slots, self.min_slots_per_thread);
+        let slots_per_thread =
+            std::cmp::max(min_slots_per_thread, unprocessed_slots / self.num_threads);
         let num_threads = std::cmp::max(1, unprocessed_slots / slots_per_thread);
         let remaining_slots = unprocessed_slots % num_threads;

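The synchronizer tweak clamps the minimum chunk size to the size of the range itself, so a short bounded run (now possible with --to-slot) no longer gets a chunk larger than the range. A standalone sketch of the arithmetic with worked examples; the concrete values are illustrative, not the crate's defaults:

// Sketch only: the partitioning arithmetic extracted from _sync_slots.
fn partition(
    from_slot: u32,
    to_slot: u32,
    min_slots_per_thread: u32,
    num_threads: u32,
) -> (u32, u32, u32) {
    let unprocessed_slots = to_slot.abs_diff(from_slot) + 1;
    // New in this PR: never let the minimum chunk exceed the range itself.
    let min_slots_per_thread = std::cmp::min(unprocessed_slots, min_slots_per_thread);
    let slots_per_thread =
        std::cmp::max(min_slots_per_thread, unprocessed_slots / num_threads);
    let num_threads = std::cmp::max(1, unprocessed_slots / slots_per_thread);
    let remaining_slots = unprocessed_slots % num_threads;
    (slots_per_thread, num_threads, remaining_slots)
}

fn main() {
    // 10 slots, min chunk 50, 8 threads: the old max(50, 10 / 8) kept a
    // 50-slot chunk for a 10-slot range; the clamp yields exactly 10.
    assert_eq!(partition(100, 109, 50, 8), (10, 1, 0));

    // A large range is unaffected: 10_000 slots over 8 threads, min 50.
    assert_eq!(partition(0, 9_999, 50, 8), (1_250, 8, 0));
}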