Skip to content

Commit

Permalink
perf: run historical and realtime sync in parallel + refactor indexer…
Browse files Browse the repository at this point in the history
…'s and synchronizer's error logging
  • Loading branch information
PJColombo committed Feb 10, 2024
1 parent 5886540 commit 135abfc
Show file tree
Hide file tree
Showing 9 changed files with 253 additions and 155 deletions.
2 changes: 1 addition & 1 deletion src/clients/blobscan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl BlobscanClient {
json_put!(&self.client, url, token, &req).map(|_: Option<()>| ())
}

pub async fn get_synced_state(&self) -> ClientResult<Option<BlockchainSyncState>> {
pub async fn get_sync_state(&self) -> ClientResult<Option<BlockchainSyncState>> {
let url = self.base_url.join("blockchain-sync-state")?;
json_get!(
&self.client,
Expand Down
2 changes: 1 addition & 1 deletion src/clients/blobscan/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ pub struct BlockchainSyncStateRequest {
}

#[derive(Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct BlockchainSyncStateResponse {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub last_lower_synced_slot: Option<u32>,
Expand All @@ -68,7 +69,6 @@ pub struct BlockchainSyncStateResponse {
}

#[derive(Debug)]

pub struct BlockchainSyncState {
pub last_lower_synced_slot: Option<u32>,
pub last_upper_synced_slot: Option<u32>,
Expand Down
122 changes: 0 additions & 122 deletions src/indexer.rs

This file was deleted.

15 changes: 15 additions & 0 deletions src/indexer/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
use crate::{clients::common::ClientError, synchronizer::error::SynchronizerError};

#[derive(Debug, thiserror::Error)]
pub enum IndexerError {
#[error(transparent)]
ClientError(#[from] ClientError),
#[error(transparent)]
Other(#[from] anyhow::Error),
#[error(transparent)]
ReqwestEventSourceError(#[from] reqwest_eventsource::Error),
#[error("{0}")]
SerdeError(#[from] serde_json::Error),
#[error(transparent)]
SynchronizerError(#[from] SynchronizerError),
}
187 changes: 187 additions & 0 deletions src/indexer/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
use futures::StreamExt;
use reqwest_eventsource::Event;
use tokio::{sync::mpsc, task::JoinHandle};
use tracing::{debug, error};

use crate::{
args::Args,
clients::{
beacon::types::{BlockId, HeadBlockEventData, Topic},
blobscan::types::BlockchainSyncState,
},
context::{Config as ContextConfig, Context},
env::Environment,
synchronizer::{Synchronizer, SynchronizerBuilder},
};

use self::{error::IndexerError, types::IndexerResult};

pub mod error;
pub mod types;

pub struct Indexer {
context: Context,
num_threads: Option<u32>,
slots_checkpoint: Option<u32>,
}

impl Indexer {
pub fn try_new(env: &Environment, args: &Args) -> IndexerResult<Self> {
let context = match Context::try_new(ContextConfig::from(env)) {
Ok(c) => c,
Err(error) => {
error!(target = "indexer", ?error, "Failed to create context");

return Err(error.into());
}
};

Ok(Self {
context,
num_threads: args.num_threads,
slots_checkpoint: args.slots_per_save,
})
}

pub async fn run(&mut self, start_block_id: Option<BlockId>) -> IndexerResult<()> {

Check failure on line 46 in src/indexer/mod.rs

View workflow job for this annotation

GitHub Actions / lint

unused variable: `start_block_id`

error: unused variable: `start_block_id` --> src/indexer/mod.rs:46:33 | 46 | pub async fn run(&mut self, start_block_id: Option<BlockId>) -> IndexerResult<()> { | ^^^^^^^^^^^^^^ help: if this is intentional, prefix it with an underscore: `_start_block_id` | = note: `-D unused-variables` implied by `-D warnings` = help: to override `-D warnings` add `#[allow(unused_variables)]`
let sync_state = match self.context.blobscan_client().get_sync_state().await {
Ok(state) => state,
Err(error) => {
error!(target = "indexer", ?error, "Failed to fetch sync state");

return Err(error.into());
}
};

let current_lower_block_id = match &sync_state {
Some(state) => match state.last_lower_synced_slot {
Some(slot) => BlockId::Slot(slot - 1),
None => BlockId::Head,
},
None => BlockId::Head,
};
let current_upper_block_id = match &sync_state {
Some(state) => match state.last_upper_synced_slot {
Some(slot) => BlockId::Slot(slot + 1),
None => BlockId::Head,
},
None => BlockId::Head,
};

let (tx, mut rx) = mpsc::channel(32);
let tx1 = tx.clone();

self._start_historical_sync_task(tx1, current_lower_block_id)?;
self._start_realtime_sync_task(tx, current_upper_block_id)?;

while let Some(message) = rx.recv().await {
if let Err(error) = message {
return Err(error.into());

Check failure on line 79 in src/indexer/mod.rs

View workflow job for this annotation

GitHub Actions / lint

useless conversion to the same type: `indexer::error::IndexerError`

error: useless conversion to the same type: `indexer::error::IndexerError` --> src/indexer/mod.rs:79:28 | 79 | return Err(error.into()); | ^^^^^^^^^^^^ help: consider removing `.into()`: `error` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#useless_conversion = note: `-D clippy::useless-conversion` implied by `-D warnings` = help: to override `-D warnings` add `#[allow(clippy::useless_conversion)]`
}
}

Ok(())
}

fn _start_historical_sync_task(
&self,
tx: mpsc::Sender<IndexerResult<()>>,
start_block_id: BlockId,
) -> IndexerResult<JoinHandle<()>> {
let mut synchronizer = self._create_synchronizer()?;

let handler = tokio::spawn(async move {
let result = synchronizer.run(&start_block_id, &BlockId::Slot(0)).await;

if let Err(error) = result {
// TODO: Find a better way to handle this error
tx.send(Err(error.into())).await.unwrap();
};
});

Ok(handler)
}

fn _start_realtime_sync_task(
&self,
tx: mpsc::Sender<IndexerResult<()>>,
start_block_id: BlockId,
) -> IndexerResult<JoinHandle<IndexerResult<()>>> {
let task_context = self.context.clone();
let mut synchronizer = self._create_synchronizer()?;

let handler = tokio::spawn(async move {
let result = async {
let blobscan_client = task_context.blobscan_client();
let mut event_source = task_context
.beacon_client()
.subscribe_to_events(vec![Topic::Head])?;
let mut current_block_id = start_block_id;

while let Some(event) = event_source.next().await {
match event {
Ok(Event::Open) => {
debug!(target = "indexer", "Listening for head block events…")
}
Ok(Event::Message(event)) => {
let head_block_data =
serde_json::from_str::<HeadBlockEventData>(&event.data)?;
let head_block_id = BlockId::Slot(head_block_data.slot);

synchronizer
.run(&current_block_id, &head_block_id)
.await
.map_err(|err| IndexerError::SynchronizerError(err.into()))?;

Check failure on line 134 in src/indexer/mod.rs

View workflow job for this annotation

GitHub Actions / lint

useless conversion to the same type: `synchronizer::error::SynchronizerError`

error: useless conversion to the same type: `synchronizer::error::SynchronizerError` --> src/indexer/mod.rs:134:80 | 134 | ... .map_err(|err| IndexerError::SynchronizerError(err.into()))?; | ^^^^^^^^^^ help: consider removing `.into()`: `err` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#useless_conversion

blobscan_client
.update_sync_state(BlockchainSyncState {
last_lower_synced_slot: None,
last_upper_synced_slot: Some(head_block_data.slot),
})
.await?;

current_block_id = head_block_id;
}
Err(error) => {
event_source.close();

error!(
target = "indexer",
?error,
"Failed to received head block event"
);

return Err(error.into());
}
}
}

Ok(())
}
.await;

if let Err(error) = result {
// TODO: Find a better way to handle this error
tx.send(Err(error)).await.unwrap();
};

Ok(())
});

Ok(handler)
}

fn _create_synchronizer(&self) -> IndexerResult<Synchronizer> {
let mut synchronizer_builder = SynchronizerBuilder::new()?;

if let Some(num_threads) = self.num_threads {
synchronizer_builder.with_num_threads(num_threads);
}

if let Some(slots_checkpoint) = self.slots_checkpoint {
synchronizer_builder.with_slots_checkpoint(slots_checkpoint);
}

Ok(synchronizer_builder.build(self.context.clone()))
}
}
3 changes: 3 additions & 0 deletions src/indexer/types.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
use super::error::IndexerError;

pub type IndexerResult<T> = Result<T, IndexerError>;
11 changes: 6 additions & 5 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use anyhow::{anyhow, Result};
use anyhow::{anyhow, Result as AnyhowResult};
use args::Args;
use clap::Parser;
use env::Environment;
Expand Down Expand Up @@ -46,7 +46,7 @@ pub fn print_banner(args: &Args, env: &Environment) {
println!("\n");
}

async fn run() -> Result<()> {
async fn run() -> AnyhowResult<()> {
dotenv::dotenv().ok();
let env = match Environment::from_env() {
Ok(env) => env,
Expand All @@ -72,9 +72,10 @@ async fn run() -> Result<()> {

print_banner(&args, &env);

let mut indexer = Indexer::try_new(&env, &args)?;

indexer.run(args.from_slot).await
Indexer::try_new(&env, &args)?
.run(args.from_slot)
.await
.map_err(|err| anyhow!(err))
}

#[tokio::main]
Expand Down
Loading

0 comments on commit 135abfc

Please sign in to comment.