Skip to content

Commit

Permalink
feat: deal monitor alert
Browse files Browse the repository at this point in the history
  • Loading branch information
vasco-santos committed Nov 27, 2023
1 parent 96d4431 commit 50b6eef
Show file tree
Hide file tree
Showing 7 changed files with 484 additions and 7 deletions.
3 changes: 1 addition & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
"fzstd": "^0.1.0",
"multiformats": "12.0.1",
"uint8arrays": "^4.0.4",
"pretty-ms": "^8.0.0",
"p-all": "^5.0.0",
"p-retry": "^5.1.2",
"stream-read-all": "^4.0.0"
Expand Down
177 changes: 177 additions & 0 deletions packages/core/src/monitor/deal-monitor-alert-tick.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
import prettyMilliseconds from 'pretty-ms'

/**
* @typedef {import('@web3-storage/data-segment').PieceLink} PieceLink
* @typedef {import('@web3-storage/filecoin-api/aggregator/api').AggregateStore} AggregatorAggregateStore
* @typedef {import('@web3-storage/filecoin-api/dealer/api').AggregateStore} DealerAggregateStore
* @typedef {import('@web3-storage/filecoin-api/dealer/api').AggregateRecord} AggregateRecord
*
* @typedef {object} MonitorContext
* @property {AggregatorAggregateStore} context.aggregatorAggregateStore
* @property {DealerAggregateStore} context.dealerAggregateStore
* @property {number} context.oldestPieceCriticalThresholdMs
* @property {number} context.oldestPieceWarnThresholdMs
* @property {number} context.aggregateMonitorThresholdMs
* @property {URL} context.monitoringNotificationsUrl
*
* @typedef {object} Alert
* @property {PieceLink} aggregate
* @property {number} duration
* @property {string} severity
*/

/**
* On CRON tick, get aggregates without deals, and verify if there
*
* @param {MonitorContext} context
*/
export async function dealMonitorAlertTick (context) {
// Get offered deals pending approval/rejection
const offeredAggregates = await context.dealerAggregateStore.query({
status: 'offered',
})
if (offeredAggregates.error) {
return {
error: offeredAggregates.error,
}
}

// Get offered aggregates to monitor
const offeredAggregatesToMonitor = []
const currentTime = Date.now()
for (const offeredAggregate of offeredAggregates.ok) {
const offerTime = (new Date(offeredAggregate.insertedAt)).getTime()
// Monitor if offer time + monitor threshold is bigger than current time
if (offerTime + context.aggregateMonitorThresholdMs > currentTime) {
offeredAggregatesToMonitor.push(offeredAggregate)
}
}

// Get aggregates duration
const monitoredAggregatesResponse = await Promise.all(
offeredAggregatesToMonitor.map(aggregate => monitorAggregate(aggregate, context))
)
// Fail if any failed to get information
const monitoredAggregatesErrorResponse = monitoredAggregatesResponse.find(r => r?.error)
if (monitoredAggregatesErrorResponse) {
return {
error: monitoredAggregatesErrorResponse.error
}
}

const alerts = /** @typedef {Alert[]} */ ([])
// Verify if monitored aggregates should create notifications
for (const res of monitoredAggregatesResponse) {
// @ts-ignore if not ok, should have failed before
const duration = /** @type {number} */ (res.ok?.duration)
// @ts-ignore if not ok, should have failed before
const aggregate = /** @type {import('@web3-storage/data-segment').PieceLink} */ (res.ok?.aggregate)

if (duration > context.oldestPieceCriticalThresholdMs) {
alerts.push({
aggregate,
duration,
severity: 'critical'
})
} else if (duration > context.oldestPieceWarnThresholdMs) {
alerts.push({
aggregate,
duration,
severity: 'warn'
})
}
}

if (!alerts.length) {
return {
ok: {
alerts
}
}
}

// Send alerts
const alertPayload = getAlertPayload(alerts)
const alertResponse = await fetch(
context.monitoringNotificationsUrl,
{
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify(alertPayload)
}
)
if (!alertResponse.ok) {
return {
error: new Error(`failed to send alert to ${context.monitoringNotificationsUrl} with ${alerts.length}`)
}
}

return {
ok: {
alerts
}
}
}

