Skip to content

Commit

Permalink
feat: deal monitor alert
Browse files Browse the repository at this point in the history
  • Loading branch information
vasco-santos committed Nov 27, 2023
1 parent 96d4431 commit 5d1cb0d
Show file tree
Hide file tree
Showing 4 changed files with 259 additions and 5 deletions.
143 changes: 143 additions & 0 deletions packages/core/src/dealer/deal-monitor-alert-tick.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/**
* @typedef {import('@web3-storage/data-segment').PieceLink} PieceLink
* @typedef {import('@web3-storage/filecoin-api/aggregator/api').AggregateStore} AggregatorAggregateStore
* @typedef {import('@web3-storage/filecoin-api/dealer/api').AggregateStore} DealerAggregateStore
* @typedef {import('@web3-storage/filecoin-api/dealer/api').AggregateRecord} AggregateRecord
*
* @typedef {object} MonitorContext
* @property {AggregatorAggregateStore} context.aggregatorAggregateStore
* @property {DealerAggregateStore} context.dealerAggregateStore
* @property {number} context.oldestPieceAlertThresholdMs
* @property {number} context.oldestPieceWarnThresholdMs
* @property {number} context.aggregateMonitorThresholdMs
*
* @typedef {Object} Notification
* @property {PieceLink} aggregate
* @property {number} duration
*/

/**
* On CRON tick, get aggregates without deals, and verify if there
*
* @param {MonitorContext} context
*/
export async function dealMonitorAlertTick (context) {
// TODO: URL, ...

// Get offered deals pending approval/rejection
const offeredAggregates = await context.dealerAggregateStore.query({
status: 'offered',
})
if (offeredAggregates.error) {
return {
error: offeredAggregates.error,
}
}

// Get offered aggregates to monitor
const offeredAggregatesToMonitor = []
const currentTime = Date.now()
for (const offeredAggregate of offeredAggregates.ok) {
const offerTime = (new Date(offeredAggregate.insertedAt)).getTime()
// Monitor if offer time + monitor threshold is bigger than current time
if (offerTime + context.aggregateMonitorThresholdMs > currentTime) {
offeredAggregatesToMonitor.push(offeredAggregate)
}
}

// Get aggregates duration
const monitoredAggregatesResponse = await Promise.all(
offeredAggregatesToMonitor.map(aggregate => monitorAggregate(aggregate, context))
)
// Fail if any failed to get information
const monitoredAggregatesErrorResponse = monitoredAggregatesResponse.find(r => r?.error)
if (monitoredAggregatesErrorResponse) {
return {
error: monitoredAggregatesErrorResponse.error
}
}

const toWarn = /** @typedef {Notification[]} */ ([])
const toAlert = /** @typedef {Notification[]} */ ([])

// Verify if monitored aggregates should create notifications
for (const res of monitoredAggregatesResponse) {
// @ts-ignore if not ok, should have failed before
const duration = /** @type {number} */ (res.ok?.duration)
// @ts-ignore if not ok, should have failed before
const aggregate = /** @type {import('@web3-storage/data-segment').PieceLink} */ (res.ok?.aggregate)

if (duration > context.oldestPieceAlertThresholdMs) {
toAlert.push({
aggregate,
duration
})
} else if (duration > context.oldestPieceWarnThresholdMs) {
toWarn.push({
aggregate,
duration
})
}
}

// Send alerts


return {
ok: {}
}
}

/**
* @param {AggregateRecord} aggregateRecord
* @param {MonitorContext} context
*/
async function monitorAggregate (aggregateRecord, context) {
const getAggregateInfo = await context.aggregatorAggregateStore.get({
aggregate: aggregateRecord.aggregate
})
if (getAggregateInfo.error) {
return {
error: getAggregateInfo.error
}
}

// Get aggregate current duration
const currentTime = Date.now()
// @ts-expect-error needs updated dep
const offerTime = (new Date(getAggregateInfo.ok.oldestPieceInsertedAt)).getTime()

return {
ok: {
aggregate: aggregateRecord.aggregate,
duration: currentTime - offerTime
}
}

// // Verify if already in alert threshold
// if (offerTime + context.oldestPieceAlertThresholdMs > currentTime) {
// return {
// ok: {
// alert: {
// aggregate: aggregateRecord.aggregate,
// duration: currentTime - offerTime
// }
// }
// }
// // Verify if already in warn threshold
// } else if (offerTime + context.oldestPieceWarnThresholdMs > currentTime) {
// return {
// ok: {
// warn: {
// aggregate: aggregateRecord.aggregate,
// duration: currentTime - offerTime
// }
// }
// }
// }
// return {
// ok: {

// }
// }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import * as Sentry from '@sentry/serverless'
import { Table } from 'sst/node/table'

// store clients
import { createClient as createAggregatorAggregateStoreClient } from '@w3filecoin/core/src/store/aggregator-aggregate-store.js'
import { createClient as createDealerAggregateStoreClient } from '@w3filecoin/core/src/store/dealer-aggregate-store.js'

import { dealMonitorAlertTick } from '@w3filecoin/core/src/dealer/deal-monitor-alert-tick.js'

import { mustGetEnv } from '../utils.js'

Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
dsn: process.env.SENTRY_DSN,
tracesSampleRate: 1.0,
})

