Skip to content

Commit

Permalink
Implement Filecoin.EthGetFilterLogs (#5308)
Browse files Browse the repository at this point in the history
  • Loading branch information
sudo-shashank authored Feb 21, 2025
1 parent b0bb52a commit 32a34e9
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 20 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@

- [#4751](https://github.com/ChainSafe/forest/issues/4751) Add support for `Filecoin.GetActorEventsRaw` RPC method.

- [#4671](https://github.com/ChainSafe/forest/issues/4671) Add support for `Filecoin.EthGetFilterLogs` RPC method.

- [#5309](https://github.com/ChainSafe/forest/issues/5309) Add `forest-tool shed f3 check-activation-raw` command.

- [#5242](https://github.com/ChainSafe/forest/issues/5242) Fix existing signature verification and add `delegated` signature
Expand Down
51 changes: 49 additions & 2 deletions src/rpc/methods/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::interpreter::VMTrace;
use crate::lotus_json::{lotus_json_with_self, HasLotusJson};
use crate::message::{ChainMessage, Message as _, SignedMessage};
use crate::rpc::error::ServerError;
use crate::rpc::eth::filter::SkipEvent;
use crate::rpc::eth::filter::{event::EventFilter, SkipEvent};
use crate::rpc::eth::types::{EthBlockTrace, EthTrace};
use crate::rpc::types::{ApiTipsetKey, EventEntry, MessageLookup};
use crate::rpc::EthEventHandler;
Expand Down Expand Up @@ -2590,7 +2590,7 @@ impl RpcMethod<1> for EthSendRawTransaction {
}
}

#[derive(Debug)]
#[derive(Clone, Debug, PartialEq)]
pub struct CollectedEvent {
pub(crate) entries: Vec<EventEntry>,
pub(crate) emitter_addr: crate::shim::address::Address,
Expand Down Expand Up @@ -2792,6 +2792,53 @@ impl RpcMethod<1> for EthGetLogs {
}
}

pub enum EthGetFilterLogs {}
impl RpcMethod<1> for EthGetFilterLogs {
const NAME: &'static str = "Filecoin.EthGetFilterLogs";
const NAME_ALIAS: Option<&'static str> = Some("eth_getFilterLogs");
const N_REQUIRED_PARAMS: usize = 1;
const PARAM_NAMES: [&'static str; 1] = ["filterId"];
const API_PATHS: ApiPaths = ApiPaths::V1;
const PERMISSION: Permission = Permission::Write;
type Params = (FilterID,);
type Ok = EthFilterResult;
async fn handle(
ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
(filter_id,): Self::Params,
) -> Result<Self::Ok, ServerError> {
let eth_event_handler = ctx.eth_event_handler.clone();
if let Some(store) = &eth_event_handler.filter_store {
let filter = store.get(&filter_id)?;
if let Some(event_filter) = filter.as_any().downcast_ref::<EventFilter>() {
let events = ctx
.eth_event_handler
.get_events_for_parsed_filter(
&ctx,
&event_filter.into(),
SkipEvent::OnUnresolvedAddress,
)
.await?;
let recent_events: Vec<CollectedEvent> = events
.clone()
.into_iter()
.filter(|event| !event_filter.collected.contains(event))
.collect();
let filter = Arc::new(EventFilter {
id: event_filter.id.clone(),
tipsets: event_filter.tipsets.clone(),
addresses: event_filter.addresses.clone(),
keys_with_codec: event_filter.keys_with_codec.clone(),
max_results: event_filter.max_results,
collected: events.clone(),
});
store.update(filter);
return Ok(eth_filter_result_from_events(&ctx, &recent_events)?);
}
}
Err(anyhow::anyhow!("method not supported").into())
}
}

pub enum EthTraceBlock {}
impl RpcMethod<1> for EthTraceBlock {
const NAME: &'static str = "Filecoin.EthTraceBlock";
Expand Down
24 changes: 18 additions & 6 deletions src/rpc/methods/eth/filter/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0, MIT

use crate::rpc::eth::filter::{ActorEventBlock, ParsedFilter, ParsedFilterTipsets};
use crate::rpc::eth::{filter::Filter, FilterID};
use crate::rpc::eth::{filter::Filter, CollectedEvent, FilterID};
use crate::rpc::Arc;
use crate::shim::address::Address;
use ahash::AHashMap as HashMap;
Expand All @@ -13,11 +13,22 @@ use std::any::Any;
#[allow(dead_code)]
#[derive(Debug, PartialEq)]
pub struct EventFilter {
id: FilterID,
tipsets: ParsedFilterTipsets,
addresses: Vec<Address>, // list of actor addresses that are extpected to emit the event
keys_with_codec: HashMap<String, Vec<ActorEventBlock>>, // map of key names to a list of alternate values that may match
max_results: usize, // maximum number of results to collect
pub id: FilterID,
pub tipsets: ParsedFilterTipsets,
pub addresses: Vec<Address>, // list of actor addresses that are extpected to emit the event
pub keys_with_codec: HashMap<String, Vec<ActorEventBlock>>, // map of key names to a list of alternate values that may match
pub max_results: usize, // maximum number of results to collect
pub collected: Vec<CollectedEvent>,
}

impl From<&EventFilter> for ParsedFilter {
fn from(event_filter: &EventFilter) -> Self {
ParsedFilter {
tipsets: event_filter.tipsets.clone(),
addresses: event_filter.addresses.clone(),
keys: event_filter.keys_with_codec.clone(),
}
}
}

impl Filter for EventFilter {
Expand Down Expand Up @@ -55,6 +66,7 @@ impl EventFilterManager {
addresses: pf.addresses,
keys_with_codec: pf.keys,
max_results: self.max_filter_results,
collected: vec![],
});

self.filters.write().insert(id, filter.clone());
Expand Down
39 changes: 28 additions & 11 deletions src/rpc/methods/eth/filter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
//! - **Event Filter**: Captures blockchain events, such as smart contract log events, emitted by specific actors.
//! - **TipSet Filter**: Tracks changes in the blockchain's tipset (the latest set of blocks).
//! - **Mempool Filter**: Monitors the Ethereum mempool for new pending transactions that meet certain criteria.
mod event;
pub mod event;
mod mempool;
mod store;
mod tipset;
Expand Down Expand Up @@ -85,7 +85,7 @@ pub trait FilterManager {
/// including event filters and tipSet filters. It interacts with a filter store and manages
/// configurations such as the maximum filter height range and maximum filter results.
pub struct EthEventHandler {
filter_store: Option<Arc<dyn FilterStore>>,
pub filter_store: Option<Arc<dyn FilterStore>>,
pub max_filter_results: usize,
pub max_filter_height_range: ChainEpoch,
event_filter_manager: Option<Arc<EventFilterManager>>,
Expand Down Expand Up @@ -382,15 +382,17 @@ impl EthEventHandler {
.await?;
}
ParsedFilterTipsets::Range(range) => {
ensure!(*range.end() >= 0, "max_height requested is less than 0");
// we can't return events for the heaviest tipset as the transactions in that tipset will be executed
// in the next non-null tipset (because of Filecoin's "deferred execution" model)
let heaviest_epoch = ctx.chain_store().heaviest_tipset().epoch();
ensure!(
*range.end() < heaviest_epoch,
"max_height requested is greater than the heaviest tipset"
);
let max_height = if *range.end() == -1 {
// heaviest tipset doesn't have events because its messages haven't been executed yet
ctx.chain_store().heaviest_tipset().epoch() - 1
} else if *range.end() < 0 {
bail!("max_height requested is less than 0")
} else if *range.end() > ctx.chain_store().heaviest_tipset().epoch() - 1 {
// we can't return events for the heaviest tipset as the transactions in that tipset will be executed
// in the next non-null tipset (because of Filecoin's "deferred execution" model)
bail!("max_height requested is greater than the heaviest tipset");
heaviest_epoch - 1
} else {
*range.end()
};
Expand Down Expand Up @@ -596,7 +598,7 @@ fn parse_eth_topics(
Ok(keys)
}

#[derive(Debug, PartialEq, Serialize, Deserialize)]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub(crate) struct ActorEventBlock {
codec: u64,
value: Vec<u8>,
Expand All @@ -622,7 +624,7 @@ fn keys_to_keys_with_codec(
keys_with_codec
}

#[derive(Debug, PartialEq)]
#[derive(Clone, Debug, PartialEq)]
pub enum ParsedFilterTipsets {
Range(RangeInclusive<ChainEpoch>),
Hash(EthHash),
Expand Down Expand Up @@ -704,6 +706,21 @@ impl Matcher for ParsedFilter {
}
}

impl Matcher for EventFilter {
fn matches(
&self,
resolved: &crate::shim::address::Address,
_entries: &[Entry],
) -> anyhow::Result<bool> {
let match_addr = if self.addresses.is_empty() {
true
} else {
self.addresses.iter().any(|other| *other == *resolved)
};
Ok(match_addr)
}
}

#[cfg(test)]
mod tests {
use ahash::AHashMap;
Expand Down
9 changes: 8 additions & 1 deletion src/rpc/methods/eth/filter/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub trait Filter: Send + Sync + std::fmt::Debug {
pub trait FilterStore: Send + Sync {
fn add(&self, filter: Arc<dyn Filter>) -> Result<()>;
fn get(&self, id: &FilterID) -> Result<Arc<dyn Filter>>;
fn update(&self, filter: Arc<dyn Filter>);
fn remove(&self, id: &FilterID) -> Option<Arc<dyn Filter>>;
}

Expand Down Expand Up @@ -58,7 +59,13 @@ impl FilterStore for MemFilterStore {
filters
.get(id)
.cloned()
.ok_or_else(|| anyhow!("Filter with the given ID not found"))
.ok_or_else(|| anyhow!("filter not found"))
}

fn update(&self, filter: Arc<dyn Filter>) {
let mut filters = self.filters.write();

filters.insert(filter.id().clone(), filter);
}

fn remove(&self, id: &FilterID) -> Option<Arc<dyn Filter>> {
Expand Down
1 change: 1 addition & 0 deletions src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ macro_rules! for_each_rpc_method {
$callback!($crate::rpc::eth::EthGetBlockTransactionCountByNumber);
$callback!($crate::rpc::eth::EthGetCode);
$callback!($crate::rpc::eth::EthGetLogs);
$callback!($crate::rpc::eth::EthGetFilterLogs);
$callback!($crate::rpc::eth::EthGetMessageCidByTransactionHash);
$callback!($crate::rpc::eth::EthGetStorageAt);
$callback!($crate::rpc::eth::EthGetTransactionByHash);
Expand Down
2 changes: 2 additions & 0 deletions src/tool/subcommands/api_cmd/api_compare_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1576,6 +1576,8 @@ fn eth_tests_with_tipset<DB: Blockstore>(store: &Arc<DB>, shared_tipset: &Tipset
.unwrap(),
)
.sort_policy(SortPolicy::All),
RpcTest::identity(EthGetFilterLogs::request((FilterID::new().unwrap(),)).unwrap())
.policy_on_rejected(PolicyOnRejected::PassWithIdenticalError),
RpcTest::identity(EthGetTransactionHashByCid::request((block_cid,)).unwrap()),
RpcTest::identity(
EthTraceBlock::request((ExtBlockNumberOrHash::from_block_number(
Expand Down

0 comments on commit 32a34e9

Please sign in to comment.