From 27c527e81c659635d2c824af5806b6a6e08baa27 Mon Sep 17 00:00:00 2001
From: HDegroote <75906619+HDegroote@users.noreply.github.com>
Date: Tue, 17 Sep 2024 23:39:20 +0200
Subject: [PATCH] V1 (#15)

* Use v1 deps

* rm todo

* Put logs in registerLogger function

* Clean up logs

* Use latest scrape-client API

* Use published dht-prom-client

* Update logs to v1 scrape client

* Simplify: no async life cycle for alias entries

* Complete readme
---
 README.md           |  16 +++++-
 index.js            | 136 +++++++++++++++++++++++++++++---------------
 package.json        |   4 +-
 run.js              | 100 +------------------------------
 test/integration.js |  10 ++--
 test/test.js        |  18 +++---
 6 files changed, 121 insertions(+), 163 deletions(-)

diff --git a/README.md b/README.md
index 5024a1e..6951c4b 100644
--- a/README.md
+++ b/README.md
@@ -1,10 +1,10 @@
 # DHT Prometheus
 
-A bridge to scrape Prometheus metrics from self-registering services, all using direct, end-to-end encrypted peer-to-peer connections.
+A bridge to scrape Prometheus metrics from self-registering services, all using direct, end-to-end encrypted peer-to-peer connections (not http).
 
-Its main advantage is that it does not use http: service discovery is done with a decentralised hash table ([HyperDHT](https://github.com/holepunchto/hyperdht)). This means that both this service and the clients it scrapes can live behind a firewall and need no reverse proy nor DNS entries.
+Service discovery is done with a distributed hash table ([HyperDHT](https://github.com/holepunchto/hyperdht)). This means that both this service and the clients it scrapes can live behind a firewall and need no reverse proxy nor DNS entries.
 
-Another advantage is the small amount of configuration required. [Clients](https://gitlab.com/dcent-tech/dht-prom-client) register themselves with the DHT-Prometheus service, so no manual list of targets needs to be maintained. All a client needs to register itself, is the DHT-Prometheus service's public key, and a shared secret.
+Another advantage is how little configuration is required. [Clients](https://gitlab.com/dcent-tech/dht-prom-client) register themselves with the DHT-Prometheus service, so no manual list of targets needs to be maintained. All a client needs to register itself is the DHT-Prometheus service's public key and a shared secret.
 
 ## Deployment
 
@@ -16,6 +16,14 @@ The DHT-prometheus service fulfils two complementary roles:
 
 ### Run
 
+Configuration is done through environment variables:
+
+- `DHT_PROM_KEY_PAIR_SEED`: 32-byte seed passed to `HyperDHT.keyPair()`, set as hex or z32. Set this to have a consistent public key (otherwise a random key pair is used, which is only useful for tests).
+- `DHT_PROM_SHARED_SECRET`: 32-byte secret key, set as hex or z32.
+- `DHT_PROM_LOG_LEVEL`: defaults to `info`.
+- `DHT_PROM_HTTP_PORT`: port where the http server listens. Defaults to a random port.
+- `DHT_PROM_HTTP_HOST`: host where the http server listens. Defaults to `127.0.0.1`.
+
 #### Docker
 
 ```
@@ -26,6 +34,8 @@ The intent is for the prometheus service to read its config from a read-only bin
 
 Note: `/etc/prometheus/config/prometheus-dht-targets` should be writable by the container's user.
 
+Note: `--network=host` is optional, but HyperDHT holepunching can struggle when using Docker's default bridge network, particularly for LAN and localhost connections.
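+
+For reference, the two 32-byte secrets above can be generated with Node's built-in `crypto` module (a minimal sketch; any source of 32 random bytes, hex-encoded, works just as well):
+
+```
+const crypto = require('crypto')
+
+// Prints a hex-encoded 32-byte value, suitable for either
+// DHT_PROM_KEY_PAIR_SEED or DHT_PROM_SHARED_SECRET
+console.log(crypto.randomBytes(32).toString('hex'))
+```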
+ #### CLI Install: diff --git a/index.js b/index.js index 68f9489..1fda1e6 100644 --- a/index.js +++ b/index.js @@ -76,11 +76,10 @@ class PrometheusDhtBridge extends ReadyResource { } async _open () { - await this._loadAliases() - // It is important that the aliases are first loaded // otherwise the old aliases might get overwritten - await this.aliasRpcServer.ready() + await this._loadAliases() + await this.swarm.listen() this._checkExpiredsInterval = setInterval( () => this.cleanupExpireds(), @@ -94,13 +93,9 @@ class PrometheusDhtBridge extends ReadyResource { clearInterval(this._checkExpiredsInterval) } - await this.aliasRpcServer.close() - - await Promise.all([ - [...this.aliases.values()].map(a => { - return a.close().catch(safetyCatch) - })] - ) + for (const entry of this.aliases.values()) { + entry.close() + } await this.swarm.destroy() @@ -120,7 +115,7 @@ class PrometheusDhtBridge extends ReadyResource { return updated } - current.close().catch(safetyCatch) + current.close() } const entry = new AliasesEntry( @@ -131,7 +126,6 @@ class PrometheusDhtBridge extends ReadyResource { ) this.aliases.set(alias, entry) - // TODO: just emit entry? this.emit('set-alias', { alias, entry }) const updated = true @@ -142,25 +136,6 @@ class PrometheusDhtBridge extends ReadyResource { return updated } - // Should be kept sync (or think hard) - cleanupExpireds () { - const toRemove = [] - for (const [alias, entry] of this.aliases) { - if (entry.isExpired) toRemove.push(alias) - } - - for (const alias of toRemove) { - const entry = this.aliases.get(alias) - this.aliases.delete(alias) - entry.close().catch(safetyCatch) - this.emit('alias-expired', { publicKey: entry.targetKey, alias }) - } - - if (toRemove.length > 0) { - this._writeAliases().catch(safetyCatch) - } - } - async _handleGet (req, reply) { const alias = req.params.alias @@ -172,16 +147,16 @@ class PrometheusDhtBridge extends ReadyResource { return } - if (!entry.opened) { - await entry.ready() - if (this._forceFlushOnClientReady) await entry.scrapeClient.swarm.flush() + if (this._forceFlushOnClientReady && !entry.hasHandledGet) { + await entry.scrapeClient.swarm.flush() } + entry.hasHandledGet = true const scrapeClient = entry.scrapeClient let res try { - res = await scrapeClient.lookup() + res = await scrapeClient.requestMetrics() } catch (e) { this.emit('upstream-error', e) reply.code(502) @@ -217,19 +192,94 @@ class PrometheusDhtBridge extends ReadyResource { this.putAlias(alias, z32PubKey, hostname, service, { write: false }) } } catch (e) { + // An error is expected if the file does not yet exist + // (typically first run only) this.emit('load-aliases-error', e) } } + + // Should be kept sync (or think hard) + cleanupExpireds () { + const toRemove = [] + for (const [alias, entry] of this.aliases) { + if (entry.isExpired) toRemove.push(alias) + } + + for (const alias of toRemove) { + const entry = this.aliases.get(alias) + this.aliases.delete(alias) + entry.close() + this.emit('alias-expired', { publicKey: entry.targetKey, alias }) + } + + if (toRemove.length > 0) { + this._writeAliases().catch(safetyCatch) + } + } + + registerLogger (logger) { + this.on('set-alias', ({ alias, entry }) => { + const scrapeClient = entry.scrapeClient + const publicKey = scrapeClient.targetKey + const { service, hostname } = entry + + logger.info(`Registered alias: ${alias} -> ${idEnc.normalize(publicKey)} (${service} on host ${hostname})`) + + scrapeClient.on('connection-open', ({ uid, remotePublicKey, remoteAddress }) => { + logger.info(`Scraper for 
${alias}->${idEnc.normalize(remotePublicKey)} opened connection to ${remoteAddress} (uid: ${uid})`)
+      })
+      scrapeClient.on('connection-close', ({ uid, remotePublicKey, remoteAddress }) => {
+        logger.info(`Scraper for ${alias}->${idEnc.normalize(remotePublicKey)} closed connection to ${remoteAddress} (uid: ${uid})`)
+      })
+      scrapeClient.on('connection-error', ({ error, uid, remotePublicKey, remoteAddress }) => {
+        logger.info(`Scraper for ${alias}->${idEnc.normalize(remotePublicKey)} at ${remoteAddress} connection error (uid: ${uid}): ${error.stack}`)
+      })
+
+      if (logger.level === 'debug') {
+        scrapeClient.on('connection-ignore', ({ uid, remotePublicKey, remoteAddress }) => {
+          logger.debug(`Scraper for ${alias}->${idEnc.normalize(remotePublicKey)} at ${remoteAddress} ignored connection (uid: ${uid})`)
+        })
+      }
+    })
+
+    this.on('aliases-updated', (loc) => {
+      logger.info(`Updated the aliases file at ${loc}`)
+    })
+
+    this.on('alias-expired', ({ alias, publicKey }) => {
+      logger.info(`Alias entry expired: ${alias} -> ${idEnc.normalize(publicKey)}`)
+    })
+
+    this.on('load-aliases-error', e => {
+      // Expected the first time the service starts (the file is created then)
+      logger.error(`Failed to load aliases file: ${e.stack}`)
+    })
+
+    this.on('upstream-error', e => {
+      logger.info(`Upstream error: ${e.stack}`)
+    })
+
+    this.on('write-aliases-error', e => {
+      logger.error(`Failed to write aliases file: ${e.stack}`)
+    })
+
+    this.aliasRpcServer.registerLogger(logger)
+  }
 }
 
-class AliasesEntry extends ReadyResource {
+class AliasesEntry {
   constructor (scrapeClient, hostname, service, expiry) {
-    super()
-
     this.scrapeClient = scrapeClient
     this.hostname = hostname
     this.service = service
     this.expiry = expiry
+    this.hasHandledGet = false
+
+    // Open eagerly; guard the floating promise so a failed open
+    // cannot become an unhandled rejection
+    this.scrapeClient.ready().catch(safetyCatch)
+  }
+
+  get closed () {
+    return this.scrapeClient.closed
   }
 
   get targetKey () {
@@ -244,14 +294,8 @@
     this.expiry = expiry
   }
 
-  async _open () {
-    await this.scrapeClient.ready()
-  }
-
-  async _close () {
-    if (this.scrapeClient.opening) {
-      await this.scrapeClient.close()
-    }
+  close () {
+    this.scrapeClient.close()
   }
 }
 
diff --git a/package.json b/package.json
index c84af26..b19b89d 100644
--- a/package.json
+++ b/package.json
@@ -40,8 +40,8 @@
   "dependencies": {
     "b4a": "^1.6.6",
     "debounceify": "^1.1.0",
-    "dht-prom-alias-rpc": "^0.0.1-alpha.1",
-    "dht-prom-client": "^0.0.1-alpha.10",
+    "dht-prom-alias-rpc": "^1.0.0",
+    "dht-prom-client": "^1.0.1",
     "fastify": "^4.28.0",
     "graceful-goodbye": "^1.3.0",
     "hypercore-id-encoding": "^1.3.0",
diff --git a/run.js b/run.js
index ee0e62b..a69cdf5 100755
--- a/run.js
+++ b/run.js
@@ -13,7 +13,7 @@ function loadConfig () {
     prometheusTargetsLoc: process.env.DHT_PROM_PROMETHEUS_TARGETS_LOC || './prometheus/targets.json',
     logLevel: (process.env.DHT_PROM_LOG_LEVEL || 'info').toLowerCase(),
     httpPort: process.env.DHT_PROM_HTTP_PORT || 0,
-    httpHost: '127.0.0.1',
+    httpHost: process.env.DHT_PROM_HTTP_HOST || '127.0.0.1',
     _forceFlushOnClientReady: process.env._DHT_PROM_FORCE_FLUSH || 'false' // Tests only
   }
 
@@ -75,7 +75,7 @@ async function main () {
     serverLogLevel
   })
 
-  setupLogging(bridge, logger)
+  bridge.registerLogger(logger)
 
   goodbye(async () => {
     logger.info('Shutting down')
@@ -90,100 +90,4 @@ async function main () {
   logger.info(`DHT RPC ready at public key ${idEnc.normalize(bridge.publicKey)}`)
 }
 
-function setupLogging (bridge, logger) {
-  bridge.on('set-alias', ({ alias, entry }) => {
-    const scrapeClient = entry.scrapeClient
-    const publicKey = scrapeClient.targetKey
-    const 
{ service, hostname } = entry - - logger.info(`Registered alias: ${alias} -> ${idEnc.normalize(publicKey)} (${service} on host ${hostname})`) - - scrapeClient.on('connection-open', ({ uid, targetKey, peerInfo }) => { - logger.info(`Scraper for ${alias}->${idEnc.normalize(targetKey)} opened connection from ${idEnc.normalize(peerInfo.publicKey)} (uid: ${uid})`) - }) - scrapeClient.on('connection-close', ({ uid }) => { - logger.info(`Scraper for ${alias} closed connection (uid: ${uid})`) - }) - scrapeClient.on('connection-error', ({ error, uid }) => { - logger.info(`Scraper for ${alias} connection error (uid: ${uid})`) - logger.info(error) - }) - - if (logger.level === 'debug') { - scrapeClient.on('connection-ignore', ({ uid }) => { - logger.debug(`Scraper for ${alias} ignored connection (uid: ${uid})`) - }) - } - }) - - bridge.on('aliases-updated', (loc) => { - logger.info(`Updated the aliases file at ${loc}`) - }) - - bridge.on('alias-expired', ({ alias, publicKey }) => { - logger.info(`Alias entry expired: ${alias} -> ${idEnc.normalize(publicKey)}`) - }) - - bridge.on('load-aliases-error', e => { // TODO: test - // Expected first time the service starts (creates it then) - logger.error('failed to load aliases file') - logger.error(e) - }) - - bridge.on('upstream-error', e => { // TODO: test - logger.info('upstream error:') - logger.info(e) - }) - - bridge.on('write-aliases-error', e => { - logger.error('Failed to write aliases file') - logger.error(e) - }) - - bridge.aliasRpcServer.on( - 'alias-request', - ({ uid, remotePublicKey, targetPublicKey, alias }) => { - logger.info(`Alias request from ${idEnc.normalize(remotePublicKey)} to set ${alias}->${idEnc.normalize(targetPublicKey)} (uid ${uid})`) - } - ) - bridge.aliasRpcServer.on( - 'register-success', ({ uid, alias, targetPublicKey, updated }) => { - logger.info(`Alias success for ${alias}->${idEnc.normalize(targetPublicKey)}--updated: ${updated} (uid: ${uid})`) - } - ) - // TODO: log IP address + rate limit - bridge.aliasRpcServer.on( - 'alias-unauthorised', ({ uid, remotePublicKey, targetPublicKey, alias }) => { - logger.info(`Unauthorised alias request from ${idEnc.normalize(remotePublicKey)} to set alias ${alias}->${idEnc.normalize(targetPublicKey)} (uid: ${uid})`) - } - ) - bridge.aliasRpcServer.on( - 'register-error', ({ uid, error }) => { - logger.info(`Alias error: ${error} (${uid})`) - } - ) - - bridge.aliasRpcServer.on( - 'connection-open', - ({ uid, peerInfo }) => { - const remotePublicKey = idEnc.normalize(peerInfo.publicKey) - logger.info(`Alias server opened connection to ${idEnc.normalize(remotePublicKey)} (uid ${uid})`) - } - ) - bridge.aliasRpcServer.on( - 'connection-close', - ({ uid, peerInfo }) => { - const remotePublicKey = idEnc.normalize(peerInfo.publicKey) - logger.info(`Alias server closed connection to ${idEnc.normalize(remotePublicKey)} (uid ${uid})`) - } - ) - bridge.aliasRpcServer.on( - 'connection-error', - ({ uid, error, peerInfo }) => { - const remotePublicKey = idEnc.normalize(peerInfo.publicKey) - logger.info(`Alias server socket error: ${error.stack} on connection to ${idEnc.normalize(remotePublicKey)} (uid ${uid})`) - } - ) -} - main() diff --git a/test/integration.js b/test/integration.js index c0ba49f..8d0c87e 100644 --- a/test/integration.js +++ b/test/integration.js @@ -17,12 +17,9 @@ const axios = require('axios') const BRIDGE_EXECUTABLE = path.join(path.dirname(__dirname), 'run.js') const PROMETHEUS_EXECUTABLE = path.join(path.dirname(__dirname), 'prometheus', 'prometheus') -const DEBUG = true 
+const DEBUG = false const DEBUG_PROMETHEUS = false -// Note: move this inside the test if we ever have >1 integration test -promClient.collectDefaultMetrics() // So we have something to scrape - // To force the process.on('exit') to be called on those exits too process.prependListener('SIGINT', () => process.exit(1)) process.prependListener('SIGTERM', () => process.exit(1)) @@ -34,6 +31,11 @@ test('Integration test, happy path', async t => { throw new Error('the integration test requires a prometheus exec') } + promClient.collectDefaultMetrics() // So we have something to scrape + t.teardown(() => { + promClient.register.clear() + }) + const tBridgeSetup = t.test('Bridge setup') tBridgeSetup.plan(2) diff --git a/test/test.js b/test/test.js index 26a0843..8ecfd8d 100644 --- a/test/test.js +++ b/test/test.js @@ -120,16 +120,15 @@ test('No new alias if adding same key', async t => { await bridge.ready() bridge.putAlias('dummy', key) const clientA = bridge.aliases.get('dummy') - await clientA.ready() // Bit of a hack, but needed for lifecycle check t.is(clientA != null, true, 'sanity check') bridge.putAlias('dummy', key) t.is(clientA, bridge.aliases.get('dummy'), 'no new client') - t.is(clientA.closing == null, true, 'sanity check') + t.is(clientA.closed, false, 'sanity check') bridge.putAlias('dummy', key2) t.not(clientA, bridge.aliases.get('dummy'), 'sanity check') - t.is(clientA.closing != null, true, 'lifecycle ok') + t.is(clientA.closed, true, 'lifecycle ok') }) test('A client which registers itself can get scraped', async t => { @@ -157,6 +156,7 @@ test('A client which registers itself can get scraped', async t => { `${baseUrl}/scrape/dummy/metrics`, { validateStatus: null } ) + t.is(res.status, 200, 'correct status') t.is( res.data.includes('process_cpu_user_seconds_total'), @@ -177,16 +177,14 @@ test('A client gets removed and closed after it expires', async t => { bridge.putAlias('dummy', key) const entry = bridge.aliases.get('dummy') - await entry.ready() // ~Hack, to make it easy to check the lifecycle - t.is(entry.closing === null, true, 'sanity check') t.is(bridge.aliases.size, 1, 'sanity check') const [{ alias: expiredAlias }] = await once(bridge, 'alias-expired') t.is(expiredAlias, 'dummy', 'alias-expired event emitted') t.is(bridge.aliases.size, 0, 'alias removed when expired') - t.is(entry.closing !== null, true, 'The alias entry is closing (or closed)') + t.is(entry.closed, true, 'The alias entry is closed') await once(bridge, 'aliases-updated') t.pass('aliases file rewritten after an entry gets removed') @@ -208,9 +206,8 @@ test('A client does not get removed if it renews before the expiry', async t => }, bridge.entryExpiryMs / 2) const entry = bridge.aliases.get('dummy') - await entry.ready() // ~Hack, to make it easy to check the lifecycle - t.is(entry.closing === null, true, 'sanity check') + t.is(entry.closed, false, 'sanity check') t.is(bridge.aliases.size, 1, 'sanity check') @@ -219,14 +216,14 @@ test('A client does not get removed if it renews before the expiry', async t => )) t.is(bridge.aliases.size, 1, 'alias not removed if renewed in time') - t.is(entry.closing === null, true, 'Sanity check: entry not closed if renewed in time') + t.is(entry.closed, false, 'Sanity check: entry not closed if renewed in time') await new Promise(resolve => setTimeout( resolve, bridge.entryExpiryMs + 100 )) t.is(bridge.aliases.size, 0, 'alias removed when expired') - t.is(entry.closing !== null, true, 'The alias entry is closing (or closed)') + t.is(entry.closed, true, 'The alias 
entry is closed') }) async function setup (t, bridgeOpts = {}) { @@ -266,6 +263,7 @@ async function setup (t, bridgeOpts = {}) { await dhtPromClient.close() await dht.destroy() await testnet.destroy() + promClient.register.clear() }) const ownPublicKey = dhtPromClient.dht.defaultKeyPair.publicKey
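
Note: as the tests above show, each registered alias is served by the bridge's http server at `/scrape/<alias>/metrics`. A minimal sketch of querying it directly (assuming a running bridge; `baseUrl` and the alias value are illustrative, and `axios` is what the tests themselves use):

```js
const axios = require('axios')

async function scrapeAlias (baseUrl, alias) {
  // The bridge serves each registered alias at /scrape/<alias>/metrics
  const res = await axios.get(
    `${baseUrl}/scrape/${alias}/metrics`,
    { validateStatus: null } // resolve on any status instead of throwing
  )
  if (res.status !== 200) throw new Error(`Scrape failed with status ${res.status}`)
  return res.data // Prometheus text-format metrics
}
```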