diff --git a/.env b/.env
index 5a5fa4bd3e..06ea4bd627 100644
--- a/.env
+++ b/.env
@@ -178,4 +178,4 @@ STACKS_NODE_TYPE=L1
 # IBD_MODE_UNTIL_BLOCK=
 
 # Folder with events to be imported by the event-replay.
-STACKS_EVENTS_DIR=./eventssds
+STACKS_EVENTS_DIR=./events
diff --git a/package-lock.json b/package-lock.json
index ad08823bd9..538249f37d 100644
--- a/package-lock.json
+++ b/package-lock.json
@@ -34,7 +34,7 @@
         "cross-env": "7.0.3",
         "dotenv": "8.6.0",
         "dotenv-flow": "3.2.0",
-        "duckdb": "0.8.1",
+        "duckdb": "0.9.2",
         "ecpair": "2.1.0",
         "elliptic": "6.5.4",
         "escape-goat": "3.0.0",
@@ -5830,13 +5830,13 @@
       }
     },
     "node_modules/duckdb": {
-      "version": "0.8.1",
-      "resolved": "https://registry.npmjs.org/duckdb/-/duckdb-0.8.1.tgz",
-      "integrity": "sha512-a2SJDuvBVKy5muYFxXTANlqdNX1daF3NHzpqRdrk0Qx5n3Sh7BxL66O+WY9epaDFukiXEpz45sds5T1LaPaHog==",
+      "version": "0.9.2",
+      "resolved": "https://registry.npmjs.org/duckdb/-/duckdb-0.9.2.tgz",
+      "integrity": "sha512-POZ37Vf5cHVmk4W8z0PsdGi67VeQsr9HRrBbmivDt9AQ8C1XLESVE79facydbroNiqm7+n7fx6jqjyxyrBFYlQ==",
       "hasInstallScript": true,
       "dependencies": {
         "@mapbox/node-pre-gyp": "^1.0.0",
-        "node-addon-api": "*",
+        "node-addon-api": "^7.0.0",
         "node-gyp": "^9.3.0"
       }
     },
diff --git a/package.json b/package.json
index b19fd946a5..b4fa0eb110 100644
--- a/package.json
+++ b/package.json
@@ -109,7 +109,7 @@
     "cross-env": "7.0.3",
     "dotenv": "8.6.0",
     "dotenv-flow": "3.2.0",
-    "duckdb": "0.8.1",
+    "duckdb": "0.9.2",
     "ecpair": "2.1.0",
     "elliptic": "6.5.4",
     "escape-goat": "3.0.0",
