Skip to content

Commit

Permalink
Merge pull request #53 from Blobscan/support-dual-indexing
Browse files Browse the repository at this point in the history
feat: add reorgs support + indexer restructuring
  • Loading branch information
PJColombo authored Feb 21, 2024
2 parents ed88527 + 192e08d commit d72630c
Show file tree
Hide file tree
Showing 25 changed files with 1,217 additions and 613 deletions.
4 changes: 3 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
{
"editor.formatOnSave": true
"editor.formatOnSave": true,
"rust-analyzer.linkedProjects": ["./Cargo.toml"],
"rust-analyzer.showUnlinkedFileNotification": false
}
45 changes: 45 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 5 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,15 @@ ethers = "1.0.2"
futures = "0.3.25"
hex = "0.4.3"
reqwest = { version = "0.11.13", features = ["json"] }
reqwest-eventsource = "0.5.0"
url = { version = "2.3.1", features = ["serde"] }
serde = { version = "1.0.150", features = ["derive"] }
tokio = { version = "1.23.0", features = ["full"] }
jsonwebtoken = "8.3.0"
backoff = { version = "0.4.0", features = ["tokio"] }
chrono = "0.4.24"
serde_json = "1.0.96"
clap = { version = "4.3.0", features = ["derive"] }


# logging
Expand All @@ -29,9 +34,5 @@ tracing-log = "0.1.1"
# error handling
anyhow = { version = "1.0.70", features = ["backtrace"] }
thiserror = "1.0.40"
backoff = { version = "0.4.0", features = ["tokio"] }
chrono = "0.4.24"
serde_json = "1.0.96"
clap = { version = "4.3.0", features = ["derive"] }
sentry = { version = "0.31.2", features = ["debug-images"] }
sentry-tracing = "0.31.2"
8 changes: 5 additions & 3 deletions src/args.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
use clap::Parser;

use crate::clients::beacon::types::BlockId;

/// Blobscan's indexer for the EIP-4844 upgrade.
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
pub struct Args {
/// Slot to start indexing from
#[arg(short, long)]
pub from_slot: Option<u32>,
pub from_slot: Option<BlockId>,

/// Number of threads used for parallel indexing
#[arg(short, long)]
pub num_threads: Option<u32>,

/// Amount of slots to be processed before saving latest slot in the database
#[arg(short, long, default_value_t = 1000)]
pub slots_per_save: u32,
#[arg(short, long)]
pub slots_per_save: Option<u32>,
}
66 changes: 50 additions & 16 deletions src/clients/beacon/mod.rs
Original file line number Diff line number Diff line change
@@ -1,53 +1,87 @@
use anyhow::Context as AnyhowContext;
use backoff::ExponentialBackoff;
use reqwest::{Client, Url};
use std::time::Duration;
use reqwest_eventsource::EventSource;

use crate::{clients::common::ClientResult, json_get};
use crate::{
clients::{beacon::types::BlockHeaderResponse, common::ClientResult},
json_get,
};

use self::types::{Blob, BlobsResponse, BlockMessage as Block, BlockResponse};
use self::types::{Blob, BlobsResponse, Block, BlockHeader, BlockId, BlockResponse, Topic};

pub mod types;

#[derive(Debug, Clone)]
pub struct BeaconClient {
base_url: Url,
client: Client,
exp_backoff: Option<ExponentialBackoff>,
}

pub struct Config {
pub base_url: String,
pub timeout: Option<Duration>,
pub exp_backoff: Option<ExponentialBackoff>,
}

