Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: track filters in state for eth_getFilterChanges #448

Merged
merged 12 commits into from
Dec 17, 2024
4 changes: 2 additions & 2 deletions core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::sync::Arc;
use std::time::Duration;

use alloy::primitives::{Address, Bytes, B256, U256};
use alloy::rpc::types::{Filter, Log, SyncStatus};
use alloy::rpc::types::{Filter, FilterChanges, Log, SyncStatus};
use eyre::Result;
use tracing::{info, warn};

Expand Down Expand Up @@ -131,7 +131,7 @@ impl<N: NetworkSpec, C: Consensus<N::TransactionResponse>> Client<N, C> {
self.node.get_logs(filter).await
}

pub async fn get_filter_changes(&self, filter_id: U256) -> Result<Vec<Log>> {
pub async fn get_filter_changes(&self, filter_id: U256) -> Result<FilterChanges> {
self.node.get_filter_changes(filter_id).await
}

Expand Down
4 changes: 2 additions & 2 deletions core/src/client/node.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::sync::Arc;

use alloy::primitives::{Address, Bytes, B256, U256};
use alloy::rpc::types::{Filter, Log, SyncInfo, SyncStatus};
use alloy::rpc::types::{Filter, FilterChanges, Log, SyncInfo, SyncStatus};
use eyre::{eyre, Result};

use crate::consensus::Consensus;
Expand Down Expand Up @@ -166,7 +166,7 @@ impl<N: NetworkSpec, C: Consensus<N::TransactionResponse>> Node<N, C> {
format!("helios-{}", helios_version)
}

pub async fn get_filter_changes(&self, filter_id: U256) -> Result<Vec<Log>> {
pub async fn get_filter_changes(&self, filter_id: U256) -> Result<FilterChanges> {
self.execution.get_filter_changes(filter_id).await
}

Expand Down
6 changes: 3 additions & 3 deletions core/src/client/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{fmt::Display, net::SocketAddr, sync::Arc};
use alloy::network::{ReceiptResponse, TransactionResponse};
use alloy::primitives::{Address, Bytes, B256, U256, U64};
use alloy::rpc::json_rpc::RpcObject;
use alloy::rpc::types::{Filter, Log, SyncStatus};
use alloy::rpc::types::{Filter, FilterChanges, Log, SyncStatus};
use eyre::Result;
use jsonrpsee::{
core::{async_trait, server::Methods},
Expand Down Expand Up @@ -124,7 +124,7 @@ trait EthRpc<TX: TransactionResponse + RpcObject, TXR: RpcObject, R: ReceiptResp
#[method(name = "getLogs")]
async fn get_logs(&self, filter: Filter) -> Result<Vec<Log>, ErrorObjectOwned>;
#[method(name = "getFilterChanges")]
async fn get_filter_changes(&self, filter_id: U256) -> Result<Vec<Log>, ErrorObjectOwned>;
async fn get_filter_changes(&self, filter_id: U256) -> Result<FilterChanges, ErrorObjectOwned>;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also add the newPendingTransactionFilter and newBlockFilter methods to rpc?

Copy link
Contributor Author

@eshaan7 eshaan7 Dec 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The methods are there in the RPC but their corresponding Wasm bindings are missing, for which I had already created an issue (#451).

#[method(name = "getFilterLogs")]
async fn get_filter_logs(&self, filter_id: U256) -> Result<Vec<Log>, ErrorObjectOwned>;
#[method(name = "uninstallFilter")]
Expand Down Expand Up @@ -333,7 +333,7 @@ impl<N: NetworkSpec, C: Consensus<N::TransactionResponse>>
convert_err(self.node.get_logs(&filter).await)
}

async fn get_filter_changes(&self, filter_id: U256) -> Result<Vec<Log>, ErrorObjectOwned> {
async fn get_filter_changes(&self, filter_id: U256) -> Result<FilterChanges, ErrorObjectOwned> {
convert_err(self.node.get_filter_changes(filter_id).await)
}

Expand Down
2 changes: 2 additions & 0 deletions core/src/execution/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ pub enum ExecutionError {
BlockNotFound(BlockTag),
#[error("receipts root mismatch for block: {0}")]
BlockReceiptsRootMismatch(BlockTag),
#[error("filter not found: 0x{0:x}")]
FilterNotFound(U256),
}

/// Errors that can occur during evm.rs calls
Expand Down
88 changes: 74 additions & 14 deletions core/src/execution/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::collections::{HashMap, HashSet};
use alloy::network::ReceiptResponse;
use alloy::primitives::{keccak256, Address, B256, U256};
use alloy::rlp::encode;
use alloy::rpc::types::{Filter, Log};
use alloy::rpc::types::{Filter, FilterChanges, Log};
use constants::{BLOB_BASE_FEE_UPDATE_FRACTION, MIN_BASE_FEE_PER_BLOB_GAS};
use eyre::Result;
use futures::future::try_join_all;
Expand All @@ -18,7 +18,7 @@ use self::constants::MAX_SUPPORTED_LOGS_NUMBER;
use self::errors::ExecutionError;
use self::proof::{encode_account, verify_proof};
use self::rpc::ExecutionRpc;
use self::state::State;
use self::state::{FilterType, State};
use self::types::Account;

pub mod constants;
Expand Down Expand Up @@ -337,15 +337,53 @@ impl<N: NetworkSpec, R: ExecutionRpc<N>> ExecutionClient<N, R> {
Ok(logs)
}

pub async fn get_filter_changes(&self, filter_id: U256) -> Result<Vec<Log>> {
let logs = self.rpc.get_filter_changes(filter_id).await?;
if logs.len() > MAX_SUPPORTED_LOGS_NUMBER {
return Err(
ExecutionError::TooManyLogsToProve(logs.len(), MAX_SUPPORTED_LOGS_NUMBER).into(),
);
}
self.verify_logs(&logs).await?;
Ok(logs)
pub async fn get_filter_changes(&self, filter_id: U256) -> Result<FilterChanges> {
let filter_type = self.state.get_filter(&filter_id).await;

Ok(match &filter_type {
None => {
// only concerned with filters created via helios
return Err(ExecutionError::FilterNotFound(filter_id).into());
}
Some(FilterType::Logs) => {
// underlying RPC takes care of keeping track of changes
let filter_changes = self.rpc.get_filter_changes(filter_id).await?;
let logs = filter_changes.as_logs().unwrap_or(&[]);
if logs.len() > MAX_SUPPORTED_LOGS_NUMBER {
return Err(ExecutionError::TooManyLogsToProve(
logs.len(),
MAX_SUPPORTED_LOGS_NUMBER,
)
.into());
}
self.verify_logs(logs).await?;
FilterChanges::Logs(logs.to_vec())
}
Some(FilterType::NewBlock(last_block_num)) => {
let blocks = self
.state
.get_blocks_after(BlockTag::Number(*last_block_num))
.await;
if !blocks.is_empty() {
// keep track of the last block number in state
// so next call can filter starting from the prev call's (last block number + 1)
self.state
.push_filter(
filter_id,
FilterType::NewBlock(blocks.last().unwrap().number.to()),
)
.await;
}
let block_hashes = blocks.into_iter().map(|b| b.hash).collect();
FilterChanges::Hashes(block_hashes)
}
Some(FilterType::PendingTransactions) => {
// underlying RPC takes care of keeping track of changes
let filter_changes = self.rpc.get_filter_changes(filter_id).await?;
let tx_hashes = filter_changes.as_hashes().unwrap_or(&[]);
FilterChanges::Hashes(tx_hashes.to_vec())
}
})
}

pub async fn get_filter_logs(&self, filter_id: U256) -> Result<Vec<Log>> {
Expand All @@ -360,6 +398,8 @@ impl<N: NetworkSpec, R: ExecutionRpc<N>> ExecutionClient<N, R> {
}

pub async fn uninstall_filter(&self, filter_id: U256) -> Result<bool> {
// remove the filter from the state
self.state.remove_filter(&filter_id).await;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if it is a log filter? Wouldn't we need to uninstall the filter in the rpc if it is a log filter?

Copy link
Contributor Author

@eshaan7 eshaan7 Dec 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's what the next line (L403) does. Regardless of filter type, we always call both state.remove_filter and rpc.uninstall_filter.

https://github.com/eshaan7/helios/blob/d01f43245c87e07222cde003cd8f57348b8dad8e/core/src/execution/mod.rs#L403

self.rpc.uninstall_filter(filter_id).await
}

Expand All @@ -378,15 +418,35 @@ impl<N: NetworkSpec, R: ExecutionRpc<N>> ExecutionClient<N, R> {
} else {
filter
};
self.rpc.new_filter(&filter).await
let filter_id = self.rpc.new_filter(&filter).await?;

// record the filter in the state
self.state.push_filter(filter_id, FilterType::Logs).await;

Ok(filter_id)
}

pub async fn new_block_filter(&self) -> Result<U256> {
self.rpc.new_block_filter().await
let filter_id = self.rpc.new_block_filter().await?;

// record the filter in the state
let latest_block_num = self.state.latest_block_number().await.unwrap_or(1);
self.state
.push_filter(filter_id, FilterType::NewBlock(latest_block_num))
.await;

Ok(filter_id)
}

pub async fn new_pending_transaction_filter(&self) -> Result<U256> {
self.rpc.new_pending_transaction_filter().await
let filter_id = self.rpc.new_pending_transaction_filter().await?;

// record the filter in the state
self.state
.push_filter(filter_id, FilterType::PendingTransactions)
.await;

Ok(filter_id)
}

async fn verify_logs(&self, logs: &[Log]) -> Result<()> {
Expand Down
10 changes: 6 additions & 4 deletions core/src/execution/rpc/http_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ use alloy::eips::BlockNumberOrTag;
use alloy::primitives::{Address, B256, U256};
use alloy::providers::{Provider, ProviderBuilder, RootProvider};
use alloy::rpc::client::ClientBuilder;
use alloy::rpc::types::{BlockId, EIP1186AccountProofResponse, FeeHistory, Filter, Log};
use alloy::rpc::types::{
BlockId, EIP1186AccountProofResponse, FeeHistory, Filter, FilterChanges, Log,
};
use alloy::transports::http::Http;
use alloy::transports::layers::{RetryBackoffLayer, RetryBackoffService};
use async_trait::async_trait;
Expand Down Expand Up @@ -150,10 +152,10 @@ impl<N: NetworkSpec> ExecutionRpc<N> for HttpRpc<N> {
.map_err(|e| RpcError::new("get_logs", e))?)
}

async fn get_filter_changes(&self, filter_id: U256) -> Result<Vec<Log>> {
async fn get_filter_changes(&self, filter_id: U256) -> Result<FilterChanges> {
Ok(self
.provider
.get_filter_changes(filter_id)
.get_filter_changes_dyn(filter_id)
.await
.map_err(|e| RpcError::new("get_filter_changes", e))?)
}
Expand Down Expand Up @@ -193,7 +195,7 @@ impl<N: NetworkSpec> ExecutionRpc<N> for HttpRpc<N> {
async fn new_pending_transaction_filter(&self) -> Result<U256> {
Ok(self
.provider
.new_pending_transactions_filter(true)
.new_pending_transactions_filter(false)
.await
.map_err(|e| RpcError::new("new_pending_transaction_filter", e))?)
}
Expand Down
4 changes: 2 additions & 2 deletions core/src/execution/rpc/mock_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{fs::read_to_string, path::PathBuf, str::FromStr};

use alloy::primitives::{Address, B256, U256};
use alloy::rpc::types::{
AccessList, BlockId, EIP1186AccountProofResponse, FeeHistory, Filter, Log,
AccessList, BlockId, EIP1186AccountProofResponse, FeeHistory, Filter, FilterChanges, Log,
};
use async_trait::async_trait;
use eyre::{eyre, Result};
Expand Down Expand Up @@ -74,7 +74,7 @@ impl<N: NetworkSpec> ExecutionRpc<N> for MockRpc {
Ok(serde_json::from_str(&logs)?)
}

async fn get_filter_changes(&self, _filter_id: U256) -> Result<Vec<Log>> {
async fn get_filter_changes(&self, _filter_id: U256) -> Result<FilterChanges> {
let logs = read_to_string(self.path.join("logs.json"))?;
Ok(serde_json::from_str(&logs)?)
}
Expand Down
4 changes: 2 additions & 2 deletions core/src/execution/rpc/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use alloy::primitives::{Address, B256, U256};
use alloy::rpc::types::{
AccessList, BlockId, EIP1186AccountProofResponse, FeeHistory, Filter, Log,
AccessList, BlockId, EIP1186AccountProofResponse, FeeHistory, Filter, FilterChanges, Log,
};
use async_trait::async_trait;
use eyre::Result;
Expand Down Expand Up @@ -37,7 +37,7 @@ pub trait ExecutionRpc<N: NetworkSpec>: Send + Clone + Sync + 'static {
async fn get_block_receipts(&self, block: BlockTag) -> Result<Option<Vec<N::ReceiptResponse>>>;
async fn get_transaction(&self, tx_hash: B256) -> Result<Option<N::TransactionResponse>>;
async fn get_logs(&self, filter: &Filter) -> Result<Vec<Log>>;
async fn get_filter_changes(&self, filter_id: U256) -> Result<Vec<Log>>;
async fn get_filter_changes(&self, filter_id: U256) -> Result<FilterChanges>;
async fn get_filter_logs(&self, filter_id: U256) -> Result<Vec<Log>>;
async fn uninstall_filter(&self, filter_id: U256) -> Result<bool>;
async fn new_filter(&self, filter: &Filter) -> Result<U256>;
Expand Down
41 changes: 41 additions & 0 deletions core/src/execution/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,23 @@
.cloned()
}

pub async fn get_blocks_after(&self, tag: BlockTag) -> Vec<Block<N::TransactionResponse>> {
let start_block = self.get_block(tag).await;
if start_block.is_none() {
return vec![];
}
let start_block_num = start_block.unwrap().number.to::<u64>();
let blocks = self
.inner
.read()
.await
.blocks
.range((start_block_num + 1)..)
.map(|(_, v)| v.clone())
.collect();
blocks
}

// transaction fetch

pub async fn get_transaction(&self, hash: B256) -> Option<N::TransactionResponse> {
Expand Down Expand Up @@ -157,6 +174,20 @@
self.get_block(tag).await.map(|block| block.miner)
}

// filter

pub async fn push_filter(&self, id: U256, filter: FilterType) {
self.inner.write().await.filters.insert(id, filter);
}

pub async fn remove_filter(&self, id: &U256) -> bool {
self.inner.write().await.filters.remove(id).is_some()
}

pub async fn get_filter(&self, id: &U256) -> Option<FilterType> {
self.inner.read().await.filters.get(id).cloned()
}

// misc

pub async fn latest_block_number(&self) -> Option<u64> {
Expand All @@ -175,6 +206,7 @@
finalized_block: Option<Block<N::TransactionResponse>>,
hashes: HashMap<B256, u64>,
txs: HashMap<B256, TransactionLocation>,
filters: HashMap<U256, FilterType>,
history_length: u64,
rpc: R,
}
Expand All @@ -187,6 +219,7 @@
finalized_block: None,
hashes: HashMap::default(),
txs: HashMap::default(),
filters: HashMap::default(),
rpc,
}
}
Expand Down Expand Up @@ -256,7 +289,7 @@
}

fn prune_before(&mut self, n: u64) {
loop {

Check failure on line 292 in core/src/execution/state.rs

View workflow job for this annotation

GitHub Actions / clippy

this loop could be written as a `while let` loop
if let Some((oldest, _)) = self.blocks.first_key_value() {
let oldest = *oldest;
if oldest < n {
Expand All @@ -277,7 +310,7 @@

if let Some(block) = self.blocks.get(&n) {
let prev = n - 1;
if self.blocks.get(&prev).is_none() {

Check failure on line 313 in core/src/execution/state.rs

View workflow job for this annotation

GitHub Actions / clippy

unnecessary use of `get(&prev).is_none()`
let backfilled = self.rpc.get_block(block.parent_hash).await?;
if backfilled.is_hash_valid() && block.parent_hash == backfilled.hash {
info!("backfilled: block={}", backfilled.number);
Expand Down Expand Up @@ -320,3 +353,11 @@
block: u64,
index: usize,
}

#[derive(Clone)]
pub enum FilterType {
Logs,
// block number when the filter was created or last queried
NewBlock(u64),
PendingTransactions,
}
Loading