async function handleEvent () {
const {
aggregatorAggregateStoreTableName,
aggregatorAggregateStoreTableRegion,
dealerAggregateStoreTableName,
dealerAggregateStoreTableRegion,
oldestPieceAlertThresholdMs,
oldestPieceWarnThresholdMs,
aggregateMonitorThresholdMs
} = getEnv()

// stores
const aggregatorAggregateStore = createAggregatorAggregateStoreClient(
{ region: aggregatorAggregateStoreTableRegion },
{ tableName: aggregatorAggregateStoreTableName.tableName }
)
const dealerAggregateStore = createDealerAggregateStoreClient({
region: dealerAggregateStoreTableRegion
}, {
tableName: dealerAggregateStoreTableName.tableName
})

const { error } = await dealMonitorAlertTick({
aggregatorAggregateStore,
dealerAggregateStore,
oldestPieceAlertThresholdMs,
oldestPieceWarnThresholdMs,
aggregateMonitorThresholdMs
})

if (error) {
console.error(error)
return {
statusCode: 500,
body: error.message
}
}

return {
statusCode: 200,
}
}

/**
* Get Env validating it is set.
*/
function getEnv () {
return {
aggregatorAggregateStoreTableName: Table['aggregator-aggregate-store'],
aggregatorAggregateStoreTableRegion: mustGetEnv('AWS_REGION'),
dealerAggregateStoreTableName: Table['dealer-aggregate-store'],
dealerAggregateStoreTableRegion: mustGetEnv('AWS_REGION'),
oldestPieceAlertThresholdMs: parseInt(mustGetEnv('OLDEST_PIECE_ALERT_THRESHOLD_MS')),
oldestPieceWarnThresholdMs: parseInt(mustGetEnv('OLDEST_PIECE_WARN_THRESHOLD_MS')),
aggregateMonitorThresholdMs: parseInt(mustGetEnv('AGGREGATE_MONITOR_THRESHOLD_MS')),
}
}

export const main = Sentry.AWSLambda.wrapHandler(handleEvent)
32 changes: 30 additions & 2 deletions stacks/api-stack.js
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
import { Api, Config, use } from 'sst/constructs'
import { Api, Config, Cron, use } from 'sst/constructs'

import { DataStack } from './data-stack.js'
import { AggregatorStack } from './aggregator-stack.js'
import {
getApiPackageJson,
getGitInfo,
getCustomDomain,
getEnv,
getAggregatorEnv,
getDealerEnv,
getDealTrackerEnv,
setupSentry
setupSentry,
getResourceName
} from './config.js'

/**
Expand All @@ -30,6 +32,11 @@ export function ApiStack({ app, stack }) {
DEAL_API_HOSTED_ZONE,
DEALER_DID,
} = getDealerEnv()
const {
OLDEST_PIECE_ALERT_THRESHOLD_MS,
OLDEST_PIECE_WARN_THRESHOLD_MS,
AGGREGATE_MONITOR_THRESHOLD_MS
} = getEnv()

// Setup app monitoring with Sentry
setupSentry(app, stack)
Expand Down Expand Up @@ -174,6 +181,27 @@ export function ApiStack({ app, stack }) {
},
})

// Setup `monitoring`
const dealMonitorAlertCronName = getResourceName('deal-monitor-alert-cron', stack.stage)
new Cron(stack, dealMonitorAlertCronName, {
schedule: 'rate(30 minutes)',
job: {
function: {
timeout: '5 minutes',
handler: 'packages/functions/src/monitor/handle-deal-monitor-alert-cron-tick.main',
environment: {
OLDEST_PIECE_ALERT_THRESHOLD_MS,
OLDEST_PIECE_WARN_THRESHOLD_MS,
AGGREGATE_MONITOR_THRESHOLD_MS
},
bind: [
dealerAggregateStoreTable,
aggregatorAggregateStoreTable,
],
}
}
})

stack.addOutputs({
AggregatorApiEndpoint: api.url,
AggregatorApiCustomDomain: aggregatorApiCustomDomain ? `https://${aggregatorApiCustomDomain.domainName}` : 'Set AGGREGATOR_HOSTED_ZONE in env to deploy to a custom domain',
Expand Down
13 changes: 10 additions & 3 deletions stacks/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@ import { RemovalPolicy } from 'aws-cdk-lib'
import git from 'git-rev-sync'
import * as pack from '../package.json'

export const DEFAULT_FERRY_CARGO_MAX_SIZE = 127*(1<<28)
export const DEFAULT_FERRY_CARGO_MIN_SIZE = 1+127*(1<<27)
// 72 Hours
export const DEFAULT_OLDEST_PIECE_ALERT_THRESHOLD_MS = String(72 * 60 * 60 * 1000)
// 60 Hours
export const DEFAULT_OLDEST_PIECE_WARN_THRESHOLD_MS = String(60 * 60 * 60 * 1000)
// 48 Hours
export const DEFAULT_AGGREGATE_MONITOR_THRESHOLD_MS = String(48 * 60 * 60 * 1000)

/**
* Get nicer resources name
Expand Down Expand Up @@ -105,7 +109,10 @@ export function setupSentry (app, stack) {
export function getEnv() {
return {
SENTRY_DSN: mustGetEnv('SENTRY_DSN'),
UCAN_LOG_URL: mustGetEnv('UCAN_LOG_URL')
UCAN_LOG_URL: mustGetEnv('UCAN_LOG_URL'),
OLDEST_PIECE_ALERT_THRESHOLD_MS: process.env.OLDEST_PIECE_ALERT_THRESHOLD_MS || DEFAULT_OLDEST_PIECE_ALERT_THRESHOLD_MS,
OLDEST_PIECE_WARN_THRESHOLD_MS: process.env.OLDEST_PIECE_WARN_THRESHOLD_MS || DEFAULT_OLDEST_PIECE_WARN_THRESHOLD_MS,
AGGREGATE_MONITOR_THRESHOLD_MS: process.env.AGGREGATE_MONITOR_THRESHOLD_MS || DEFAULT_AGGREGATE_MONITOR_THRESHOLD_MS
}
}

Expand Down

0 comments on commit 5d1cb0d

Please sign in to comment.