From 5f13912330fbe9a370e83b3433eb8fdaa9f8a764 Mon Sep 17 00:00:00 2001 From: Matthew Little Date: Tue, 4 Feb 2025 17:53:39 -0500 Subject: [PATCH] feat: callback to intercept and modify predicates before re-registration (#675) Closes https://github.com/hirosystems/chainhook/issues/674 * First commit adds an optional callback that clients can use to modify an inactive predicate before it is re-registered. * Second commit fixes an issue where the the `setInterval` could result in overlapping healthcheck code running if something like the callback (which could be waiting on a postgres connection, for example) take a long time. --- components/client/typescript/src/index.ts | 24 ++++++-- .../client/typescript/src/predicates.ts | 56 +++++++++++++------ .../typescript/tests/predicates.test.ts | 8 +-- 3 files changed, 63 insertions(+), 25 deletions(-) diff --git a/components/client/typescript/src/index.ts b/components/client/typescript/src/index.ts index da193fc8e..e6fc67303 100644 --- a/components/client/typescript/src/index.ts +++ b/components/client/typescript/src/index.ts @@ -6,6 +6,7 @@ import { Static, Type } from '@fastify/type-provider-typebox'; import { BitcoinIfThisOptionsSchema, BitcoinIfThisSchema } from './schemas/bitcoin/if_this'; import { StacksIfThisOptionsSchema, StacksIfThisSchema } from './schemas/stacks/if_this'; import { logger } from './util/logger'; +import { PredicateSchema } from './schemas/predicate'; const EventObserverOptionsSchema = Type.Object({ /** Event observer host name (usually '0.0.0.0') */ @@ -40,6 +41,9 @@ const EventObserverOptionsSchema = Type.Object({ * up to date. If they become obsolete, we will attempt to re-register them. */ predicate_health_check_interval_ms: Type.Optional(Type.Integer({ default: 5000 })), + predicate_re_register_callback: Type.Optional( + Type.Function([PredicateSchema], Type.Promise(PredicateSchema)) + ), }); /** Chainhook event observer configuration options */ export type EventObserverOptions = Static; @@ -126,14 +130,24 @@ export class ChainhookEventObserver { this.fastify = await buildServer(this.observer, this.chainhook, predicates, callback); await this.fastify.listen({ host: this.observer.hostname, port: this.observer.port }); if (this.observer.predicate_health_check_interval_ms && this.healthCheckTimer === undefined) { - this.healthCheckTimer = setInterval(() => { - predicateHealthCheck(this.observer, this.chainhook).catch(err => - logger.error(err, `ChainhookEventObserver predicate health check error`) - ); - }, this.observer.predicate_health_check_interval_ms); + this.scheduleHealthCheck(); } } + private scheduleHealthCheck() { + this.healthCheckTimer = setTimeout(() => { + void predicateHealthCheck(this.observer, this.chainhook) + .catch(err => { + logger.error(err, `ChainhookEventObserver predicate health check error`); + }) + .finally(() => { + if (this.healthCheckTimer) { + this.scheduleHealthCheck(); + } + }); + }, this.observer.predicate_health_check_interval_ms); + } + /** * Stop the Chainhook event server gracefully. */ diff --git a/components/client/typescript/src/predicates.ts b/components/client/typescript/src/predicates.ts index 93d08ed47..cbdef141d 100644 --- a/components/client/typescript/src/predicates.ts +++ b/components/client/typescript/src/predicates.ts @@ -1,4 +1,4 @@ -import * as fs from 'fs'; +import * as fs from 'fs/promises'; import * as path from 'path'; import { logger } from './util/logger'; import { @@ -17,17 +17,32 @@ const RegisteredPredicates = new Map(); const CompiledPredicateSchema = TypeCompiler.Compile(PredicateSchema); +// Async version of fs.existsSync +async function pathExists(path: string): Promise { + try { + await fs.access(path); + return true; + } catch (error) { + if ((error as NodeJS.ErrnoException).code === 'ENOENT') { + return false; + } + throw error; // Re-throw other errors (e.g., permission issues) + } +} + /** * Looks on disk and returns a map of registered Predicates, where the key is the predicate `name` * as defined by the user. */ -export function recallPersistedPredicatesFromDisk(basePath: string): Map { +export async function recallPersistedPredicatesFromDisk( + basePath: string +): Promise> { RegisteredPredicates.clear(); try { - if (!fs.existsSync(basePath)) return RegisteredPredicates; - for (const file of fs.readdirSync(basePath)) { + if (!(await pathExists(basePath))) return RegisteredPredicates; + for (const file of await fs.readdir(basePath)) { if (file.endsWith('.json')) { - const text = fs.readFileSync(path.join(basePath, file), 'utf-8'); + const text = await fs.readFile(path.join(basePath, file), 'utf-8'); const predicate = JSON.parse(text) as JSON; if (CompiledPredicateSchema.Check(predicate)) { logger.info( @@ -44,11 +59,11 @@ export function recallPersistedPredicatesFromDisk(basePath: string): Map { await server.start([testPredicate], async () => {}); expect(fs.existsSync(`${observer.predicate_disk_file_path}/predicate-test.json`)).toBe(true); - const disk = recallPersistedPredicatesFromDisk(observer.predicate_disk_file_path); + const disk = await recallPersistedPredicatesFromDisk(observer.predicate_disk_file_path); const storedPredicate = disk.get('test'); expect(storedPredicate).not.toBeUndefined(); expect(storedPredicate?.name).toBe(testPredicate.name); @@ -102,8 +102,8 @@ describe('predicates', () => { }); describe('pre-stored', () => { - beforeEach(() => { - savePredicateToDisk(observer.predicate_disk_file_path, { + beforeEach(async () => { + await savePredicateToDisk(observer.predicate_disk_file_path, { uuid: 'e2777d77-473a-4c1d-9012-152deb36bf4c', name: 'test', version: 1, @@ -164,7 +164,7 @@ describe('predicates', () => { mockAgent.assertNoPendingInterceptors(); expect(fs.existsSync(`${observer.predicate_disk_file_path}/predicate-test.json`)).toBe(true); - const disk = recallPersistedPredicatesFromDisk(observer.predicate_disk_file_path); + const disk = await recallPersistedPredicatesFromDisk(observer.predicate_disk_file_path); const storedPredicate = disk.get('test'); // Should have a different uuid expect(storedPredicate?.uuid).not.toBe('e2777d77-473a-4c1d-9012-152deb36bf4c');