Skip to content

Commit

Permalink
Merge pull request #355 from alephium/add-block-subscription
Browse files Browse the repository at this point in the history
Add block subscription
  • Loading branch information
polarker authored May 25, 2024
2 parents bcca3fc + 16602bd commit cc74532
Show file tree
Hide file tree
Showing 11 changed files with 356 additions and 102 deletions.
158 changes: 158 additions & 0 deletions packages/web3/src/block/block.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*
Copyright 2018 - 2022 The Alephium Authors
This file is part of the alephium project.
The library is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
The library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with the library. If not, see <http://www.gnu.org/licenses/>.
*/

import { BlockSubscribeOptions, BlockSubscriptionBase, ReorgCallback } from './block'
import * as node from '../api/api-alephium'
import { TOTAL_NUMBER_OF_GROUPS } from '../constants'
import { randomInt } from 'crypto'

describe('block subscription', function () {
let fromGroup: number
let toGroup: number
let orphanHashes: string[] = []
let newHashes: string[] = []
let options: BlockSubscribeOptions

beforeEach(() => {
fromGroup = randomInt(0, TOTAL_NUMBER_OF_GROUPS)
toGroup = randomInt(0, TOTAL_NUMBER_OF_GROUPS)
orphanHashes = []
newHashes = []
options = {
pollingInterval: 0,
messageCallback: () => {
return
},
errorCallback: () => {
return
},
reorgCallback: (orphans, newBlocks) => {
orphanHashes.push(...orphans.map((b) => b.hash))
newHashes.push(...newBlocks.map((b) => b.hash))
}
}
})

it('should handle reorg(1 orphan block)', async () => {
const chains = [
['common', 'main-0'],
['common', 'fork-0']
]
const blockSubscription = new BlockSubscriptionTest(options, fromGroup, toGroup, chains)
await blockSubscription.handleReorgForTest('fork-0', 1)
expect(orphanHashes).toEqual(['fork-0'])
expect(newHashes).toEqual(['main-0'])
})

it('should handle reorg(multiple orphan blocks)', async () => {
const chains = [
['common', 'main-0', 'main-1', 'main-2', 'main-3'],
['common', 'fork-0', 'fork-1', 'fork-2']
]
const blockSubscription = new BlockSubscriptionTest(options, fromGroup, toGroup, chains)
await blockSubscription.handleReorgForTest('fork-2', 3)
expect(orphanHashes).toEqual(['fork-0', 'fork-1', 'fork-2'])
expect(newHashes).toEqual(['main-0', 'main-1', 'main-2'])
})

function buildHashesByHeightMap(chains: string[][]) {
const hashesByHeight: Map<number, string[]> = new Map()
const sortedChains = chains.sort((a, b) => {
if (a.length === b.length) {
return b[b.length - 1] > a[a.length - 1] ? 1 : -1
}
return b.length - a.length
})
const maxHeight = sortedChains[0].length
for (let currentHeight = 0; currentHeight < maxHeight; currentHeight += 1) {
const hashes = hashesByHeight.get(currentHeight) ?? []
sortedChains
.filter((c) => c.length > currentHeight)
.forEach((c) => {
if (!hashes.includes(c[currentHeight])) {
hashes.push(c[currentHeight])
}
})
hashesByHeight.set(currentHeight, hashes)
}
return hashesByHeight
}

function buildBlockByHashMap(chains: string[][], fromGroup: number, toGroup: number) {
const blockByHash: Map<string, node.BlockEntry> = new Map()
for (const chain of chains) {
for (let index = 0; index < chain.length; index += 1) {
const hash = chain[index]
const parentHash = index === 0 ? 'undefined' : chain[index - 1]
const depsLength = TOTAL_NUMBER_OF_GROUPS * 2 - 1
const deps = Array.from(Array(depsLength).keys()).map(() => '')
const parentIndex = Math.floor(depsLength / 2) + toGroup
deps[parentIndex] = parentHash
const blockEntry: node.BlockEntry = {
hash: hash,
timestamp: 0,
chainFrom: fromGroup,
chainTo: toGroup,
height: index,
deps,
transactions: [],
nonce: '',
version: 0,
depStateHash: '',
txsHash: '',
target: ''
}
blockByHash.set(hash, blockEntry)
}
}
return blockByHash
}

class BlockSubscriptionTest extends BlockSubscriptionBase {
readonly fromGroup: number
readonly toGroup: number
readonly reorgCallback?: ReorgCallback
readonly hashesByHeight: Map<number, string[]>
readonly blockByHash: Map<string, node.BlockEntry>

override getHashesAtHeight(height: number): Promise<string[]> {
return Promise.resolve(this.hashesByHeight.get(height)!)
}

override getBlockByHash(hash: string): Promise<node.BlockEntry> {
return Promise.resolve(this.blockByHash.get(hash)!)
}

override polling(): Promise<void> {
return Promise.resolve()
}

async handleReorgForTest(fromHash: string, fromHeight: number) {
await this.handleReorg(fromHash, fromHeight)
}

constructor(options: BlockSubscribeOptions, fromGroup: number, toGroup: number, chains: string[][]) {
super(options)
this.fromGroup = fromGroup
this.toGroup = toGroup
this.reorgCallback = options.reorgCallback
this.hashesByHeight = buildHashesByHeightMap(chains)
this.blockByHash = buildBlockByHashMap(chains, fromGroup, toGroup)
}
}
})
139 changes: 139 additions & 0 deletions packages/web3/src/block/block.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
Copyright 2018 - 2022 The Alephium Authors
This file is part of the alephium project.
The library is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
The library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with the library. If not, see <http://www.gnu.org/licenses/>.
*/

