Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add block subscription #355

Merged
merged 8 commits into from
May 25, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions packages/web3/src/contract/contract.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1831,14 +1831,14 @@ export function subscribeEventsFromContract<T extends Fields, M extends Contract
decodeFunc: (event: node.ContractEvent) => M,
fromCount?: number
): EventSubscription {
const messageCallback = (event: node.ContractEvent): Promise<void> => {
const messageCallback = (event: node.ContractEvent) => {
if (event.eventIndex !== eventIndex) {
return Promise.resolve()
}
return options.messageCallback(decodeFunc(event))
}

const errorCallback = (err: any, subscription: Subscription<node.ContractEvent>): Promise<void> => {
const errorCallback = (err: any, subscription: Subscription<node.ContractEvent>) => {
return options.errorCallback(err, subscription as unknown as Subscription<M>)
}
const opt: EventSubscribeOptions<node.ContractEvent> = {
Expand Down Expand Up @@ -2271,13 +2271,13 @@ export function subscribeContractEvents(
options: EventSubscribeOptions<ContractEvent<any>>,
fromCount?: number
): EventSubscription {
const messageCallback = (event: node.ContractEvent): Promise<void> => {
const messageCallback = (event: node.ContractEvent) => {
return options.messageCallback({
...decodeEvent(contract, instance, event, event.eventIndex),
contractAddress: instance.address
})
}
const errorCallback = (err: any, subscription: Subscription<node.ContractEvent>): Promise<void> => {
const errorCallback = (err: any, subscription: Subscription<node.ContractEvent>) => {
return options.errorCallback(err, subscription as unknown as Subscription<ContractEvent<any>>)
}
const opt: EventSubscribeOptions<node.ContractEvent> = {
Expand Down
16 changes: 2 additions & 14 deletions packages/web3/src/contract/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ import { node } from '../api'
import { Subscription, SubscribeOptions } from '../utils'

export interface EventSubscribeOptions<Message> extends SubscribeOptions<Message> {
onEventCountChanged?: (eventCount: number) => Promise<void>
onEventCountChanged?: (eventCount: number) => Promise<void> | void
}

export class EventSubscription extends Subscription<node.ContractEvent> {
readonly contractAddress: string
private fromCount: number
private onEventCountChanged?: (eventCount: number) => Promise<void>
private onEventCountChanged?: (eventCount: number) => Promise<void> | void

constructor(options: EventSubscribeOptions<node.ContractEvent>, contractAddress: string, fromCount?: number) {
super(options)
Expand All @@ -38,13 +38,6 @@ export class EventSubscription extends Subscription<node.ContractEvent> {
this.startPolling()
}

override startPolling(): void {
this.eventEmitter.on('tick', async () => {
await this.polling()
})
this.eventEmitter.emit('tick')
}

currentEventCount(): number {
return this.fromCount
}
Expand All @@ -54,12 +47,7 @@ export class EventSubscription extends Subscription<node.ContractEvent> {
const events = await web3.getCurrentNodeProvider().events.getEventsContractContractaddress(this.contractAddress, {
start: this.fromCount
})
if (this.cancelled) {
return
}

if (this.fromCount === events.nextStart) {
this.task = setTimeout(() => this.eventEmitter.emit('tick'), this.pollingInterval)
return
}

Expand Down
158 changes: 158 additions & 0 deletions packages/web3/src/utils/block.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*
Copyright 2018 - 2022 The Alephium Authors
This file is part of the alephium project.

The library is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

The library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.

You should have received a copy of the GNU Lesser General Public License
along with the library. If not, see <http://www.gnu.org/licenses/>.
*/

import { BlockSubscribeOptions, BlockSubscriptionBase, ReorgCallback } from './block'
import * as node from '../api/api-alephium'
import { TOTAL_NUMBER_OF_GROUPS } from '../constants'
import { randomInt } from 'crypto'

describe('block subscription', function () {
let fromGroup: number
let toGroup: number
let orphanHashes: string[] = []
let newHashes: string[] = []
let options: BlockSubscribeOptions

beforeEach(() => {
fromGroup = randomInt(0, TOTAL_NUMBER_OF_GROUPS)
toGroup = randomInt(0, TOTAL_NUMBER_OF_GROUPS)
orphanHashes = []
newHashes = []
options = {
pollingInterval: 0,
messageCallback: () => {
return
},
errorCallback: () => {
return
},
reorgCallback: (orphans, newBlocks) => {
orphanHashes.push(...orphans.map((b) => b.hash))
newHashes.push(...newBlocks.map((b) => b.hash))
}
}
})

it('should handle reorg(1 orphan block)', async () => {
const chains = [
['common', 'main-0'],
['common', 'fork-0']
]
const blockSubscription = new BlockSubscriptionTest(options, fromGroup, toGroup, chains)
await blockSubscription.handleReorgForTest('fork-0', 1)
expect(orphanHashes).toEqual(['fork-0'])
expect(newHashes).toEqual(['main-0'])
})

it('should handle reorg(multiple orphan blocks)', async () => {
const chains = [
['common', 'main-0', 'main-1', 'main-2', 'main-3'],
['common', 'fork-0', 'fork-1', 'fork-2']
]
const blockSubscription = new BlockSubscriptionTest(options, fromGroup, toGroup, chains)
await blockSubscription.handleReorgForTest('fork-2', 3)
expect(orphanHashes).toEqual(['fork-0', 'fork-1', 'fork-2'])
expect(newHashes).toEqual(['main-0', 'main-1', 'main-2'])
})

function buildHashesByHeightMap(chains: string[][]) {
const hashesByHeight: Map<number, string[]> = new Map()
const sortedChains = chains.sort((a, b) => {
if (a.length === b.length) {
return b[b.length - 1] > a[a.length - 1] ? 1 : -1
}
return b.length - a.length
})
const maxHeight = sortedChains[0].length
for (let currentHeight = 0; currentHeight < maxHeight; currentHeight += 1) {
const hashes = hashesByHeight.get(currentHeight) ?? []
sortedChains
.filter((c) => c.length > currentHeight)
.forEach((c) => {
if (!hashes.includes(c[currentHeight])) {
hashes.push(c[currentHeight])
}
hashesByHeight.set(currentHeight, hashes)
})
}
return hashesByHeight
}

function buildBlockByHashMap(chains: string[][], fromGroup: number, toGroup: number) {
const blockByHash: Map<string, node.BlockEntry> = new Map()
for (const chain of chains) {
for (let index = 0; index < chain.length; index += 1) {
const hash = chain[index]
const parentHash = index === 0 ? 'undefined' : chain[index - 1]
const depsLength = TOTAL_NUMBER_OF_GROUPS * 2 - 1
const deps = Array.from(Array(depsLength).keys()).map(() => '')
const parentIndex = Math.floor(depsLength / 2) + toGroup
deps[parentIndex] = parentHash
const blockEntry: node.BlockEntry = {
hash: hash,
timestamp: 0,
chainFrom: fromGroup,
chainTo: toGroup,
height: index,
deps,
transactions: [],
nonce: '',
version: 0,
depStateHash: '',
txsHash: '',
target: ''
}
blockByHash.set(hash, blockEntry)
}
}
return blockByHash
}

class BlockSubscriptionTest extends BlockSubscriptionBase {
readonly fromGroup: number
readonly toGroup: number
readonly reorgCallback?: ReorgCallback
readonly hashesByHeight: Map<number, string[]>
readonly blockByHash: Map<string, node.BlockEntry>

override getHashesAtHeight(height: number): Promise<string[]> {
return Promise.resolve(this.hashesByHeight.get(height)!)
}

override getBlockByHash(hash: string): Promise<node.BlockEntry> {
return Promise.resolve(this.blockByHash.get(hash)!)
}

override polling(): Promise<void> {
return Promise.resolve()
}

async handleReorgForTest(fromHash: string, fromHeight: number) {
await this.handleReorg(fromHash, fromHeight)
}

constructor(options: BlockSubscribeOptions, fromGroup: number, toGroup: number, chains: string[][]) {
super(options)
this.fromGroup = fromGroup
this.toGroup = toGroup
this.reorgCallback = options.reorgCallback
this.hashesByHeight = buildHashesByHeightMap(chains)
this.blockByHash = buildBlockByHashMap(chains, fromGroup, toGroup)
}
}
})
141 changes: 141 additions & 0 deletions packages/web3/src/utils/block.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
Copyright 2018 - 2022 The Alephium Authors
This file is part of the alephium project.

The library is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

The library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.

You should have received a copy of the GNU Lesser General Public License
along with the library. If not, see <http://www.gnu.org/licenses/>.
*/

import { Subscription, SubscribeOptions } from './subscription'
import * as node from '../api/api-alephium'
import { NodeProvider } from '../api'
import * as web3 from '../global'

export type ReorgCallback = (orphanBlocks: node.BlockEntry[], newBlocks: node.BlockEntry[]) => Promise<void> | void

export interface BlockSubscribeOptions extends SubscribeOptions<node.BlockEntry> {
reorgCallback?: ReorgCallback
}

export abstract class BlockSubscriptionBase extends Subscription<node.BlockEntry> {
abstract readonly reorgCallback?: ReorgCallback
abstract readonly fromGroup: number
abstract readonly toGroup: number

abstract getHashesAtHeight(height: number): Promise<string[]>
abstract getBlockByHash(hash: string): Promise<node.BlockEntry>

protected getParentHash(block: node.BlockEntry): string {
const index = Math.floor(block.deps.length / 2) + this.toGroup
return block.deps[index]
}

protected async handleReorg(blockHash: string, blockHeight: number) {
console.log(`reorg occur, hash: ${blockHash}, height: ${blockHeight}`)
if (this.reorgCallback === undefined) return

const orphans: string[] = []
const newHashes: string[] = []
let fromHash = blockHash
let fromHeight = blockHeight
while (true) {
const hashes = await this.getHashesAtHeight(fromHeight)
const canonicalHash = hashes[0]
if (canonicalHash !== fromHash) {
orphans.push(fromHash)
newHashes.push(canonicalHash)
const block = await this.getBlockByHash(fromHash)
fromHash = this.getParentHash(block)
fromHeight -= 1
} else {
break
}
}

const orphanBlocks: node.BlockEntry[] = []
for (const hash of orphans.reverse()) {
const block = await this.getBlockByHash(hash)
orphanBlocks.push(block)
}

const newBlocks: node.BlockEntry[] = []
for (const hash of newHashes.reverse()) {
const block = await this.getBlockByHash(hash)
newBlocks.push(block)
}
console.info(`orphan hashes: ${orphanBlocks.map((b) => b.hash)}, new hashes: ${newBlocks.map((b) => b.hash)}`)
await this.reorgCallback(orphanBlocks, newBlocks)
}
}

export class BlockSubscription extends BlockSubscriptionBase {
readonly nodeProvider: NodeProvider
readonly fromGroup: number
readonly toGroup: number
readonly reorgCallback?: ReorgCallback
private currentBlockHeight: number
private parentBlockHash: string | undefined

constructor(
options: BlockSubscribeOptions,
fromGroup: number,
toGroup: number,
fromBlockHeight: number,
nodeProvider: NodeProvider | undefined = undefined
) {
super(options)
this.nodeProvider = nodeProvider ?? web3.getCurrentNodeProvider()
this.fromGroup = fromGroup
this.toGroup = toGroup
this.reorgCallback = options.reorgCallback
this.currentBlockHeight = fromBlockHeight
this.parentBlockHash = undefined

this.startPolling()
}

override async getHashesAtHeight(height: number): Promise<string[]> {
const result = await this.nodeProvider.blockflow.getBlockflowHashes({
fromGroup: this.fromGroup,
toGroup: this.toGroup,
height
})
return result.headers
}

override async getBlockByHash(hash: string): Promise<node.BlockEntry> {
return await this.nodeProvider.blockflow.getBlockflowBlocksBlockHash(hash)
}

override async polling(): Promise<void> {
try {
const chainInfo = await this.nodeProvider.blockflow.getBlockflowChainInfo({
fromGroup: this.fromGroup,
toGroup: this.toGroup
})

while (this.currentBlockHeight <= chainInfo.currentHeight) {
const hashes = await this.getHashesAtHeight(this.currentBlockHeight)
const block = await this.getBlockByHash(hashes[0])
if (this.parentBlockHash !== undefined && this.getParentHash(block) !== this.parentBlockHash) {
await this.handleReorg(this.parentBlockHash, this.currentBlockHeight - 1)
}
await this.messageCallback(block)
this.currentBlockHeight += 1
this.parentBlockHash = hashes[0]
}
} catch (err) {
await this.errorCallback(err, this)
}
}
}
1 change: 1 addition & 0 deletions packages/web3/src/utils/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,4 @@ export * from './subscription'
export * from './sign'
export * from './number'
export { validateExchangeAddress, isALPHTransferTx, getSenderAddress, getALPHDepositInfo } from './exchange'
export { ReorgCallback, BlockSubscribeOptions, BlockSubscription } from './block'
4 changes: 2 additions & 2 deletions packages/web3/src/utils/subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ along with the library. If not, see <http://www.gnu.org/licenses/>.

import EventEmitter from 'eventemitter3'

type MessageCallback<Message> = (message: Message) => Promise<void>
type ErrorCallback<Message> = (error: any, subscription: Subscription<Message>) => Promise<void>
type MessageCallback<Message> = (message: Message) => Promise<void> | void
type ErrorCallback<Message> = (error: any, subscription: Subscription<Message>) => Promise<void> | void

export interface SubscribeOptions<Message> {
pollingInterval: number
Expand Down
Loading
Loading