From 251dc996e84e2c35e33714b369bcd2d86e32ce1f Mon Sep 17 00:00:00 2001 From: MiniFrenchBread <103425574+MiniFrenchBread@users.noreply.github.com> Date: Sun, 7 Apr 2024 20:50:43 +0800 Subject: [PATCH] chore: update submodules --- 0g-storage-node | 2 +- .../src/sync_manager/log_entry_fetcher.rs | 212 ++++++++++++------ node/log_entry_sync/src/sync_manager/mod.rs | 15 +- node/src/config/mod.rs | 2 +- 4 files changed, 153 insertions(+), 78 deletions(-) diff --git a/0g-storage-node b/0g-storage-node index 588bf39..b8a59e9 160000 --- a/0g-storage-node +++ b/0g-storage-node @@ -1 +1 @@ -Subproject commit 588bf39d7deb3d9ae3896c0418cc3e272ee18ab4 +Subproject commit b8a59e92225a6e2a6cdab20c38214e4690029789 diff --git a/node/log_entry_sync/src/sync_manager/log_entry_fetcher.rs b/node/log_entry_sync/src/sync_manager/log_entry_fetcher.rs index 20f5519..541bea1 100644 --- a/node/log_entry_sync/src/sync_manager/log_entry_fetcher.rs +++ b/node/log_entry_sync/src/sync_manager/log_entry_fetcher.rs @@ -7,7 +7,7 @@ use contract_interface::{SubmissionNode, SubmitFilter, ZgsFlow}; use ethers::abi::RawLog; use ethers::prelude::{BlockNumber, EthLogDecode, Http, Middleware, Provider}; use ethers::providers::{HttpRateLimitRetryPolicy, RetryClient, RetryClientBuilder}; -use ethers::types::H256; +use ethers::types::{Block, Log, H256}; use futures::StreamExt; use jsonrpsee::tracing::{debug, error, info, warn}; use kv_types::{submission_topic_to_stream_ids, KVMetadata, KVTransaction}; @@ -337,7 +337,7 @@ impl LogEntryFetcher { #[allow(clippy::too_many_arguments)] async fn watch_loop( provider: &Provider>, - block_number: u64, + from_block_number: u64, parent_block_hash: H256, watch_tx: &UnboundedSender, confirmation_delay: u64, @@ -346,22 +346,31 @@ impl LogEntryFetcher { ) -> Result>)>> { let latest_block_number = provider.get_block_number().await?.as_u64(); debug!( - "block number {}, latest block number {}, confirmation_delay {}", - block_number, latest_block_number, confirmation_delay + "from block number {}, latest block number {}, confirmation delay {}", + from_block_number, latest_block_number, confirmation_delay ); - if block_number > latest_block_number.saturating_sub(confirmation_delay) { + let to_block_number = latest_block_number.saturating_sub(confirmation_delay); + if from_block_number > to_block_number { return Ok(None); } let block = provider - .get_block_with_txs(block_number) + .get_block_with_txs(from_block_number) .await? - .ok_or_else(|| anyhow!("None for block {}", block_number))?; - if block_number > 0 && block.parent_hash != parent_block_hash { + .ok_or_else(|| anyhow!("None for block {}", from_block_number))?; + if Some(from_block_number.into()) != block.number { + bail!( + "block number mismatch, expected {}, actual {:?}", + from_block_number, + block.number + ); + } + + if from_block_number > 0 && block.parent_hash != parent_block_hash { // reorg happened let (parent_block_number, block_hash) = revert_one_block( parent_block_hash, - block_number.saturating_sub(1), + from_block_number.saturating_sub(1), watch_tx, block_hash_cache, ) @@ -369,81 +378,140 @@ impl LogEntryFetcher { return Ok(Some((parent_block_number, block_hash, None))); } - let txs_hm = block - .transactions - .iter() - .map(|tx| (tx.transaction_index, tx)) - .collect::>(); - - let filter = contract - .submit_filter() - .from_block(block_number) - .to_block(block_number) - .address(contract.address().into()) - .filter; - let mut logs = vec![]; - let mut first_submission_index = None; - for log in provider.get_logs(&filter).await? { - if log.block_hash != block.hash { - bail!( - "log block hash mismatch, log block hash {:?}, block hash {:?}", - log.block_hash, - block.hash - ); - } - if log.block_number != block.number { + let mut blocks: HashMap> = Default::default(); + let mut parent_block_hash = block.hash; + blocks.insert(from_block_number, block); + for block_number in from_block_number + 1..to_block_number + 1 { + let block = provider + .get_block_with_txs(block_number) + .await? + .ok_or_else(|| anyhow!("None for block {}", block_number))?; + if Some(block_number.into()) != block.number { bail!( - "log block num mismatch, log block number {:?}, block number {:?}", - log.block_number, + "block number mismatch, expected {}, actual {:?}", + block_number, block.number ); } - - let tx = txs_hm[&log.transaction_index]; - if log.transaction_hash != Some(tx.hash) { - bail!( - "log tx hash mismatch, log transaction {:?}, block transaction {:?}", - log.transaction_hash, - tx.hash - ); - } - if log.transaction_index != tx.transaction_index { + if Some(block.parent_hash) != parent_block_hash { bail!( - "log tx index mismatch, log tx index {:?}, block transaction index {:?}", - log.transaction_index, - tx.transaction_index + "parent block hash mismatch, expected {:?}, actual {}", + parent_block_hash, + block.parent_hash ); } + parent_block_hash = block.hash; + blocks.insert(block_number, block); + } - let tx = SubmitFilter::decode_log(&RawLog { - topics: log.topics, - data: log.data.to_vec(), - })?; + let filter = contract + .submit_filter() + .from_block(from_block_number) + .to_block(to_block_number) + .address(contract.address().into()) + .filter; + let mut block_logs: BTreeMap> = BTreeMap::new(); + for log in provider.get_logs(&filter).await? { + let block_number = log + .block_number + .ok_or_else(|| anyhow!("block number missing"))? + .as_u64(); + block_logs.entry(block_number).or_default().push(log); + } - if first_submission_index.is_none() - || first_submission_index > Some(tx.submission_index.as_u64()) - { - first_submission_index = Some(tx.submission_index.as_u64()); - } + let mut progress = None; + for block_number in from_block_number..to_block_number + 1 { + if let Some(block) = blocks.remove(&block_number) { + let txs_hm = block + .transactions + .iter() + .map(|tx| (tx.transaction_index, tx)) + .collect::>(); + let mut log_events = vec![]; + let mut first_submission_index = None; + + if let Some(logs) = block_logs.remove(&block_number) { + for log in logs.into_iter() { + if log.block_hash != block.hash { + warn!( + "log block hash mismatch, log block hash {:?}, block hash {:?}", + log.block_hash, block.hash + ); + return Ok(progress); + } + if log.block_number != block.number { + warn!( + "log block num mismatch, log block number {:?}, block number {:?}", + log.block_number, block.number + ); + return Ok(progress); + } - logs.push(submission_event_to_transaction(tx)); - } + let tx = txs_hm[&log.transaction_index]; + if log.transaction_hash != Some(tx.hash) { + warn!( + "log tx hash mismatch, log transaction {:?}, block transaction {:?}", + log.transaction_hash, + tx.hash + ); + return Ok(progress); + } + if log.transaction_index != tx.transaction_index { + warn!( + "log tx index mismatch, log tx index {:?}, block transaction index {:?}", + log.transaction_index, + tx.transaction_index + ); + return Ok(progress); + } - let progress = if block.hash.is_some() && block.number.is_some() { - Some(( - block.number.unwrap().as_u64(), - block.hash.unwrap(), - Some(first_submission_index), - )) - } else { - None - }; - if let Some(p) = &progress { - watch_tx.send(LogFetchProgress::SyncedBlock(*p))?; - } + let submit_filter = match SubmitFilter::decode_log(&RawLog { + topics: log.topics, + data: log.data.to_vec(), + }) { + Ok(v) => v, + Err(e) => { + return { + warn!("decode log failed: {:?}", e); + Ok(progress) + } + } + }; - for log in logs.into_iter() { - watch_tx.send(log)?; + if first_submission_index.is_none() + || first_submission_index + > Some(submit_filter.submission_index.as_u64()) + { + first_submission_index = Some(submit_filter.submission_index.as_u64()); + } + + log_events.push(submission_event_to_transaction(submit_filter)); + } + } + + let new_progress = if block.hash.is_some() && block.number.is_some() { + Some(( + block.number.unwrap().as_u64(), + block.hash.unwrap(), + Some(first_submission_index), + )) + } else { + None + }; + if let Some(p) = &new_progress { + if let Err(e) = watch_tx.send(LogFetchProgress::SyncedBlock(*p)) { + warn!("send LogFetchProgress failed: {:?}", e); + return Ok(progress); + } + } + for log in log_events.into_iter() { + if let Err(e) = watch_tx.send(log) { + warn!("send log failed: {:?}", e); + return Ok(progress); + } + } + progress = new_progress; + } } Ok(progress) diff --git a/node/log_entry_sync/src/sync_manager/mod.rs b/node/log_entry_sync/src/sync_manager/mod.rs index a6b44c3..ee37db6 100644 --- a/node/log_entry_sync/src/sync_manager/mod.rs +++ b/node/log_entry_sync/src/sync_manager/mod.rs @@ -191,6 +191,16 @@ impl LogSyncManager { let mut submission_idx = None; let parent_block_hash = if start_block_number >= finalized_block_number { if start_block_number > 0 { + if let Some(b) = log_sync_manager + .block_hash_cache + .read() + .await + .get(&start_block_number) + { + // special case avoid reorg + submission_idx = b.first_submission_index; + } + let parent_block_number = start_block_number.saturating_sub(1); match log_sync_manager .block_hash_cache @@ -198,10 +208,7 @@ impl LogSyncManager { .await .get(&parent_block_number) { - Some(b) => { - submission_idx = b.first_submission_index; // special case avoid reorg - b.block_hash - } + Some(b) => b.block_hash, _ => log_sync_manager .log_fetcher .provider() diff --git a/node/src/config/mod.rs b/node/src/config/mod.rs index 5d16bd3..2c2377e 100644 --- a/node/src/config/mod.rs +++ b/node/src/config/mod.rs @@ -24,7 +24,7 @@ build_config! { (default_finalized_block_count, (u64), 100) (remove_finalized_block_interval_minutes, (u64), 30) - (watch_loop_wait_time_ms, (u64), 50) + (watch_loop_wait_time_ms, (u64), 500) // rpc (rpc_enabled, (bool), true)