diff --git a/CHANGELOG.md b/CHANGELOG.md index d04424e2b37..87bc14df64a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -61,6 +61,8 @@ - [#4751](https://github.com/ChainSafe/forest/issues/4751) Add support for `Filecoin.GetActorEventsRaw` RPC method. +- [#4671](https://github.com/ChainSafe/forest/issues/4671) Add support for `Filecoin.EthGetFilterLogs` RPC method. + - [#5309](https://github.com/ChainSafe/forest/issues/5309) Add `forest-tool shed f3 check-activation-raw` command. - [#5242](https://github.com/ChainSafe/forest/issues/5242) Fix existing signature verification and add `delegated` signature diff --git a/src/rpc/methods/eth.rs b/src/rpc/methods/eth.rs index a3e0a00aa2e..fdf275a173d 100644 --- a/src/rpc/methods/eth.rs +++ b/src/rpc/methods/eth.rs @@ -24,7 +24,7 @@ use crate::interpreter::VMTrace; use crate::lotus_json::{lotus_json_with_self, HasLotusJson}; use crate::message::{ChainMessage, Message as _, SignedMessage}; use crate::rpc::error::ServerError; -use crate::rpc::eth::filter::SkipEvent; +use crate::rpc::eth::filter::{event::EventFilter, SkipEvent}; use crate::rpc::eth::types::{EthBlockTrace, EthTrace}; use crate::rpc::types::{ApiTipsetKey, EventEntry, MessageLookup}; use crate::rpc::EthEventHandler; @@ -2590,7 +2590,7 @@ impl RpcMethod<1> for EthSendRawTransaction { } } -#[derive(Debug)] +#[derive(Clone, Debug, PartialEq)] pub struct CollectedEvent { pub(crate) entries: Vec, pub(crate) emitter_addr: crate::shim::address::Address, @@ -2792,6 +2792,53 @@ impl RpcMethod<1> for EthGetLogs { } } +pub enum EthGetFilterLogs {} +impl RpcMethod<1> for EthGetFilterLogs { + const NAME: &'static str = "Filecoin.EthGetFilterLogs"; + const NAME_ALIAS: Option<&'static str> = Some("eth_getFilterLogs"); + const N_REQUIRED_PARAMS: usize = 1; + const PARAM_NAMES: [&'static str; 1] = ["filterId"]; + const API_PATHS: ApiPaths = ApiPaths::V1; + const PERMISSION: Permission = Permission::Write; + type Params = (FilterID,); + type Ok = EthFilterResult; + async fn handle( + ctx: Ctx, + (filter_id,): Self::Params, + ) -> Result { + let eth_event_handler = ctx.eth_event_handler.clone(); + if let Some(store) = ð_event_handler.filter_store { + let filter = store.get(&filter_id)?; + if let Some(event_filter) = filter.as_any().downcast_ref::() { + let events = ctx + .eth_event_handler + .get_events_for_parsed_filter( + &ctx, + &event_filter.into(), + SkipEvent::OnUnresolvedAddress, + ) + .await?; + let recent_events: Vec = events + .clone() + .into_iter() + .filter(|event| !event_filter.collected.contains(event)) + .collect(); + let filter = Arc::new(EventFilter { + id: event_filter.id.clone(), + tipsets: event_filter.tipsets.clone(), + addresses: event_filter.addresses.clone(), + keys_with_codec: event_filter.keys_with_codec.clone(), + max_results: event_filter.max_results, + collected: events.clone(), + }); + store.update(filter); + return Ok(eth_filter_result_from_events(&ctx, &recent_events)?); + } + } + Err(anyhow::anyhow!("method not supported").into()) + } +} + pub enum EthTraceBlock {} impl RpcMethod<1> for EthTraceBlock { const NAME: &'static str = "Filecoin.EthTraceBlock"; diff --git a/src/rpc/methods/eth/filter/event.rs b/src/rpc/methods/eth/filter/event.rs index 34e3e25f992..dc2c768e94b 100644 --- a/src/rpc/methods/eth/filter/event.rs +++ b/src/rpc/methods/eth/filter/event.rs @@ -2,7 +2,7 @@ // SPDX-License-Identifier: Apache-2.0, MIT use crate::rpc::eth::filter::{ActorEventBlock, ParsedFilter, ParsedFilterTipsets}; -use crate::rpc::eth::{filter::Filter, FilterID}; +use crate::rpc::eth::{filter::Filter, CollectedEvent, FilterID}; use crate::rpc::Arc; use crate::shim::address::Address; use ahash::AHashMap as HashMap; @@ -13,11 +13,22 @@ use std::any::Any; #[allow(dead_code)] #[derive(Debug, PartialEq)] pub struct EventFilter { - id: FilterID, - tipsets: ParsedFilterTipsets, - addresses: Vec
, // list of actor addresses that are extpected to emit the event - keys_with_codec: HashMap>, // map of key names to a list of alternate values that may match - max_results: usize, // maximum number of results to collect + pub id: FilterID, + pub tipsets: ParsedFilterTipsets, + pub addresses: Vec
, // list of actor addresses that are extpected to emit the event + pub keys_with_codec: HashMap>, // map of key names to a list of alternate values that may match + pub max_results: usize, // maximum number of results to collect + pub collected: Vec, +} + +impl From<&EventFilter> for ParsedFilter { + fn from(event_filter: &EventFilter) -> Self { + ParsedFilter { + tipsets: event_filter.tipsets.clone(), + addresses: event_filter.addresses.clone(), + keys: event_filter.keys_with_codec.clone(), + } + } } impl Filter for EventFilter { @@ -55,6 +66,7 @@ impl EventFilterManager { addresses: pf.addresses, keys_with_codec: pf.keys, max_results: self.max_filter_results, + collected: vec![], }); self.filters.write().insert(id, filter.clone()); diff --git a/src/rpc/methods/eth/filter/mod.rs b/src/rpc/methods/eth/filter/mod.rs index 653d1358a13..e98819c1c58 100644 --- a/src/rpc/methods/eth/filter/mod.rs +++ b/src/rpc/methods/eth/filter/mod.rs @@ -14,7 +14,7 @@ //! - **Event Filter**: Captures blockchain events, such as smart contract log events, emitted by specific actors. //! - **TipSet Filter**: Tracks changes in the blockchain's tipset (the latest set of blocks). //! - **Mempool Filter**: Monitors the Ethereum mempool for new pending transactions that meet certain criteria. -mod event; +pub mod event; mod mempool; mod store; mod tipset; @@ -85,7 +85,7 @@ pub trait FilterManager { /// including event filters and tipSet filters. It interacts with a filter store and manages /// configurations such as the maximum filter height range and maximum filter results. pub struct EthEventHandler { - filter_store: Option>, + pub filter_store: Option>, pub max_filter_results: usize, pub max_filter_height_range: ChainEpoch, event_filter_manager: Option>, @@ -382,15 +382,17 @@ impl EthEventHandler { .await?; } ParsedFilterTipsets::Range(range) => { + ensure!(*range.end() >= 0, "max_height requested is less than 0"); + // we can't return events for the heaviest tipset as the transactions in that tipset will be executed + // in the next non-null tipset (because of Filecoin's "deferred execution" model) + let heaviest_epoch = ctx.chain_store().heaviest_tipset().epoch(); + ensure!( + *range.end() < heaviest_epoch, + "max_height requested is greater than the heaviest tipset" + ); let max_height = if *range.end() == -1 { // heaviest tipset doesn't have events because its messages haven't been executed yet - ctx.chain_store().heaviest_tipset().epoch() - 1 - } else if *range.end() < 0 { - bail!("max_height requested is less than 0") - } else if *range.end() > ctx.chain_store().heaviest_tipset().epoch() - 1 { - // we can't return events for the heaviest tipset as the transactions in that tipset will be executed - // in the next non-null tipset (because of Filecoin's "deferred execution" model) - bail!("max_height requested is greater than the heaviest tipset"); + heaviest_epoch - 1 } else { *range.end() }; @@ -596,7 +598,7 @@ fn parse_eth_topics( Ok(keys) } -#[derive(Debug, PartialEq, Serialize, Deserialize)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub(crate) struct ActorEventBlock { codec: u64, value: Vec, @@ -622,7 +624,7 @@ fn keys_to_keys_with_codec( keys_with_codec } -#[derive(Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq)] pub enum ParsedFilterTipsets { Range(RangeInclusive), Hash(EthHash), @@ -704,6 +706,21 @@ impl Matcher for ParsedFilter { } } +impl Matcher for EventFilter { + fn matches( + &self, + resolved: &crate::shim::address::Address, + _entries: &[Entry], + ) -> anyhow::Result { + let match_addr = if self.addresses.is_empty() { + true + } else { + self.addresses.iter().any(|other| *other == *resolved) + }; + Ok(match_addr) + } +} + #[cfg(test)] mod tests { use ahash::AHashMap; diff --git a/src/rpc/methods/eth/filter/store.rs b/src/rpc/methods/eth/filter/store.rs index cbea42ff79d..bd2f40966fa 100644 --- a/src/rpc/methods/eth/filter/store.rs +++ b/src/rpc/methods/eth/filter/store.rs @@ -21,6 +21,7 @@ pub trait Filter: Send + Sync + std::fmt::Debug { pub trait FilterStore: Send + Sync { fn add(&self, filter: Arc) -> Result<()>; fn get(&self, id: &FilterID) -> Result>; + fn update(&self, filter: Arc); fn remove(&self, id: &FilterID) -> Option>; } @@ -58,7 +59,13 @@ impl FilterStore for MemFilterStore { filters .get(id) .cloned() - .ok_or_else(|| anyhow!("Filter with the given ID not found")) + .ok_or_else(|| anyhow!("filter not found")) + } + + fn update(&self, filter: Arc) { + let mut filters = self.filters.write(); + + filters.insert(filter.id().clone(), filter); } fn remove(&self, id: &FilterID) -> Option> { diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index af199facbc8..237cbb616b5 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -91,6 +91,7 @@ macro_rules! for_each_rpc_method { $callback!($crate::rpc::eth::EthGetBlockTransactionCountByNumber); $callback!($crate::rpc::eth::EthGetCode); $callback!($crate::rpc::eth::EthGetLogs); + $callback!($crate::rpc::eth::EthGetFilterLogs); $callback!($crate::rpc::eth::EthGetMessageCidByTransactionHash); $callback!($crate::rpc::eth::EthGetStorageAt); $callback!($crate::rpc::eth::EthGetTransactionByHash); diff --git a/src/tool/subcommands/api_cmd/api_compare_tests.rs b/src/tool/subcommands/api_cmd/api_compare_tests.rs index e168cfece01..6b9d446e02b 100644 --- a/src/tool/subcommands/api_cmd/api_compare_tests.rs +++ b/src/tool/subcommands/api_cmd/api_compare_tests.rs @@ -1576,6 +1576,8 @@ fn eth_tests_with_tipset(store: &Arc, shared_tipset: &Tipset .unwrap(), ) .sort_policy(SortPolicy::All), + RpcTest::identity(EthGetFilterLogs::request((FilterID::new().unwrap(),)).unwrap()) + .policy_on_rejected(PolicyOnRejected::PassWithIdenticalError), RpcTest::identity(EthGetTransactionHashByCid::request((block_cid,)).unwrap()), RpcTest::identity( EthTraceBlock::request((ExtBlockNumberOrHash::from_block_number(