import { Subscription, SubscribeOptions } from '../utils/subscription'
import * as node from '../api/api-alephium'
import { NodeProvider } from '../api'
import * as web3 from '../global'

export type ReorgCallback = (orphanBlocks: node.BlockEntry[], newBlocks: node.BlockEntry[]) => Promise<void> | void

export interface BlockSubscribeOptions extends SubscribeOptions<node.BlockEntry> {
reorgCallback?: ReorgCallback
}

export abstract class BlockSubscriptionBase extends Subscription<node.BlockEntry> {
abstract readonly reorgCallback?: ReorgCallback
abstract readonly fromGroup: number
abstract readonly toGroup: number

abstract getHashesAtHeight(height: number): Promise<string[]>
abstract getBlockByHash(hash: string): Promise<node.BlockEntry>

protected getParentHash(block: node.BlockEntry): string {
const index = Math.floor(block.deps.length / 2) + this.toGroup
return block.deps[index]
}

protected async handleReorg(blockHash: string, blockHeight: number) {
console.info(`reorg occur, hash: ${blockHash}, height: ${blockHeight}`)
if (this.reorgCallback === undefined) return

const orphans: string[] = []
const newHashes: string[] = []
let fromHash = blockHash
let fromHeight = blockHeight
while (true) {
const hashes = await this.getHashesAtHeight(fromHeight)
const canonicalHash = hashes[0]
if (canonicalHash !== fromHash) {
orphans.push(fromHash)
newHashes.push(canonicalHash)
const block = await this.getBlockByHash(fromHash)
fromHash = this.getParentHash(block)
fromHeight -= 1
} else {
break
}
}

const orphanBlocks: node.BlockEntry[] = []
for (const hash of orphans.reverse()) {
const block = await this.getBlockByHash(hash)
orphanBlocks.push(block)
}

const newBlocks: node.BlockEntry[] = []
for (const hash of newHashes.reverse()) {
const block = await this.getBlockByHash(hash)
newBlocks.push(block)
}
console.info(`orphan hashes: ${orphanBlocks.map((b) => b.hash)}, new hashes: ${newBlocks.map((b) => b.hash)}`)
await this.reorgCallback(orphanBlocks, newBlocks)
}
}

export class BlockSubscription extends BlockSubscriptionBase {
readonly nodeProvider: NodeProvider
readonly fromGroup: number
readonly toGroup: number
readonly reorgCallback?: ReorgCallback
private currentBlockHeight: number
private parentBlockHash: string | undefined

constructor(
options: BlockSubscribeOptions,
fromGroup: number,
toGroup: number,
fromBlockHeight: number,
nodeProvider: NodeProvider | undefined = undefined
) {
super(options)
this.nodeProvider = nodeProvider ?? web3.getCurrentNodeProvider()
this.fromGroup = fromGroup
this.toGroup = toGroup
this.reorgCallback = options.reorgCallback
this.currentBlockHeight = fromBlockHeight
this.parentBlockHash = undefined
}

override async getHashesAtHeight(height: number): Promise<string[]> {
const result = await this.nodeProvider.blockflow.getBlockflowHashes({
fromGroup: this.fromGroup,
toGroup: this.toGroup,
height
})
return result.headers
}

override async getBlockByHash(hash: string): Promise<node.BlockEntry> {
return await this.nodeProvider.blockflow.getBlockflowBlocksBlockHash(hash)
}

override async polling(): Promise<void> {
try {
const chainInfo = await this.nodeProvider.blockflow.getBlockflowChainInfo({
fromGroup: this.fromGroup,
toGroup: this.toGroup
})

while (this.currentBlockHeight <= chainInfo.currentHeight) {
const hashes = await this.getHashesAtHeight(this.currentBlockHeight)
const block = await this.getBlockByHash(hashes[0])
if (this.parentBlockHash !== undefined && this.getParentHash(block) !== this.parentBlockHash) {
await this.handleReorg(this.parentBlockHash, this.currentBlockHeight - 1)
}
await this.messageCallback(block)
this.currentBlockHeight += 1
this.parentBlockHash = hashes[0]
}
} catch (err) {
await this.errorCallback(err, this)
}
}
}
19 changes: 19 additions & 0 deletions packages/web3/src/block/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
Copyright 2018 - 2022 The Alephium Authors
This file is part of the alephium project.
The library is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
The library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with the library. If not, see <http://www.gnu.org/licenses/>.
*/

