Skip to content

Commit

Permalink
fix: event-replay readiness for nakamoto & fix for #1879 (#1903)
Browse files Browse the repository at this point in the history
* chore: bump duckdb

* feat: event-replay readiness for nakamoto
  • Loading branch information
csgui authored Mar 21, 2024
1 parent 7c2b3b4 commit 1572e73
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 37 deletions.
2 changes: 1 addition & 1 deletion .env
Original file line number Diff line number Diff line change
Expand Up @@ -178,4 +178,4 @@ STACKS_NODE_TYPE=L1
# IBD_MODE_UNTIL_BLOCK=

# Folder with events to be imported by the event-replay.
STACKS_EVENTS_DIR=./eventssds
STACKS_EVENTS_DIR=./events
10 changes: 5 additions & 5 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@
"cross-env": "7.0.3",
"dotenv": "8.6.0",
"dotenv-flow": "3.2.0",
"duckdb": "0.8.1",
"duckdb": "0.9.2",
"ecpair": "2.1.0",
"elliptic": "6.5.4",
"escape-goat": "3.0.0",
Expand Down
2 changes: 1 addition & 1 deletion src/datastore/pg-write-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3109,7 +3109,7 @@ export class PgWriteStore extends PgStore {
block_hash: tx.block_hash,
parent_block_hash: tx.parent_block_hash,
block_height: tx.block_height,
block_time: tx.block_time,
block_time: tx.block_time ?? 0,
burn_block_time: tx.burn_block_time,
parent_burn_block_time: tx.parent_burn_block_time,
type_id: tx.type_id,
Expand Down
44 changes: 28 additions & 16 deletions src/event-replay/parquet-based/dataset/store.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import * as fs from 'fs';

import { loadDotEnv } from '../../../helpers';
import { Database, QueryResult, TableData } from 'duckdb';
import { Database, QueryResult } from 'duckdb';

loadDotEnv();

Expand Down Expand Up @@ -89,23 +91,33 @@ export class DatasetStore {

rawEvents = (): Promise<QueryResult> => {
return new Promise(resolve => {
const con = this.db.connect();
con.all(
`SELECT method, payload FROM READ_PARQUET([
'${EVENTS_DIR}/new_burn_block/canonical/*.parquet',
'${EVENTS_DIR}/attachments_new/canonical/*.parquet',
'${EVENTS_DIR}/new_microblocks/*.parquet',
'${EVENTS_DIR}/drop_mempool_tx/*.parquet',
'${EVENTS_DIR}/new_mempool_tx/*.parquet',
]) ORDER BY id`,
(err: any, result: any) => {
if (err) {
throw err;
const dirs = fs
.readdirSync(`${EVENTS_DIR}`)
.filter(
(dir: string) =>
!dir.includes('canonical') && !dir.includes('new_block') && !dir.includes('remainder')
)
.map((dir: string) => {
if (dir.includes('new_burn_block') || dir.includes('attachments_new')) {
return `${EVENTS_DIR}/${dir}/canonical/*.parquet`;
} else {
return `${EVENTS_DIR}/${dir}/*.parquet`;
}
});

resolve(result);
}
);
const con = this.db.connect();
dirs.forEach((dir: string) => {
con.all(
`SELECT method, payload FROM READ_PARQUET(${JSON.stringify(dir)}) ORDER BY id`,
(err: any, result: any) => {
if (err) {
console.warn(err);
throw err;
}
resolve(result);
}
);
});
});
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ const populateBatchInserters = (db: PgWriteStore) => {
batchInserters.push(dbMicroblockBatchInserter);

const dbTxBatchInserter = createBatchInserter<DbTx>({
batchSize: 1400,
batchSize: 1000,
insertFn: entries => {
logger.debug({ component: 'event-replay' }, 'Inserting into txs table...');
return db.insertTxBatch(db.sql, entries);
Expand Down
33 changes: 21 additions & 12 deletions src/event-replay/parquet-based/replay-controller.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import * as tty from 'tty';
import * as fs from 'fs';

import { PgWriteStore } from '../../datastore/pg-write-store';
import { logger } from '../../logger';
Expand All @@ -20,6 +21,8 @@ import { cycleMigrations, dangerousDropAllTables } from '@hirosystems/api-toolki
import { PgServer, getConnectionArgs } from '../../datastore/connection';
import { MIGRATIONS_DIR } from '../../datastore/pg-store';

const EVENTS_DIR = process.env.STACKS_EVENTS_DIR;

/**
* This class is an entry point for the event-replay based on parquet files,
* being responsible to start the replay process (check "do" method).
Expand Down Expand Up @@ -80,21 +83,27 @@ export class ReplayController {
*
*/
/**
 * Ingests `attachments_new` events from the parquet dataset, if present.
 *
 * No-op when `${EVENTS_DIR}/attachments_new` does not exist (the directory is
 * optional after the nakamoto event-replay changes). Timing is always reported
 * in the `finally` block, even when processing throws.
 */
private ingestAttachmentNewEvents = async () => {
  if (fs.existsSync(`${EVENTS_DIR}/attachments_new`)) {
    const timeTracker = createTimeTracker();

    // try/finally only: a `catch (err) { throw err; }` would be a no-op rethrow.
    try {
      await timeTracker.track('ATTACHMENTS_NEW_EVENTS', async () => {
        await processAttachmentNewEvents(this.db, this.dataset);
      });
    } finally {
      // NOTE(review): `true ||` forces the console branch; the logger branch is
      // dead code — confirm this is intentional (looks like leftover debugging).
      if (true || tty.isatty(1)) {
        console.log('Tracked function times:');
        console.table(timeTracker.getDurations(3));
      } else {
        logger.info(`Tracked function times`, timeTracker.getDurations(3));
      }
    }
  }
};

Expand Down

0 comments on commit 1572e73

Please sign in to comment.