From 609498541c3b9da7b22db20e83e9b0684fc00937 Mon Sep 17 00:00:00 2001 From: lbqds Date: Mon, 10 Jun 2024 09:35:12 +0800 Subject: [PATCH] Make sure to subscribe to blocks in time --- packages/web3/src/block/block.ts | 51 +++++++++++++++++++------------- 1 file changed, 31 insertions(+), 20 deletions(-) diff --git a/packages/web3/src/block/block.ts b/packages/web3/src/block/block.ts index 5d5752a12..e09b66674 100644 --- a/packages/web3/src/block/block.ts +++ b/packages/web3/src/block/block.ts @@ -23,6 +23,7 @@ import * as web3 from '../global' import { TOTAL_NUMBER_OF_CHAINS } from '../constants' const DEFAULT_INTERVAL = 60 * 1000 // 60 seconds +const EXPIRE_DURATION = 20 * 1000 // 20 seconds export type ReorgCallback = ( fromGroup: number, @@ -86,6 +87,7 @@ export class BlockSubscription extends BlockSubscriptionBase { readonly reorgCallback?: ReorgCallback private fromTimeStamp: number private parents: ({ hash: string; height: number } | undefined)[] + private cache: Map constructor( options: BlockSubscribeOptions, @@ -97,6 +99,7 @@ export class BlockSubscription extends BlockSubscriptionBase { this.reorgCallback = options.reorgCallback this.fromTimeStamp = fromTimeStamp this.parents = new Array(TOTAL_NUMBER_OF_CHAINS).fill(undefined) + this.cache = new Map() } override async getHashesAtHeight(fromGroup: number, toGroup: number, height: number): Promise { @@ -124,10 +127,10 @@ export class BlockSubscription extends BlockSubscriptionBase { return blocks.reverse() } - private async handleBlocks(blocks: node.BlockEntry[][]) { + private async handleBlocks(blocks: node.BlockEntry[][], now: number) { const allBlocks: node.BlockEntry[] = [] for (let index = 0; index < blocks.length; index += 1) { - const blocksPerChain = blocks[index] + const blocksPerChain = blocks[index].filter((b) => !this.cache.has(b.hash)) if (blocksPerChain.length === 0) continue allBlocks.push(...blocksPerChain) @@ -141,31 +144,39 @@ export class BlockSubscription extends BlockSubscriptionBase { } const sortedBlocks = allBlocks.sort((a, b) => a.timestamp - b.timestamp) - await this.messageCallback(sortedBlocks) + try { + await this.messageCallback(sortedBlocks) + } finally { + const threshold = now - EXPIRE_DURATION + Array.from(this.cache.entries()).forEach(([hash, ts]) => { + if (ts < threshold) this.cache.delete(hash) + }) + const index = sortedBlocks.findIndex((b) => b.timestamp >= threshold) + if (index !== -1) { + sortedBlocks.slice(index).forEach((b) => this.cache.set(b.hash, b.timestamp)) + } + } } override async polling(): Promise { - try { - const now = Date.now() - if (this.fromTimeStamp >= now) return + const now = Date.now() + if (this.fromTimeStamp >= now) return - while (this.fromTimeStamp < now) { - if (this.isCancelled()) return - const toTs = Math.min(this.fromTimeStamp + DEFAULT_INTERVAL, now) + while (this.fromTimeStamp < now) { + if (this.isCancelled()) return + const toTs = Math.min(this.fromTimeStamp + DEFAULT_INTERVAL, now) + try { const result = await this.nodeProvider.blockflow.getBlockflowBlocks({ fromTs: this.fromTimeStamp, toTs }) - const isEmpty = result.blocks.every((blocksPerChain) => blocksPerChain.length === 0) - if (!isEmpty) { - this.fromTimeStamp = toTs + 1 - await this.handleBlocks(result.blocks) - continue - } - // there are no blocks within the time range [fromTimeStamp, fromTimeStamp + DEFAULT_INTERVAL] - // update the `fromTimeStamp` to avoid infinite loop - if (toTs !== now) this.fromTimeStamp += DEFAULT_INTERVAL + await this.handleBlocks(result.blocks, now) + } catch (err) { + await this.errorCallback(err, this) + } + + if (this.fromTimeStamp + EXPIRE_DURATION < now) { + this.fromTimeStamp = Math.min(toTs + 1, now - EXPIRE_DURATION) + } else { return } - } catch (err) { - await this.errorCallback(err, this) } } }