export { ReorgCallback, BlockSubscribeOptions, BlockSubscription } from './block'
12 changes: 9 additions & 3 deletions packages/web3/src/codec/script-codec.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,9 @@ describe('Encode & decode scripts', function () {
testScript(DestroyAdd.script, { add: contractId, caller: testAddress }, [
{
isPublic: true,
assetModifier: 3,
usePreapprovedAssets: true,
useContractAssets: false,
usePayToContractOnly: false,
argsLength: 0,
localsLength: 0,
returnLength: 0,
Expand All @@ -186,7 +188,9 @@ describe('Encode & decode scripts', function () {
testScript(GreeterMain.script, { greeterContractId: contractId }, [
{
isPublic: true,
assetModifier: 3,
usePreapprovedAssets: true,
useContractAssets: false,
usePayToContractOnly: false,
argsLength: 0,
localsLength: 2,
returnLength: 0,
Expand Down Expand Up @@ -238,7 +242,9 @@ describe('Encode & decode scripts', function () {
expect(decodedTxScript.methods.length).toEqual(methods.length)
decodedTxScript.methods.map((decodedMethod, index) => {
expect(decodedMethod.isPublic).toEqual(methods[index].isPublic)
expect(decodedMethod.assetModifier).toEqual(methods[index].assetModifier)
expect(decodedMethod.usePreapprovedAssets).toEqual(methods[index].usePreapprovedAssets)
expect(decodedMethod.useContractAssets).toEqual(methods[index].useContractAssets)
expect(decodedMethod.usePayToContractOnly).toEqual(methods[index].usePayToContractOnly)
expect(decodedMethod.argsLength).toEqual(methods[index].argsLength)
expect(decodedMethod.localsLength).toEqual(methods[index].localsLength)
expect(decodedMethod.returnLength).toEqual(methods[index].returnLength)
Expand Down
8 changes: 4 additions & 4 deletions packages/web3/src/contract/contract.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1889,14 +1889,14 @@ export function subscribeEventsFromContract<T extends Fields, M extends Contract
decodeFunc: (event: node.ContractEvent) => M,
fromCount?: number
): EventSubscription {
const messageCallback = (event: node.ContractEvent): Promise<void> => {
const messageCallback = (event: node.ContractEvent) => {
if (event.eventIndex !== eventIndex) {
return Promise.resolve()
}
return options.messageCallback(decodeFunc(event))
}

const errorCallback = (err: any, subscription: Subscription<node.ContractEvent>): Promise<void> => {
const errorCallback = (err: any, subscription: Subscription<node.ContractEvent>) => {
return options.errorCallback(err, subscription as unknown as Subscription<M>)
}
const opt: EventSubscribeOptions<node.ContractEvent> = {
Expand Down Expand Up @@ -2339,13 +2339,13 @@ export function subscribeContractEvents(
options: EventSubscribeOptions<ContractEvent<any>>,
fromCount?: number
): EventSubscription {
const messageCallback = (event: node.ContractEvent): Promise<void> => {
const messageCallback = (event: node.ContractEvent) => {
return options.messageCallback({
...decodeEvent(contract, instance, event, event.eventIndex),
contractAddress: instance.address
})
}
const errorCallback = (err: any, subscription: Subscription<node.ContractEvent>): Promise<void> => {
const errorCallback = (err: any, subscription: Subscription<node.ContractEvent>) => {
return options.errorCallback(err, subscription as unknown as Subscription<ContractEvent<any>>)
}
const opt: EventSubscribeOptions<node.ContractEvent> = {
Expand Down
Loading

0 comments on commit cc74532

Please sign in to comment.