/**
* @param {AggregateRecord} aggregateRecord
* @param {MonitorContext} context
*/
async function monitorAggregate (aggregateRecord, context) {
const getAggregateInfo = await context.aggregatorAggregateStore.get({
aggregate: aggregateRecord.aggregate
})
if (getAggregateInfo.error) {
return {
error: getAggregateInfo.error
}
}

// Get aggregate current duration
const currentTime = Date.now()
// @ts-expect-error needs updated dep
const offerTime = (new Date(getAggregateInfo.ok.oldestPieceInsertedAt)).getTime()

return {
ok: {
aggregate: aggregateRecord.aggregate,
duration: currentTime - offerTime
}
}
}

/**
* Construct alert based on payload from Grafana Alerting.
*
* @see https://grafana.com/docs/oncall/latest/integrations/grafana-alerting/
* @see https://prometheus.io/docs/alerting/latest/notifications/#data
*
* @param {Alert[]} alerts
*/
function getAlertPayload (alerts) {
return {
alerts: alerts.map(a => ({
labels: {
aggregate: a.aggregate.toString(),
duration: prettyMilliseconds(a.duration),
severity: a.severity,
},
status: 'firing',
fingerprint: a.aggregate.toString()
})),
status: 'firing',
version: '4',
groupKey: '{}:{alertname=\\FilecoinDealDelay\\}',
receiver: 'combo',
groupLabels: {
alertname: 'FilecoinDealDelay'
},
commonLabels: {
job: 'deal-monitor-alert',
group: 'production',
alertname: 'FilecoinDealDelay'
}
}
}
180 changes: 180 additions & 0 deletions packages/core/test/monitor/handle-deal-monitor-cront-tick.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
import delay from 'delay'
import * as Signer from '@ucanto/principal/ed25519'
import { randomAggregate } from '@web3-storage/filecoin-api/test'
import { CBOR } from '@ucanto/server'

import { createClient as createAggregatorAggregateStoreClient } from '../../src/store/aggregator-aggregate-store.js'
import { createClient as createDealerAggregateStoreClient } from '../../src/store/dealer-aggregate-store.js'
import { dealerAggregateStoreTableProps, aggregatorAggregateStoreTableProps } from '../../src/store/index.js'

import * as dealMonitorAlertTick from '../../src/monitor/deal-monitor-alert-tick.js'

import { testStore as test } from '../helpers/context.js'
import { createDynamodDb, createTable } from '../helpers/resources.js'

test.before(async (t) => {
const { client: dynamoClient, stop: dynamoStop} = await createDynamodDb()

Object.assign(t.context, {
dynamoClient,
stop: async () => {
await dynamoStop()
}
})
})

test.after(async t => {
await t.context.stop()
await delay(1000)
})

test('handles deal monitor tick without aggregates available', async t => {
const context = await getContext(t.context)

const tickRes = await dealMonitorAlertTick.dealMonitorAlertTick({
...context,
oldestPieceCriticalThresholdMs: 0,
oldestPieceWarnThresholdMs: 0,
aggregateMonitorThresholdMs: 0
})

t.assert(tickRes.ok)
t.is(tickRes.ok?.alerts.length, 0)
})

