Skip to content

Commit

Permalink
Expire entries + separate integration test (#8)
Browse files Browse the repository at this point in the history
  • Loading branch information
HDegroote authored Jul 19, 2024
1 parent 1515f55 commit a31b8aa
Show file tree
Hide file tree
Showing 7 changed files with 171 additions and 20 deletions.
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ jobs:
node-version: 20
- run: npm install
- run: npm test
- run: npm run integration

docker:
if: startsWith(github.ref, 'refs/tags/') && !contains(github.ref, 'debug')
Expand Down
10 changes: 8 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,14 @@ DHT_PROM_PROMETHEUS_TARGETS_LOC=path/to/prometheus/targets.json DHT_PROM_HTTP_PO

## Test

Note: the tests run [./prep-integration-test.sh](./prep-integration-test.sh), which downloads Prometheus and copies the executable to the ./prometheus directory.

```
npm test
```

Integration tests are also included:

```
npm run integration
```

Note: the integration tests run [./prep-integration-test.sh](./prep-integration-test.sh), which downloads Prometheus and copies the executable to the ./prometheus directory.
96 changes: 85 additions & 11 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ class PrometheusDhtBridge extends ReadyResource {
constructor (dht, server, sharedSecret, {
keyPairSeed,
_forceFlushOnClientReady = false,
prometheusTargetsLoc = DEFAULT_PROM_TARGETS_LOC
prometheusTargetsLoc = DEFAULT_PROM_TARGETS_LOC,
entryExpiryMs = 3 * 60 * 60 * 1000,
checkExpiredsIntervalMs = 60 * 60 * 1000
} = {}) {
super()

Expand All @@ -30,6 +32,10 @@ class PrometheusDhtBridge extends ReadyResource {

this.secret = sharedSecret // Shared with clients

this.entryExpiryMs = entryExpiryMs
this.checkExpiredsIntervalMs = checkExpiredsIntervalMs
this._checkExpiredsInterval = null

this.server = server
this.server.get(
'/scrape/:alias/metrics',
Expand All @@ -44,7 +50,7 @@ class PrometheusDhtBridge extends ReadyResource {
this._writeAliases = debounceify(this._writeAliasesUndebounced.bind(this))

// for tests, to ensure we're connected to the scraper on first scrape
this._forceFlushOnCLientReady = _forceFlushOnClientReady
this._forceFlushOnClientReady = _forceFlushOnClientReady
}

get dht () {
Expand All @@ -61,34 +67,54 @@ class PrometheusDhtBridge extends ReadyResource {
// It is important that the aliases are first loaded
// otherwise the old aliases might get overwritten
await this.aliasRpcServer.ready()

this._checkExpiredsInterval = setInterval(
() => this.cleanupExpireds(),
this.checkExpiredsIntervalMs
)
}

async _close () {
// Should be first (no expireds cleanup during closing)
if (this._checkExpiredsInterval) {
clearInterval(this._checkExpiredsInterval)
}

await this.aliasRpcServer.close()

await Promise.all([
[...this.aliases.values()].map(a => a.close())
])
[...this.aliases.values()].map(a => {
return a.close().catch(safetyCatch)
})]
)

await this.swarm.destroy()

if (this.opened) await this._writeAliases()
}

putAlias (alias, targetPubKey, { write = true } = {}) {
if (!this.opened && write) throw new Error('Cannot put aliases before ready')

targetPubKey = idEnc.decode(idEnc.normalize(targetPubKey))
const current = this.aliases.get(alias)

if (current) {
if (b4a.equals(current.targetKey, targetPubKey)) {
current.setExpiry(Date.now() + this.entryExpiryMs)
const updated = false // Idempotent
return updated
}

current.close().catch(safetyCatch)
}

const scrapeClient = new ScraperClient(this.swarm, targetPubKey)
this.aliases.set(alias, scrapeClient)
const entry = new AliasesEntry(
new ScraperClient(this.swarm, targetPubKey),
Date.now() + this.entryExpiryMs
)

this.aliases.set(alias, entry)
this.emit('set-alias', { alias, publicKey: targetPubKey })
const updated = true

Expand All @@ -99,22 +125,39 @@ class PrometheusDhtBridge extends ReadyResource {
return updated
}

// Should be kept sync (or think hard)
cleanupExpireds () {
const toRemove = []
for (const [alias, entry] of this.aliases) {
if (entry.isExpired) toRemove.push(alias)
}

for (const alias of toRemove) {
const entry = this.aliases.get(alias)
this.aliases.delete(alias)
entry.close().catch(safetyCatch)
this.emit('alias-expired', { publicKey: entry.targetKey, alias })
}
}

async _handleGet (req, reply) {
const alias = req.params.alias

const scrapeClient = this.aliases.get(alias)
const entry = this.aliases.get(alias)

if (!scrapeClient) {
if (!entry) {
reply.code(404)
reply.send('Unknown alias')
return
}

if (!scrapeClient.opened) {
await scrapeClient.ready()
if (this._forceFlushOnCLientReady) await scrapeClient.swarm.flush()
if (!entry.opened) {
await entry.ready()
if (this._forceFlushOnClientReady) await entry.scrapeClient.swarm.flush()
}

const scrapeClient = entry.scrapeClient

let res
try {
res = await scrapeClient.lookup()
Expand Down Expand Up @@ -157,4 +200,35 @@ class PrometheusDhtBridge extends ReadyResource {
}
}

class AliasesEntry extends ReadyResource {
constructor (scrapeClient, expiry) {
super()

this.scrapeClient = scrapeClient
this.expiry = expiry
}

get targetKey () {
return this.scrapeClient.targetKey
}

get isExpired () {
return this.expiry < Date.now()
}

setExpiry (expiry) {
this.expiry = expiry
}

async _open () {
await this.scrapeClient.ready()
}

async _close () {
if (this.scrapeClient.opening) {
await this.scrapeClient.close()
}
}
}

module.exports = PrometheusDhtBridge
6 changes: 3 additions & 3 deletions lib/prom-targets.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ const idEnc = require('hypercore-id-encoding')

async function writePromTargets (location, aliases) {
const targets = []
// DEVNOTE: a bit ugly we know about scrapeClient here,
// DEVNOTE: a bit ugly we know about alias entries here,
// but easier to just pass in the full aliases map
// than to extract the pubkey in the caller
for (const [target, scrapeClient] of aliases) {
const pubKey = idEnc.normalize(scrapeClient.targetKey)
for (const [target, entry] of aliases) {
const pubKey = idEnc.normalize(entry.targetKey)
targets.push(`${target}:${pubKey}`)
}

Expand Down
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
"description": "Bridge to scrape Prometheus metrics fully peer to peer",
"main": "index.js",
"scripts": {
"test": "standard && brittle test/test.js && ./prep-integration-test.sh && brittle test/integration.js"
"test": "standard && brittle test/test.js",
"integration": "./prep-integration-test.sh && brittle test/integration.js"
},
"bin": {
"dht-prometheus": "./run.js"
Expand Down
4 changes: 4 additions & 0 deletions run.js
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ function setupLogging (bridge, logger) {
logger.info(`Updated the aliases file at ${loc}`)
})

bridge.on('alias-expired', ({ alias, publicKey }) => {
logger.info(`Alias entry expired: ${alias} -> ${idEnc.normalize(publicKey)}`)
})

bridge.on('load-aliases-error', e => { // TODO: test
logger.error('failed to load aliases file')
logger.error(e)
Expand Down
71 changes: 68 additions & 3 deletions test/test.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
const path = require('path')
const { once } = require('events')

const test = require('brittle')
const promClient = require('prom-client')
const DhtPromClient = require('dht-prom-client')
Expand All @@ -7,7 +10,6 @@ const fastify = require('fastify')
const axios = require('axios')
const hypCrypto = require('hypercore-crypto')
const getTmpDir = require('test-tmp')
const path = require('path')
const PrometheusDhtBridge = require('../index')

test('put alias + lookup happy flow', async t => {
Expand Down Expand Up @@ -118,6 +120,7 @@ test('No new alias if adding same key', async t => {
await bridge.ready()
bridge.putAlias('dummy', key)
const clientA = bridge.aliases.get('dummy')
await clientA.ready() // Bit of a hack, but needed for lifecycle check

t.is(clientA != null, true, 'sanity check')
bridge.putAlias('dummy', key)
Expand Down Expand Up @@ -162,7 +165,68 @@ test('A client which registers itself can get scraped', async t => {
)
})

async function setup (t) {
test('A client gets removed and closed after it expires', async t => {
const { bridge } = await setup(t, {
entryExpiryMs: 500,
checkExpiredsIntervalMs: 100
})
const key = 'a'.repeat(64)

await bridge.ready()

bridge.putAlias('dummy', key)

const entry = bridge.aliases.get('dummy')
await entry.ready() // ~Hack, to make it easy to check the lifecycle

t.is(entry.closing === null, true, 'sanity check')
t.is(bridge.aliases.size, 1, 'sanity check')

const [{ alias: expiredAlias }] = await once(bridge, 'alias-expired')
t.is(expiredAlias, 'dummy', 'alias-expired event emitted')

t.is(bridge.aliases.size, 0, 'alias removed when expired')
t.is(entry.closing !== null, true, 'The alias entry is closing (or closed)')
})

test('A client does not get removed if it renews before the expiry', async t => {
// Test is somewhat susceptible to CPU blocking due to timings
// (add more margin if that happens in practice)
const { bridge } = await setup(t, {
entryExpiryMs: 500,
checkExpiredsIntervalMs: 100
})
const key = 'a'.repeat(64)

await bridge.ready()
bridge.putAlias('dummy', key)
setTimeout(() => {
bridge.putAlias('dummy', key)
}, bridge.entryExpiryMs / 2)

const entry = bridge.aliases.get('dummy')
await entry.ready() // ~Hack, to make it easy to check the lifecycle

t.is(entry.closing === null, true, 'sanity check')

t.is(bridge.aliases.size, 1, 'sanity check')

await new Promise(resolve => setTimeout(
resolve, bridge.entryExpiryMs + 100
))

t.is(bridge.aliases.size, 1, 'alias not removed if renewed in time')
t.is(entry.closing === null, true, 'Sanity check: entry not closed if renewed in time')

await new Promise(resolve => setTimeout(
resolve, bridge.entryExpiryMs + 100
))

t.is(bridge.aliases.size, 0, 'alias removed when expired')
t.is(entry.closing !== null, true, 'The alias entry is closing (or closed)')
})

async function setup (t, bridgeOpts = {}) {
promClient.collectDefaultMetrics() // So we have something to scrape
t.teardown(() => promClient.register.clear())

Expand All @@ -177,7 +241,8 @@ async function setup (t) {
const prometheusTargetsLoc = path.join(tmpDir, 'prom-targets.json')
const bridge = new PrometheusDhtBridge(dht, server, sharedSecret, {
_forceFlushOnClientReady: true, // to avoid race conditions
prometheusTargetsLoc
prometheusTargetsLoc,
...bridgeOpts
})
const scraperPubKey = bridge.publicKey

Expand Down

0 comments on commit a31b8aa

Please sign in to comment.