Skip to content

Commit

Permalink
Make sure to subscribe to blocks in time
Browse files Browse the repository at this point in the history
  • Loading branch information
Lbqds committed Jun 10, 2024
1 parent 882cbf6 commit 6094985
Showing 1 changed file with 31 additions and 20 deletions.
51 changes: 31 additions & 20 deletions packages/web3/src/block/block.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import * as web3 from '../global'
import { TOTAL_NUMBER_OF_CHAINS } from '../constants'

const DEFAULT_INTERVAL = 60 * 1000 // 60 seconds
const EXPIRE_DURATION = 20 * 1000 // 20 seconds

export type ReorgCallback = (
fromGroup: number,
Expand Down Expand Up @@ -86,6 +87,7 @@ export class BlockSubscription extends BlockSubscriptionBase {
readonly reorgCallback?: ReorgCallback
private fromTimeStamp: number
private parents: ({ hash: string; height: number } | undefined)[]
private cache: Map<string, number>

constructor(
options: BlockSubscribeOptions,
Expand All @@ -97,6 +99,7 @@ export class BlockSubscription extends BlockSubscriptionBase {
this.reorgCallback = options.reorgCallback
this.fromTimeStamp = fromTimeStamp
this.parents = new Array(TOTAL_NUMBER_OF_CHAINS).fill(undefined)
this.cache = new Map()
}

override async getHashesAtHeight(fromGroup: number, toGroup: number, height: number): Promise<string[]> {
Expand Down Expand Up @@ -124,10 +127,10 @@ export class BlockSubscription extends BlockSubscriptionBase {
return blocks.reverse()
}

private async handleBlocks(blocks: node.BlockEntry[][]) {
private async handleBlocks(blocks: node.BlockEntry[][], now: number) {
const allBlocks: node.BlockEntry[] = []
for (let index = 0; index < blocks.length; index += 1) {
const blocksPerChain = blocks[index]
const blocksPerChain = blocks[index].filter((b) => !this.cache.has(b.hash))
if (blocksPerChain.length === 0) continue

allBlocks.push(...blocksPerChain)
Expand All @@ -141,31 +144,39 @@ export class BlockSubscription extends BlockSubscriptionBase {
}

const sortedBlocks = allBlocks.sort((a, b) => a.timestamp - b.timestamp)
await this.messageCallback(sortedBlocks)
try {
await this.messageCallback(sortedBlocks)
} finally {
const threshold = now - EXPIRE_DURATION
Array.from(this.cache.entries()).forEach(([hash, ts]) => {
if (ts < threshold) this.cache.delete(hash)
})
const index = sortedBlocks.findIndex((b) => b.timestamp >= threshold)
if (index !== -1) {
sortedBlocks.slice(index).forEach((b) => this.cache.set(b.hash, b.timestamp))
}
}
}

override async polling(): Promise<void> {
try {
const now = Date.now()
if (this.fromTimeStamp >= now) return
const now = Date.now()
if (this.fromTimeStamp >= now) return

while (this.fromTimeStamp < now) {
if (this.isCancelled()) return
const toTs = Math.min(this.fromTimeStamp + DEFAULT_INTERVAL, now)
while (this.fromTimeStamp < now) {
if (this.isCancelled()) return
const toTs = Math.min(this.fromTimeStamp + DEFAULT_INTERVAL, now)
try {
const result = await this.nodeProvider.blockflow.getBlockflowBlocks({ fromTs: this.fromTimeStamp, toTs })
const isEmpty = result.blocks.every((blocksPerChain) => blocksPerChain.length === 0)
if (!isEmpty) {
this.fromTimeStamp = toTs + 1
await this.handleBlocks(result.blocks)
continue
}
// there are no blocks within the time range [fromTimeStamp, fromTimeStamp + DEFAULT_INTERVAL]
// update the `fromTimeStamp` to avoid infinite loop
if (toTs !== now) this.fromTimeStamp += DEFAULT_INTERVAL
await this.handleBlocks(result.blocks, now)
} catch (err) {
await this.errorCallback(err, this)
}

if (this.fromTimeStamp + EXPIRE_DURATION < now) {
this.fromTimeStamp = Math.min(toTs + 1, now - EXPIRE_DURATION)
} else {
return
}
} catch (err) {
await this.errorCallback(err, this)
}
}
}

0 comments on commit 6094985

Please sign in to comment.