diff --git a/src/datastore/pg-write-store.ts b/src/datastore/pg-write-store.ts
index ce543761b5..a590a4b495 100644
--- a/src/datastore/pg-write-store.ts
+++ b/src/datastore/pg-write-store.ts
@@ -3109,7 +3109,7 @@ export class PgWriteStore extends PgStore {
         block_hash: tx.block_hash,
         parent_block_hash: tx.parent_block_hash,
         block_height: tx.block_height,
-        block_time: tx.block_time,
+        block_time: tx.block_time ?? 0,
         burn_block_time: tx.burn_block_time,
         parent_burn_block_time: tx.parent_burn_block_time,
         type_id: tx.type_id,
diff --git a/src/event-replay/parquet-based/dataset/store.ts b/src/event-replay/parquet-based/dataset/store.ts
index 1b1c29723e..caf134a079 100644
--- a/src/event-replay/parquet-based/dataset/store.ts
+++ b/src/event-replay/parquet-based/dataset/store.ts
@@ -1,5 +1,7 @@
+import * as fs from 'fs';
+
 import { loadDotEnv } from '../../../helpers';
-import { Database, QueryResult, TableData } from 'duckdb';
+import { Database, QueryResult } from 'duckdb';
 
 loadDotEnv();
 
@@ -89,21 +91,36 @@ export class DatasetStore {
   rawEvents = (): Promise => {
     return new Promise(resolve => {
-      const con = this.db.connect();
-      con.all(
-        `SELECT method, payload FROM READ_PARQUET([
-          '${EVENTS_DIR}/new_burn_block/canonical/*.parquet',
-          '${EVENTS_DIR}/attachments_new/canonical/*.parquet',
-          '${EVENTS_DIR}/new_microblocks/*.parquet',
-          '${EVENTS_DIR}/drop_mempool_tx/*.parquet',
-          '${EVENTS_DIR}/new_mempool_tx/*.parquet',
-        ]) ORDER BY id`,
-        (err: any, result: any) => {
-          if (err) {
-            throw err;
-          }
-
-          resolve(result);
-        }
-      );
+      // Discover event dirs at runtime instead of hard-coding them; skip
+      // new_block (imported separately), canonical helper dirs and remainders.
+      const dirs = fs
+        .readdirSync(`${EVENTS_DIR}`)
+        .filter(
+          (dir: string) =>
+            !dir.includes('canonical') && !dir.includes('new_block') && !dir.includes('remainder')
+        )
+        .map((dir: string) => {
+          if (dir.includes('new_burn_block') || dir.includes('attachments_new')) {
+            return `${EVENTS_DIR}/${dir}/canonical/*.parquet`;
+          } else {
+            return `${EVENTS_DIR}/${dir}/*.parquet`;
+          }
+        });
+
+      const con = this.db.connect();
+      // Single READ_PARQUET over every glob: the promise resolves exactly once
+      // and rows keep a global ORDER BY id (a per-dir query loop would resolve
+      // on the first callback and silently drop the remaining results). DuckDB
+      // string literals use single quotes; double quotes denote identifiers.
+      con.all(
+        `SELECT method, payload FROM READ_PARQUET([${dirs
+          .map((dir: string) => `'${dir}'`)
+          .join(', ')}]) ORDER BY id`,
+        (err: any, result: any) => {
+          if (err) {
+            throw err;
+          }
+          resolve(result);
+        }
+      );
     });
   };
diff --git a/src/event-replay/parquet-based/importers/new-block-importer.ts b/src/event-replay/parquet-based/importers/new-block-importer.ts
index a0b3ba4f82..d34f0c8458 100644
--- a/src/event-replay/parquet-based/importers/new-block-importer.ts
+++ b/src/event-replay/parquet-based/importers/new-block-importer.ts
@@ -84,7 +84,7 @@ const populateBatchInserters = (db: PgWriteStore) => {
   batchInserters.push(dbMicroblockBatchInserter);
 
   const dbTxBatchInserter = createBatchInserter({
-    batchSize: 1400,
+    batchSize: 1000,
     insertFn: entries => {
       logger.debug({ component: 'event-replay' }, 'Inserting into txs table...');
       return db.insertTxBatch(db.sql, entries);
diff --git a/src/event-replay/parquet-based/replay-controller.ts b/src/event-replay/parquet-based/replay-controller.ts
index b9ec3136ef..31d56f39fb 100644
--- a/src/event-replay/parquet-based/replay-controller.ts
+++ b/src/event-replay/parquet-based/replay-controller.ts
@@ -1,4 +1,5 @@
 import * as tty from 'tty';
+import * as fs from 'fs';
 
 import { PgWriteStore } from '../../datastore/pg-write-store';
 import { logger } from '../../logger';
@@ -20,6 +21,8 @@
 import { cycleMigrations, dangerousDropAllTables } from '@hirosystems/api-toolkit';
 import { PgServer, getConnectionArgs } from '../../datastore/connection';
 import { MIGRATIONS_DIR } from '../../datastore/pg-store';
 
+const EVENTS_DIR = process.env.STACKS_EVENTS_DIR;
+
 /**
  * This class is an entry point for the event-replay based on parquet files,
@@ -80,18 +83,23 @@ export class ReplayController {
    * */
   private ingestAttachmentNewEvents = async () => {
-    const timeTracker = createTimeTracker();
-    try {
-      await timeTracker.track('ATTACHMENTS_NEW_EVENTS', async () => {
-        await processAttachmentNewEvents(this.db, this.dataset);
-      });
-    } catch (err) {
-      throw err;
-    } finally {
-      if (true || tty.isatty(1)) {
-        console.log('Tracked function times:');
-        console.table(timeTracker.getDurations(3));
-      } else {
-        logger.info(`Tracked function times`, timeTracker.getDurations(3));
-      }
-    }
+    // Some event dumps contain no attachments_new events; skip cleanly
+    // instead of failing on the missing directory.
+    if (fs.existsSync(`${EVENTS_DIR}/attachments_new`)) {
+      const timeTracker = createTimeTracker();
+
+      try {
+        await timeTracker.track('ATTACHMENTS_NEW_EVENTS', async () => {
+          await processAttachmentNewEvents(this.db, this.dataset);
+        });
+      } finally {
+        // `finally` still reports timings when the tracked call throws; the
+        // bare rethrowing catch was dropped as a no-op, and the `true ||`
+        // debug leftover was removed so the non-TTY logger branch is live.
+        if (tty.isatty(1)) {
+          console.log('Tracked function times:');
+          console.table(timeTracker.getDurations(3));
+        } else {
+          logger.info(`Tracked function times`, timeTracker.getDurations(3));
+        }
+      }
+    }
   };