-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathBlockTracker.ts
108 lines (87 loc) · 3.42 KB
/
BlockTracker.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
import { LiteClient } from "ton-lite-client";
import { liteServer_masterchainInfo } from "ton-lite-client/dist/schema"
import Queue from 'queue'
import { EventEmitter } from "events";
/**
 * Builds the cursor-map key for the block referenced by `blockId.last`,
 * formatted as `(workchain,shard)`.
 */
const getBlockKey = (blockId: liteServer_masterchainInfo) => {
  const { workchain, shard } = blockId.last;
  return `(${workchain},${shard})`;
};
/**
 * Polls a lite client once per second for the latest masterchain info and,
 * whenever the masterchain seqno changes, imports the transactions of the
 * new masterchain block and of every shard block reported for it.
 *
 * Events:
 * - `transactions`: `{ masterBlock, transactions, workchain }`, emitted once
 *   per imported block.
 * - `error`: a failed poll/import. Emitted only when an `error` listener is
 *   registered; otherwise the failure is written to `console.error` so a
 *   transient client error does not surface as an unhandled rejection.
 */
export class BlockTracker extends EventEmitter {
  /** Last imported seqno per `(workchain,shard)` key (masterchain included). */
  private readonly shardsCursors = new Map<string, number>();
  /** Jobs that list a block's transactions; configured with `concurrency: 1`. */
  private importTransactionsQueue = new Queue({ concurrency: 1, autostart: true });
  private interval: NodeJS.Timeout | undefined;
  private started = false;
  /** True while a tick is running, so timer ticks never overlap. */
  private tickInFlight = false;
  private lastMasterChainInfo: liteServer_masterchainInfo | null = null;

  constructor(private readonly client: LiteClient) {
    super();
  }

  /**
   * Starts polling: one tick immediately, then one every second.
   * Calling `start()` on an already started tracker is a no-op, so repeated
   * calls cannot stack timers.
   */
  start() {
    if (this.started) {
      return;
    }
    this.started = true;
    this.#runTick();
    this.interval = setInterval(() => this.#runTick(), 1_000);
  }

  /**
   * Stops polling and ends the import queue. Does nothing when the tracker
   * is not running. The tracker can be started again afterwards.
   */
  stop() {
    if (!this.started) {
      return;
    }
    this.started = false;
    clearInterval(this.interval);
    this.interval = undefined;
    this.importTransactionsQueue.end();
  }

  /** Returns the masterchain info fetched by the most recent tick, or `null` before the first one. */
  getLatestMasterBlock() {
    return this.lastMasterChainInfo;
  }

  /**
   * Looks up the block `(workchain, shard, seqno)` and queues a job that
   * lists its transactions and emits them as a `transactions` event.
   *
   * @param masterBlock masterchain info the block is being imported for; passed through to the event.
   * @param workchain workchain id of the block.
   * @param shard shard id; sent to the client as a decimal string.
   * @param seqno sequence number of the block within its shard.
   */
  async importBlockTransactions(masterBlock: liteServer_masterchainInfo, workchain: number, shard: bigint, seqno: number) {
    const block = await this.client.lookupBlockByID({
      workchain,
      shard: shard.toString(),
      seqno,
    });
    // NOTE(review): `queue`'s `push` appears to follow the Array API and
    // return a number rather than a promise, so this `await` likely resolves
    // once the job is queued, not once it has run — confirm against the
    // installed `queue` version.
    await this.importTransactionsQueue.push(
      async () => {
        const blockTransactions = await this.client.listBlockTransactions(block.id);
        this.emit('transactions', {
          masterBlock,
          transactions: blockTransactions,
          workchain,
        });
      }
    );
  }

  /**
   * Imports the masterchain block in `masterBlock.last`, then every shard
   * block between the stored cursor and the seqno reported for that shard.
   * A shard with no stored cursor only gets its current block imported.
   * Cursors of shards that are no longer reported are dropped at the end.
   */
  async importMasterchainBlock(masterBlock: liteServer_masterchainInfo) {
    const masterBlockKey = getBlockKey(masterBlock);
    const { last } = masterBlock;
    await this.importBlockTransactions(masterBlock, last.workchain, BigInt(last.shard), last.seqno);

    const { shards } = await this.client.getAllShardsInfo(last);
    // Keys that must survive the cleanup below.
    const liveKeys = new Set<string>([masterBlockKey]);

    for (const [workchain, shardValue] of Object.entries(shards)) {
      for (const [shard, shardSeqno] of Object.entries(shardValue)) {
        const shardBlockKey = `(${workchain},${shard})`;
        liveKeys.add(shardBlockKey);

        const previousSeqno = this.shardsCursors.get(shardBlockKey);
        // Compare against `undefined` explicitly: a stored cursor of 0 is a
        // valid position and must not be mistaken for "never seen".
        const firstSeqno = previousSeqno === undefined ? shardSeqno : previousSeqno + 1;
        for (let seqno = firstSeqno; seqno <= shardSeqno; seqno++) {
          await this.importBlockTransactions(masterBlock, Number(workchain), BigInt(shard), seqno);
        }
        this.shardsCursors.set(shardBlockKey, shardSeqno);
      }
    }

    this.shardsCursors.set(masterBlockKey, last.seqno);

    // Forget shards that are absent from the current shard list.
    for (const blockKey of this.shardsCursors.keys()) {
      if (!liveKeys.has(blockKey)) {
        this.shardsCursors.delete(blockKey);
      }
    }
  }

  /**
   * Runs one tick unless the previous one is still in flight, and routes any
   * failure to `#reportError` instead of leaving the promise unhandled.
   */
  #runTick(): void {
    if (this.tickInFlight) {
      return;
    }
    this.tickInFlight = true;
    void this.#tick()
      .catch((error: unknown) => this.#reportError(error))
      .finally(() => {
        this.tickInFlight = false;
      });
  }

  /** Emits `error` when someone listens for it; logs to `console.error` otherwise. */
  #reportError(error: unknown): void {
    if (this.listenerCount('error') > 0) {
      this.emit('error', error);
    } else {
      console.error('BlockTracker: masterchain poll failed', error);
    }
  }

  /** Fetches the latest masterchain info and imports it when its seqno differs from the stored cursor. */
  async #tick() {
    const masterBlock = await this.client.getMasterchainInfo();
    this.lastMasterChainInfo = masterBlock;
    const previousSeqno = this.shardsCursors.get(getBlockKey(masterBlock));
    if (masterBlock.last.seqno !== previousSeqno) {
      await this.importMasterchainBlock(masterBlock);
    }
  }
}