test('handles deal monitor tick with aggregates in warn type', async t => {
const context = await getContext(t.context)
const storefront = await Signer.generate()
const group = storefront.did()
const { pieces, aggregate } = await randomAggregate(10, 128)
const threshold = 1000
const pieceInsertTime = Date.now() - threshold
const buffer = {
pieces: pieces.map((p) => ({
piece: p.link,
insertedAt: new Date(
pieceInsertTime
).toISOString(),
policy: 0,
})),
group,
}
const block = await CBOR.write(buffer)
// Store aggregate record into store
const offer = pieces.map((p) => p.link)
const piecesBlock = await CBOR.write(offer)

// Store aggregate in aggregator
const aggregatePutRes = await context.aggregatorAggregateStore.put({
aggregate: aggregate.link,
pieces: piecesBlock.cid,
buffer: block.cid,
group,
insertedAt: new Date().toISOString(),
// @ts-expect-error needs
oldestPieceInsertedAt: new Date().toISOString(),
})
t.assert(aggregatePutRes.ok)

// Propagate aggregate to dealer
const putRes = await context.dealerAggregateStore.put({
aggregate: aggregate.link,
pieces: piecesBlock.cid,
status: 'offered',
insertedAt: new Date().toISOString(),
updatedAt: new Date().toISOString(),
})
t.assert(putRes.ok)

const tickRes = await dealMonitorAlertTick.dealMonitorAlertTick({
...context,
// do not wait
oldestPieceCriticalThresholdMs: threshold * 10,
oldestPieceWarnThresholdMs: 0,
aggregateMonitorThresholdMs: threshold
})

t.assert(tickRes.ok)
t.is(tickRes.ok?.alerts.length, 1)
t.is(tickRes.ok?.alerts[0].severity, 'warn')
t.assert(tickRes.ok?.alerts[0].duration)
t.assert(tickRes.ok?.alerts[0].aggregate.equals(aggregate.link))
})

test('handles deal monitor tick with aggregates in critical type', async t => {
const context = await getContext(t.context)
const storefront = await Signer.generate()
const group = storefront.did()
const { pieces, aggregate } = await randomAggregate(10, 128)
const threshold = 1000
const pieceInsertTime = Date.now() - threshold
const buffer = {
pieces: pieces.map((p) => ({
piece: p.link,
insertedAt: new Date(
pieceInsertTime
).toISOString(),
policy: 0,
})),
group,
}
const block = await CBOR.write(buffer)
// Store aggregate record into store
const offer = pieces.map((p) => p.link)
const piecesBlock = await CBOR.write(offer)

// Store aggregate in aggregator
const aggregatePutRes = await context.aggregatorAggregateStore.put({
aggregate: aggregate.link,
pieces: piecesBlock.cid,
buffer: block.cid,
group,
insertedAt: new Date().toISOString(),
// @ts-expect-error needs
oldestPieceInsertedAt: new Date().toISOString(),
})
t.assert(aggregatePutRes.ok)

// Propagate aggregate to dealer
const putRes = await context.dealerAggregateStore.put({
aggregate: aggregate.link,
pieces: piecesBlock.cid,
status: 'offered',
insertedAt: new Date().toISOString(),
updatedAt: new Date().toISOString(),
})
t.assert(putRes.ok)

const tickRes = await dealMonitorAlertTick.dealMonitorAlertTick({
...context,
// should do critical check first
oldestPieceCriticalThresholdMs: 0,
oldestPieceWarnThresholdMs: 0,
aggregateMonitorThresholdMs: threshold
})

t.assert(tickRes.ok)
t.is(tickRes.ok?.alerts.length, 1)
t.is(tickRes.ok?.alerts[0].severity, 'critical')
t.assert(tickRes.ok?.alerts[0].duration)
t.assert(tickRes.ok?.alerts[0].aggregate.equals(aggregate.link))
})

/**
* @param {import('../helpers/context.js').DbContext} context
*/
async function getContext (context) {
const { dynamoClient } = context
const aggregatorAggregateStoreTableName = await createTable(dynamoClient, aggregatorAggregateStoreTableProps)
const dealerAggregateStoreTableName = await createTable(dynamoClient, dealerAggregateStoreTableProps)

return {
aggregatorAggregateStore: createAggregatorAggregateStoreClient(dynamoClient, {
tableName: aggregatorAggregateStoreTableName
}),
dealerAggregateStore: createDealerAggregateStoreClient(dynamoClient, {
tableName: dealerAggregateStoreTableName
}),
monitoringNotificationsUrl: new URL(`http://127.0.0.1:${process.env.PORT || 9001}`)
}
}
Loading

0 comments on commit 50b6eef

Please sign in to comment.