diff --git a/packages/core/src/dealer/deal-monitor-alert-tick.js b/packages/core/src/dealer/deal-monitor-alert-tick.js new file mode 100644 index 0000000..c81aae1 --- /dev/null +++ b/packages/core/src/dealer/deal-monitor-alert-tick.js @@ -0,0 +1,143 @@ +/** + * @typedef {import('@web3-storage/data-segment').PieceLink} PieceLink + * @typedef {import('@web3-storage/filecoin-api/aggregator/api').AggregateStore} AggregatorAggregateStore + * @typedef {import('@web3-storage/filecoin-api/dealer/api').AggregateStore} DealerAggregateStore + * @typedef {import('@web3-storage/filecoin-api/dealer/api').AggregateRecord} AggregateRecord + * + * @typedef {object} MonitorContext + * @property {AggregatorAggregateStore} context.aggregatorAggregateStore + * @property {DealerAggregateStore} context.dealerAggregateStore + * @property {number} context.oldestPieceAlertThresholdMs + * @property {number} context.oldestPieceWarnThresholdMs + * @property {number} context.aggregateMonitorThresholdMs + * + * @typedef {Object} Notification + * @property {PieceLink} aggregate + * @property {number} duration + */ + +/** + * On CRON tick, get aggregates without deals, and verify if there + * + * @param {MonitorContext} context + */ +export async function dealMonitorAlertTick (context) { + // TODO: URL, ... + + // Get offered deals pending approval/rejection + const offeredAggregates = await context.dealerAggregateStore.query({ + status: 'offered', + }) + if (offeredAggregates.error) { + return { + error: offeredAggregates.error, + } + } + + // Get offered aggregates to monitor + const offeredAggregatesToMonitor = [] + const currentTime = Date.now() + for (const offeredAggregate of offeredAggregates.ok) { + const offerTime = (new Date(offeredAggregate.insertedAt)).getTime() + // Monitor if offer time + monitor threshold is bigger than current time + if (offerTime + context.aggregateMonitorThresholdMs > currentTime) { + offeredAggregatesToMonitor.push(offeredAggregate) + } + } + + // Get aggregates duration + const monitoredAggregatesResponse = await Promise.all( + offeredAggregatesToMonitor.map(aggregate => monitorAggregate(aggregate, context)) + ) + // Fail if any failed to get information + const monitoredAggregatesErrorResponse = monitoredAggregatesResponse.find(r => r?.error) + if (monitoredAggregatesErrorResponse) { + return { + error: monitoredAggregatesErrorResponse.error + } + } + + const toWarn = /** @typedef {Notification[]} */ ([]) + const toAlert = /** @typedef {Notification[]} */ ([]) + + // Verify if monitored aggregates should create notifications + for (const res of monitoredAggregatesResponse) { + // @ts-ignore if not ok, should have failed before + const duration = /** @type {number} */ (res.ok?.duration) + // @ts-ignore if not ok, should have failed before + const aggregate = /** @type {import('@web3-storage/data-segment').PieceLink} */ (res.ok?.aggregate) + + if (duration > context.oldestPieceAlertThresholdMs) { + toAlert.push({ + aggregate, + duration + }) + } else if (duration > context.oldestPieceWarnThresholdMs) { + toWarn.push({ + aggregate, + duration + }) + } + } + + // Send alerts + + + return { + ok: {} + } +} + +/** + * @param {AggregateRecord} aggregateRecord + * @param {MonitorContext} context + */ +async function monitorAggregate (aggregateRecord, context) { + const getAggregateInfo = await context.aggregatorAggregateStore.get({ + aggregate: aggregateRecord.aggregate + }) + if (getAggregateInfo.error) { + return { + error: getAggregateInfo.error + } + } + + // Get aggregate current duration + const currentTime = Date.now() + // @ts-expect-error needs updated dep + const offerTime = (new Date(getAggregateInfo.ok.oldestPieceInsertedAt)).getTime() + + return { + ok: { + aggregate: aggregateRecord.aggregate, + duration: currentTime - offerTime + } + } + + // // Verify if already in alert threshold + // if (offerTime + context.oldestPieceAlertThresholdMs > currentTime) { + // return { + // ok: { + // alert: { + // aggregate: aggregateRecord.aggregate, + // duration: currentTime - offerTime + // } + // } + // } + // // Verify if already in warn threshold + // } else if (offerTime + context.oldestPieceWarnThresholdMs > currentTime) { + // return { + // ok: { + // warn: { + // aggregate: aggregateRecord.aggregate, + // duration: currentTime - offerTime + // } + // } + // } + // } + // return { + // ok: { + + // } + // } +} diff --git a/packages/functions/src/monitor/handle-deal-monitor-alert-cron-tick.js b/packages/functions/src/monitor/handle-deal-monitor-alert-cron-tick.js new file mode 100644 index 0000000..e7648e1 --- /dev/null +++ b/packages/functions/src/monitor/handle-deal-monitor-alert-cron-tick.js @@ -0,0 +1,76 @@ +import * as Sentry from '@sentry/serverless' +import { Table } from 'sst/node/table' + +// store clients +import { createClient as createAggregatorAggregateStoreClient } from '@w3filecoin/core/src/store/aggregator-aggregate-store.js' +import { createClient as createDealerAggregateStoreClient } from '@w3filecoin/core/src/store/dealer-aggregate-store.js' + +import { dealMonitorAlertTick } from '@w3filecoin/core/src/dealer/deal-monitor-alert-tick.js' + +import { mustGetEnv } from '../utils.js' + +Sentry.AWSLambda.init({ + environment: process.env.SST_STAGE, + dsn: process.env.SENTRY_DSN, + tracesSampleRate: 1.0, +}) + +async function handleEvent () { + const { + aggregatorAggregateStoreTableName, + aggregatorAggregateStoreTableRegion, + dealerAggregateStoreTableName, + dealerAggregateStoreTableRegion, + oldestPieceAlertThresholdMs, + oldestPieceWarnThresholdMs, + aggregateMonitorThresholdMs + } = getEnv() + + // stores + const aggregatorAggregateStore = createAggregatorAggregateStoreClient( + { region: aggregatorAggregateStoreTableRegion }, + { tableName: aggregatorAggregateStoreTableName.tableName } + ) + const dealerAggregateStore = createDealerAggregateStoreClient({ + region: dealerAggregateStoreTableRegion + }, { + tableName: dealerAggregateStoreTableName.tableName + }) + + const { error } = await dealMonitorAlertTick({ + aggregatorAggregateStore, + dealerAggregateStore, + oldestPieceAlertThresholdMs, + oldestPieceWarnThresholdMs, + aggregateMonitorThresholdMs + }) + + if (error) { + console.error(error) + return { + statusCode: 500, + body: error.message + } + } + + return { + statusCode: 200, + } +} + +/** + * Get Env validating it is set. + */ +function getEnv () { + return { + aggregatorAggregateStoreTableName: Table['aggregator-aggregate-store'], + aggregatorAggregateStoreTableRegion: mustGetEnv('AWS_REGION'), + dealerAggregateStoreTableName: Table['dealer-aggregate-store'], + dealerAggregateStoreTableRegion: mustGetEnv('AWS_REGION'), + oldestPieceAlertThresholdMs: parseInt(mustGetEnv('OLDEST_PIECE_ALERT_THRESHOLD_MS')), + oldestPieceWarnThresholdMs: parseInt(mustGetEnv('OLDEST_PIECE_WARN_THRESHOLD_MS')), + aggregateMonitorThresholdMs: parseInt(mustGetEnv('AGGREGATE_MONITOR_THRESHOLD_MS')), + } +} + +export const main = Sentry.AWSLambda.wrapHandler(handleEvent) diff --git a/stacks/api-stack.js b/stacks/api-stack.js index 5a876f8..d4aa573 100644 --- a/stacks/api-stack.js +++ b/stacks/api-stack.js @@ -1,4 +1,4 @@ -import { Api, Config, use } from 'sst/constructs' +import { Api, Config, Cron, use } from 'sst/constructs' import { DataStack } from './data-stack.js' import { AggregatorStack } from './aggregator-stack.js' @@ -6,10 +6,12 @@ import { getApiPackageJson, getGitInfo, getCustomDomain, + getEnv, getAggregatorEnv, getDealerEnv, getDealTrackerEnv, - setupSentry + setupSentry, + getResourceName } from './config.js' /** @@ -30,6 +32,11 @@ export function ApiStack({ app, stack }) { DEAL_API_HOSTED_ZONE, DEALER_DID, } = getDealerEnv() + const { + OLDEST_PIECE_ALERT_THRESHOLD_MS, + OLDEST_PIECE_WARN_THRESHOLD_MS, + AGGREGATE_MONITOR_THRESHOLD_MS + } = getEnv() // Setup app monitoring with Sentry setupSentry(app, stack) @@ -174,6 +181,27 @@ export function ApiStack({ app, stack }) { }, }) + // Setup `monitoring` + const dealMonitorAlertCronName = getResourceName('deal-monitor-alert-cron', stack.stage) + new Cron(stack, dealMonitorAlertCronName, { + schedule: 'rate(30 minutes)', + job: { + function: { + timeout: '5 minutes', + handler: 'packages/functions/src/monitor/handle-deal-monitor-alert-cron-tick.main', + environment: { + OLDEST_PIECE_ALERT_THRESHOLD_MS, + OLDEST_PIECE_WARN_THRESHOLD_MS, + AGGREGATE_MONITOR_THRESHOLD_MS + }, + bind: [ + dealerAggregateStoreTable, + aggregatorAggregateStoreTable, + ], + } + } + }) + stack.addOutputs({ AggregatorApiEndpoint: api.url, AggregatorApiCustomDomain: aggregatorApiCustomDomain ? `https://${aggregatorApiCustomDomain.domainName}` : 'Set AGGREGATOR_HOSTED_ZONE in env to deploy to a custom domain', diff --git a/stacks/config.js b/stacks/config.js index 539b5a8..432c6bf 100644 --- a/stacks/config.js +++ b/stacks/config.js @@ -2,8 +2,12 @@ import { RemovalPolicy } from 'aws-cdk-lib' import git from 'git-rev-sync' import * as pack from '../package.json' -export const DEFAULT_FERRY_CARGO_MAX_SIZE = 127*(1<<28) -export const DEFAULT_FERRY_CARGO_MIN_SIZE = 1+127*(1<<27) +// 72 Hours +export const DEFAULT_OLDEST_PIECE_ALERT_THRESHOLD_MS = String(72 * 60 * 60 * 1000) +// 60 Hours +export const DEFAULT_OLDEST_PIECE_WARN_THRESHOLD_MS = String(60 * 60 * 60 * 1000) +// 48 Hours +export const DEFAULT_AGGREGATE_MONITOR_THRESHOLD_MS = String(48 * 60 * 60 * 1000) /** * Get nicer resources name @@ -105,7 +109,10 @@ export function setupSentry (app, stack) { export function getEnv() { return { SENTRY_DSN: mustGetEnv('SENTRY_DSN'), - UCAN_LOG_URL: mustGetEnv('UCAN_LOG_URL') + UCAN_LOG_URL: mustGetEnv('UCAN_LOG_URL'), + OLDEST_PIECE_ALERT_THRESHOLD_MS: process.env.OLDEST_PIECE_ALERT_THRESHOLD_MS || DEFAULT_OLDEST_PIECE_ALERT_THRESHOLD_MS, + OLDEST_PIECE_WARN_THRESHOLD_MS: process.env.OLDEST_PIECE_WARN_THRESHOLD_MS || DEFAULT_OLDEST_PIECE_WARN_THRESHOLD_MS, + AGGREGATE_MONITOR_THRESHOLD_MS: process.env.AGGREGATE_MONITOR_THRESHOLD_MS || DEFAULT_AGGREGATE_MONITOR_THRESHOLD_MS } }