Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: track filters in state for eth_getFilterChanges #448

Merged
merged 12 commits into from
Dec 17, 2024
4 changes: 2 additions & 2 deletions core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::sync::Arc;
use std::time::Duration;

use alloy::primitives::{Address, Bytes, B256, U256};
use alloy::rpc::types::{Filter, Log, SyncStatus};
use alloy::rpc::types::{Filter, FilterChanges, Log, SyncStatus};
use eyre::Result;
use tracing::{info, warn};

Expand Down Expand Up @@ -131,7 +131,7 @@ impl<N: NetworkSpec, C: Consensus<N::TransactionResponse>> Client<N, C> {
self.node.get_logs(filter).await
}

pub async fn get_filter_changes(&self, filter_id: U256) -> Result<Vec<Log>> {
pub async fn get_filter_changes(&self, filter_id: U256) -> Result<FilterChanges> {
self.node.get_filter_changes(filter_id).await
}

Expand Down
4 changes: 2 additions & 2 deletions core/src/client/node.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::sync::Arc;

use alloy::primitives::{Address, Bytes, B256, U256};
use alloy::rpc::types::{Filter, Log, SyncInfo, SyncStatus};
use alloy::rpc::types::{Filter, FilterChanges, Log, SyncInfo, SyncStatus};
use eyre::{eyre, Result};

use crate::consensus::Consensus;
Expand Down Expand Up @@ -166,7 +166,7 @@ impl<N: NetworkSpec, C: Consensus<N::TransactionResponse>> Node<N, C> {
format!("helios-{}", helios_version)
}

pub async fn get_filter_changes(&self, filter_id: U256) -> Result<Vec<Log>> {
pub async fn get_filter_changes(&self, filter_id: U256) -> Result<FilterChanges> {
self.execution.get_filter_changes(filter_id).await
}

Expand Down
6 changes: 3 additions & 3 deletions core/src/client/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{fmt::Display, net::SocketAddr, sync::Arc};
use alloy::network::{ReceiptResponse, TransactionResponse};
use alloy::primitives::{Address, Bytes, B256, U256, U64};
use alloy::rpc::json_rpc::RpcObject;
use alloy::rpc::types::{Filter, Log, SyncStatus};
use alloy::rpc::types::{Filter, FilterChanges, Log, SyncStatus};
use eyre::Result;
use jsonrpsee::{
core::{async_trait, server::Methods},
Expand Down Expand Up @@ -122,7 +122,7 @@ trait EthRpc<TX: TransactionResponse + RpcObject, TXR: RpcObject, R: ReceiptResp
#[method(name = "getLogs")]
async fn get_logs(&self, filter: Filter) -> Result<Vec<Log>, ErrorObjectOwned>;
#[method(name = "getFilterChanges")]
async fn get_filter_changes(&self, filter_id: U256) -> Result<Vec<Log>, ErrorObjectOwned>;
async fn get_filter_changes(&self, filter_id: U256) -> Result<FilterChanges, ErrorObjectOwned>;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also add the newPendingTransactionFilter and newBlockFilter methods to rpc?

Copy link
Contributor Author

@eshaan7 eshaan7 Dec 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The methods are there in the RPC but their corresponding Wasm bindings are missing, for which I had already created an issue (#451).

#[method(name = "getFilterLogs")]
async fn get_filter_logs(&self, filter_id: U256) -> Result<Vec<Log>, ErrorObjectOwned>;
#[method(name = "uninstallFilter")]
Expand Down Expand Up @@ -327,7 +327,7 @@ impl<N: NetworkSpec, C: Consensus<N::TransactionResponse>>
convert_err(self.node.get_logs(&filter).await)
}

async fn get_filter_changes(&self, filter_id: U256) -> Result<Vec<Log>, ErrorObjectOwned> {
async fn get_filter_changes(&self, filter_id: U256) -> Result<FilterChanges, ErrorObjectOwned> {
convert_err(self.node.get_filter_changes(filter_id).await)
}

Expand Down
24 changes: 15 additions & 9 deletions core/src/execution/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::collections::{HashMap, HashSet};
use alloy::network::ReceiptResponse;
use alloy::primitives::{keccak256, Address, B256, U256};
use alloy::rlp::encode;
use alloy::rpc::types::{Filter, Log};
use alloy::rpc::types::{Filter, FilterChanges, Log};
use eyre::Result;
use futures::future::try_join_all;
use revm::primitives::KECCAK_EMPTY;
Expand Down Expand Up @@ -296,15 +296,21 @@ impl<N: NetworkSpec, R: ExecutionRpc<N>> ExecutionClient<N, R> {
Ok(logs)
}

pub async fn get_filter_changes(&self, filter_id: U256) -> Result<Vec<Log>> {
let logs = self.rpc.get_filter_changes(filter_id).await?;
if logs.len() > MAX_SUPPORTED_LOGS_NUMBER {
return Err(
ExecutionError::TooManyLogsToProve(logs.len(), MAX_SUPPORTED_LOGS_NUMBER).into(),
);
pub async fn get_filter_changes(&self, filter_id: U256) -> Result<FilterChanges> {
let filter_changes = self.rpc.get_filter_changes(filter_id).await?;

if let FilterChanges::Logs(logs) = &filter_changes {
if logs.len() > MAX_SUPPORTED_LOGS_NUMBER {
return Err(ExecutionError::TooManyLogsToProve(
logs.len(),
MAX_SUPPORTED_LOGS_NUMBER,
)
.into());
}
self.verify_logs(logs).await?;
}
self.verify_logs(&logs).await?;
Ok(logs)

Ok(filter_changes)
}

pub async fn get_filter_logs(&self, filter_id: U256) -> Result<Vec<Log>> {
Expand Down
8 changes: 5 additions & 3 deletions core/src/execution/rpc/http_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ use alloy::eips::BlockNumberOrTag;
use alloy::primitives::{Address, B256, U256};
use alloy::providers::{Provider, ProviderBuilder, RootProvider};
use alloy::rpc::client::ClientBuilder;
use alloy::rpc::types::{BlockId, EIP1186AccountProofResponse, FeeHistory, Filter, Log};
use alloy::rpc::types::{
BlockId, EIP1186AccountProofResponse, FeeHistory, Filter, FilterChanges, Log,
};
use alloy::transports::http::Http;
use alloy::transports::layers::{RetryBackoffLayer, RetryBackoffService};
use async_trait::async_trait;
Expand Down Expand Up @@ -150,10 +152,10 @@ impl<N: NetworkSpec> ExecutionRpc<N> for HttpRpc<N> {
.map_err(|e| RpcError::new("get_logs", e))?)
}

async fn get_filter_changes(&self, filter_id: U256) -> Result<Vec<Log>> {
async fn get_filter_changes(&self, filter_id: U256) -> Result<FilterChanges> {
Ok(self
.provider
.get_filter_changes(filter_id)
.get_filter_changes_dyn(filter_id)
.await
.map_err(|e| RpcError::new("get_filter_changes", e))?)
}
Expand Down
4 changes: 2 additions & 2 deletions core/src/execution/rpc/mock_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{fs::read_to_string, path::PathBuf};

use alloy::primitives::{Address, B256, U256};
use alloy::rpc::types::{
AccessList, BlockId, EIP1186AccountProofResponse, FeeHistory, Filter, Log,
AccessList, BlockId, EIP1186AccountProofResponse, FeeHistory, Filter, FilterChanges, Log,
};
use async_trait::async_trait;
use eyre::{eyre, Result};
Expand Down Expand Up @@ -74,7 +74,7 @@ impl<N: NetworkSpec> ExecutionRpc<N> for MockRpc {
Ok(serde_json::from_str(&logs)?)
}

async fn get_filter_changes(&self, _filter_id: U256) -> Result<Vec<Log>> {
async fn get_filter_changes(&self, _filter_id: U256) -> Result<FilterChanges> {
let logs = read_to_string(self.path.join("logs.json"))?;
Ok(serde_json::from_str(&logs)?)
}
Expand Down
4 changes: 2 additions & 2 deletions core/src/execution/rpc/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use alloy::primitives::{Address, B256, U256};
use alloy::rpc::types::{
AccessList, BlockId, EIP1186AccountProofResponse, FeeHistory, Filter, Log,
AccessList, BlockId, EIP1186AccountProofResponse, FeeHistory, Filter, FilterChanges, Log,
};
use async_trait::async_trait;
use eyre::Result;
Expand Down Expand Up @@ -37,7 +37,7 @@ pub trait ExecutionRpc<N: NetworkSpec>: Send + Clone + Sync + 'static {
async fn get_block_receipts(&self, block: BlockTag) -> Result<Option<Vec<N::ReceiptResponse>>>;
async fn get_transaction(&self, tx_hash: B256) -> Result<Option<N::TransactionResponse>>;
async fn get_logs(&self, filter: &Filter) -> Result<Vec<Log>>;
async fn get_filter_changes(&self, filter_id: U256) -> Result<Vec<Log>>;
async fn get_filter_changes(&self, filter_id: U256) -> Result<FilterChanges>;
async fn get_filter_logs(&self, filter_id: U256) -> Result<Vec<Log>>;
async fn uninstall_filter(&self, filter_id: U256) -> Result<bool>;
async fn get_new_filter(&self, filter: &Filter) -> Result<U256>;
Expand Down
2 changes: 1 addition & 1 deletion rpc.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ Helios provides a variety of RPC methods for interacting with the Ethereum netwo
| `eth_getTransactionByBlockNumberAndIndex` | `get_transaction_by_block_number_and_index` | Returns information about a transaction by block number and transaction index position. | `client.get_transaction_by_block_number_and_index(&self, block: BlockTag, index: u64)`
| `eth_getBlockReceipts` | `get_block_receipts` | Returns all transaction receipts of a block by number. | `client.get_block_receipts(&self, block: BlockTag)` |
| `eth_getLogs` | `get_logs` | Returns an array of logs matching the filter. | `client.get_logs(&self, filter: Filter)` |
| `eth_getFilterChanges` | `get_filter_changes` | Polling method for a filter, which returns an array of logs which occurred since last poll. | `client.get_filter_changes(&self, filter_id: H256)` |
| `eth_getFilterChanges` | `get_filter_changes` | Polling method for a filter, which returns an array of logs or transaction hashes or block hashes (depending on type of filter) which occurred since last poll. | `client.get_filter_changes(&self, filter_id: H256)` |
| `eth_getFilterLogs` | `get_filter_logs` | Returns an array of all logs matching filter with given id. | `client.get_filter_logs(&self, filter_id: H256)` |
| `eth_getStorageAt` | `get_storage_at` | Returns the value from a storage position at a given address. | `client.get_storage_at(&self, address: &str, slot: H256, block: BlockTag)` |
| `eth_getBlockTransactionCountByHash` | `get_block_transaction_count_by_hash` | Returns the number of transactions in a block from a block matching the transaction hash. | `client.get_block_transaction_count_by_hash(&self, hash: &str)` |
Expand Down
Loading