Skip to content

Commit

Permalink
refactor: simplify block ID usage by implementing the From trait and …
Browse files Browse the repository at this point in the history
…avoiding reference passing
  • Loading branch information
PJColombo committed Jan 30, 2025
1 parent bb13ab5 commit bff7882
Show file tree
Hide file tree
Showing 7 changed files with 34 additions and 32 deletions.
12 changes: 6 additions & 6 deletions src/clients/beacon/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ pub struct Config {
#[async_trait]
#[cfg_attr(test, automock)]
pub trait CommonBeaconClient: Send + Sync + Debug {
async fn get_block(&self, block_id: &BlockId) -> ClientResult<Option<Block>>;
async fn get_block_header(&self, block_id: &BlockId) -> ClientResult<Option<BlockHeader>>;
async fn get_blobs(&self, block_id: &BlockId) -> ClientResult<Option<Vec<Blob>>>;
async fn get_block(&self, block_id: BlockId) -> ClientResult<Option<Block>>;
async fn get_block_header(&self, block_id: BlockId) -> ClientResult<Option<BlockHeader>>;
async fn get_blobs(&self, block_id: BlockId) -> ClientResult<Option<Vec<Blob>>>;
fn subscribe_to_events(&self, topics: &[Topic]) -> ClientResult<EventSource>;
}

Expand All @@ -57,7 +57,7 @@ impl BeaconClient {

#[async_trait]
impl CommonBeaconClient for BeaconClient {
async fn get_block(&self, block_id: &BlockId) -> ClientResult<Option<Block>> {
async fn get_block(&self, block_id: BlockId) -> ClientResult<Option<Block>> {
let path = format!("v2/beacon/blocks/{}", { block_id.to_detailed_string() });
let url = self.base_url.join(path.as_str())?;

Expand All @@ -67,7 +67,7 @@ impl CommonBeaconClient for BeaconClient {
})
}

async fn get_block_header(&self, block_id: &BlockId) -> ClientResult<Option<BlockHeader>> {
async fn get_block_header(&self, block_id: BlockId) -> ClientResult<Option<BlockHeader>> {
let path = format!("v1/beacon/headers/{}", { block_id.to_detailed_string() });
let url = self.base_url.join(path.as_str())?;

Expand All @@ -83,7 +83,7 @@ impl CommonBeaconClient for BeaconClient {
})
}

async fn get_blobs(&self, block_id: &BlockId) -> ClientResult<Option<Vec<Blob>>> {
async fn get_blobs(&self, block_id: BlockId) -> ClientResult<Option<Vec<Blob>>> {
let path = format!("v1/beacon/blob_sidecars/{}", {
block_id.to_detailed_string()
});
Expand Down
6 changes: 6 additions & 0 deletions src/clients/beacon/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,12 @@ impl From<B256> for BlockId {
}
}

impl From<u32> for BlockId {
fn from(value: u32) -> Self {
BlockId::Slot(value)
}
}

impl From<BlockHeaderResponse> for BlockHeader {
fn from(response: BlockHeaderResponse) -> Self {
BlockHeader {
Expand Down
5 changes: 2 additions & 3 deletions src/indexer/event_handlers/finalized_checkpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ use tracing::info;

use crate::{
clients::{
beacon::types::{BlockId, FinalizedCheckpointEventData},
blobscan::types::BlockchainSyncState,
beacon::types::FinalizedCheckpointEventData, blobscan::types::BlockchainSyncState,
common::ClientError,
},
context::CommonContext,
Expand Down Expand Up @@ -46,7 +45,7 @@ where
let last_finalized_block_number = match self
.context
.beacon_client()
.get_block(&BlockId::Hash(block_hash))
.get_block(block_hash.into())
.await
.map_err(|err| {
FinalizedCheckpointEventHandlerError::BlockRetrievalError(
Expand Down
14 changes: 7 additions & 7 deletions src/indexer/event_handlers/head.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,12 @@ where
head_block_id.clone()
};

let head_block_header = self.get_block_header(&head_block_id).await?;
let head_block_header = self.get_block_header(head_block_id).await?;

if let Some(last_block_hash) = self.last_block_hash {
if last_block_hash != head_block_header.parent_root {
let parent_block_header = self
.get_block_header(&head_block_header.parent_root.into())
.get_block_header(head_block_header.parent_root.into())
.await?;
let parent_block_slot = parent_block_header.slot;
let reorg_start_slot = parent_block_slot + 1;
Expand All @@ -91,8 +91,8 @@ where
// Re-index parent block as it may be mark as reorged and not indexed
self.synchronizer
.run(
&BlockId::Slot(parent_block_slot),
&BlockId::Slot(parent_block_slot + 1),
parent_block_slot.into(),
(parent_block_slot + 1).into(),
)
.await?;

Expand All @@ -118,7 +118,7 @@ where
}

self.synchronizer
.run(&initial_block_id, &BlockId::Slot(head_block_slot + 1))
.run(initial_block_id, (head_block_slot + 1).into())
.await?;

self.last_block_hash = Some(head_block_hash);
Expand All @@ -128,12 +128,12 @@ where

async fn get_block_header(
&self,
block_id: &BlockId,
block_id: BlockId,
) -> Result<BlockHeader, HeadEventHandlerError> {
match self
.context
.beacon_client()
.get_block_header(block_id)
.get_block_header(block_id.clone())
.await
.map_err(|err| {
HeadEventHandlerError::BlockHeaderRetrievalError(block_id.clone(), err)
Expand Down
2 changes: 1 addition & 1 deletion src/indexer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ impl Indexer<ReqwestTransport> {
let historical_syc_thread_span = tracing::info_span!("indexer:historical");

let result: Result<(), IndexerError> = async move {
let result = synchronizer.run(&start_block_id, &end_block_id).await;
let result = synchronizer.run(start_block_id, end_block_id).await;

if let Err(error) = result {
tx.send(IndexerTaskMessage::Error(
Expand Down
9 changes: 3 additions & 6 deletions src/slots_processor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@ use anyhow::{anyhow, Context as AnyhowContext, Result};
use tracing::{debug, info};

use crate::{
clients::{
beacon::types::BlockId,
blobscan::types::{Blob, Block, Transaction},
},
clients::blobscan::types::{Blob, Block, Transaction},
context::CommonContext,
};

Expand Down Expand Up @@ -59,7 +56,7 @@ impl SlotsProcessor<ReqwestTransport> {
let blobscan_client = self.context.blobscan_client();
let provider = self.context.provider();

let beacon_block = match beacon_client.get_block(&BlockId::Slot(slot)).await? {
let beacon_block = match beacon_client.get_block(slot.into()).await? {
Some(block) => block,
None => {
debug!(slot = slot, "Skipping as there is no beacon block");
Expand Down Expand Up @@ -113,7 +110,7 @@ impl SlotsProcessor<ReqwestTransport> {
// Fetch blobs and perform some checks

let blobs = match beacon_client
.get_blobs(&BlockId::Slot(slot))
.get_blobs(slot.into())
.await
.map_err(SlotProcessingError::ClientError)?
{
Expand Down
18 changes: 9 additions & 9 deletions src/synchronizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ pub mod error;
pub trait CommonSynchronizer: Send + Sync {
async fn run(
&self,
initial_block_id: &BlockId,
final_block_id: &BlockId,
initial_block_id: BlockId,
final_block_id: BlockId,
) -> Result<(), SynchronizerError>;
}

Expand Down Expand Up @@ -286,12 +286,12 @@ impl Synchronizer<ReqwestTransport> {
Ok(())
}

async fn resolve_to_slot(&self, block_id: &BlockId) -> Result<u32, SynchronizerError> {
async fn resolve_to_slot(&self, block_id: BlockId) -> Result<u32, SynchronizerError> {
let beacon_client = self.context.beacon_client();

let resolved_block_id: Result<u32, ClientError> = match block_id {
BlockId::Slot(slot) => Ok(*slot),
_ => match beacon_client.get_block_header(block_id).await {
BlockId::Slot(slot) => Ok(slot),
_ => match beacon_client.get_block_header(block_id.clone()).await {
Ok(None) => {
let err = anyhow!("Block ID {} not found", block_id);

Expand All @@ -316,11 +316,11 @@ impl Synchronizer<ReqwestTransport> {
impl CommonSynchronizer for Synchronizer<ReqwestTransport> {
async fn run(
&self,
initial_block_id: &BlockId,
final_block_id: &BlockId,
initial_block_id: BlockId,
final_block_id: BlockId,
) -> Result<(), SynchronizerError> {
let initial_slot = self.resolve_to_slot(initial_block_id).await?;
let mut final_slot = self.resolve_to_slot(final_block_id).await?;
let mut final_slot = self.resolve_to_slot(final_block_id.clone()).await?;

if initial_slot == final_slot {
return Ok(());
Expand All @@ -330,7 +330,7 @@ impl CommonSynchronizer for Synchronizer<ReqwestTransport> {
self.sync_slots_by_checkpoints(initial_slot, final_slot)
.await?;

let latest_final_slot = self.resolve_to_slot(final_block_id).await?;
let latest_final_slot = self.resolve_to_slot(final_block_id.clone()).await?;

if final_slot == latest_final_slot {
return Ok(());
Expand Down

0 comments on commit bff7882

Please sign in to comment.