impl BeaconClient {
pub fn try_with_client(client: Client, config: Config) -> ClientResult<Self> {
let base_url = Url::parse(&format!("{}/eth/", config.base_url))
.with_context(|| "Failed to parse base URL")?;
let exp_backoff = config.exp_backoff;

Ok(Self { base_url, client })
Ok(Self {
base_url,
client,
exp_backoff,
})
}

pub async fn get_block(&self, block_id: &BlockId) -> ClientResult<Option<Block>> {
let path = format!("v2/beacon/blocks/{block_id}");
let url = self.base_url.join(path.as_str())?;

json_get!(&self.client, url, BlockResponse, self.exp_backoff.clone()).map(|res| match res {
Some(r) => Some(r.data),
None => None,
})
}

pub async fn get_block(&self, slot: Option<u32>) -> ClientResult<Option<Block>> {
let slot = match slot {
Some(slot) => slot.to_string(),
None => String::from("head"),
};
let path = format!("v2/beacon/blocks/{slot}");
pub async fn get_block_header(&self, block_id: &BlockId) -> ClientResult<Option<BlockHeader>> {
let path = format!("v1/beacon/headers/{block_id}");
let url = self.base_url.join(path.as_str())?;

json_get!(&self.client, url, BlockResponse).map(|res| match res {
Some(r) => Some(r.data.message),
json_get!(
&self.client,
url,
BlockHeaderResponse,
self.exp_backoff.clone()
)
.map(|res| match res {
Some(r) => Some(r.data),
None => None,
})
}

pub async fn get_blobs(&self, slot: u32) -> ClientResult<Option<Vec<Blob>>> {
let path = format!("v1/beacon/blob_sidecars/{slot}");
pub async fn get_blobs(&self, block_id: &BlockId) -> ClientResult<Option<Vec<Blob>>> {
let path = format!("v1/beacon/blob_sidecars/{block_id}");
let url = self.base_url.join(path.as_str())?;

json_get!(&self.client, url, BlobsResponse).map(|res| match res {
json_get!(&self.client, url, BlobsResponse, self.exp_backoff.clone()).map(|res| match res {
Some(r) => Some(r.data),
None => None,
})
}

pub fn subscribe_to_events(&self, topics: Vec<Topic>) -> ClientResult<EventSource> {
let topics = topics
.iter()
.map(|topic| topic.into())
.collect::<Vec<String>>()
.join("&");
let path = format!("v1/events?topics={topics}");
let url = self.base_url.join(&path)?;

Ok(EventSource::get(url))
}
}
106 changes: 104 additions & 2 deletions src/clients/beacon/types.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,22 @@
use std::{fmt, str::FromStr};

use ethers::types::{Bytes, H256};
use serde::Deserialize;
use serde::{Deserialize, Serialize};

use crate::slots_processor::BlockData;

#[derive(Serialize, Debug, Clone)]
pub enum BlockId {
Head,
Finalized,
Slot(u32),
}

#[derive(Serialize, Debug)]
pub enum Topic {
Head,
FinalizedCheckpoint,
}

#[derive(Deserialize, Debug)]
pub struct ExecutionPayload {
Expand All @@ -13,8 +30,10 @@ pub struct BlockBody {
}
#[derive(Deserialize, Debug)]
pub struct BlockMessage {
pub slot: String,
#[serde(deserialize_with = "deserialize_slot")]
pub slot: u32,
pub body: BlockBody,
pub parent_root: H256,
}

#[derive(Deserialize, Debug)]
Expand All @@ -38,3 +57,86 @@ pub struct Blob {
pub struct BlobsResponse {
pub data: Vec<Blob>,
}

#[derive(Deserialize, Debug)]
pub struct BlockHeaderResponse {
pub data: BlockHeader,
}

#[derive(Deserialize, Debug)]
pub struct BlockHeader {
pub root: H256,
pub header: InnerBlockHeader,
}
#[derive(Deserialize, Debug)]
pub struct InnerBlockHeader {
pub message: BlockHeaderMessage,
}

#[derive(Deserialize, Debug)]
pub struct BlockHeaderMessage {
pub parent_root: H256,
#[serde(deserialize_with = "deserialize_slot")]
pub slot: u32,
}

#[derive(Deserialize, Debug)]
pub struct HeadBlockEventData {
#[serde(deserialize_with = "deserialize_slot")]
pub slot: u32,
pub block: H256,
}

fn deserialize_slot<'de, D>(deserializer: D) -> Result<u32, D::Error>
where
D: serde::Deserializer<'de>,
{
let slot = String::deserialize(deserializer)?;

slot.parse::<u32>().map_err(serde::de::Error::custom)
}

impl fmt::Display for BlockId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
BlockId::Head => write!(f, "head"),
BlockId::Finalized => write!(f, "finalized"),
BlockId::Slot(slot) => write!(f, "{}", slot),
}
}
}

impl FromStr for BlockId {
type Err = String;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"head" => Ok(BlockId::Head),
"finalized" => Ok(BlockId::Finalized),
_ => match s.parse::<u32>() {
Ok(num) => Ok(BlockId::Slot(num)),
Err(_) => {
Err("Invalid block ID. Expected 'head', 'finalized' or a number.".to_string())
}
},
}
}
}

impl From<&Topic> for String {
fn from(value: &Topic) -> Self {
match value {
Topic::Head => String::from("head"),
Topic::FinalizedCheckpoint => String::from("finalized_checkpoint"),
}
}
}

impl From<HeadBlockEventData> for BlockData {
fn from(event_data: HeadBlockEventData) -> Self {
Self {
root: event_data.block,
slot: event_data.slot,
}
}
}
Loading

0 comments on commit d72630c

Please sign in to comment.