Skip to content

Commit

Permalink
fix: add dead letter queues to aggregator (#70)
Browse files Browse the repository at this point in the history
  • Loading branch information
vasco-santos authored Nov 8, 2023
1 parent b6e6163 commit 87b151f
Showing 1 changed file with 36 additions and 0 deletions.
36 changes: 36 additions & 0 deletions stacks/aggregator-stack.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,18 @@ export function AggregatorStack({ stack, app }) {
* 1st processor queue - piece/offer invocation
*/
const pieceQueueName = getResourceName('piece-queue', stack.stage)
const pieceQueueDLQ = new Queue(stack, `${pieceQueueName}-dlq`, {
cdk: { queue: { retentionPeriod: Duration.days(14) } }
})
const pieceQueue = new Queue(stack, pieceQueueName)

/**
* 2nd processor queue - buffer reducing event
*/
const bufferQueueName = getResourceName('buffer-queue', stack.stage)
const bufferQueueDLQ = new Queue(stack, `${bufferQueueName}-dlq`, {
cdk: { queue: { retentionPeriod: Duration.days(14) } }
})
const bufferQueue = new Queue(stack, bufferQueueName, {
cdk: {
queue: {
Expand All @@ -72,6 +78,9 @@ export function AggregatorStack({ stack, app }) {
* 3rd processor queue - aggregator/offer invocation
*/
const aggregateOfferQueueName = getResourceName('aggregate-offer-queue', stack.stage)
const aggregateOfferQueueDLQ = new Queue(stack, `${aggregateOfferQueueName}-dlq`, {
cdk: { queue: { retentionPeriod: Duration.days(14) } }
})
const aggregateOfferQueue = new Queue(stack, aggregateOfferQueueName, {
cdk: {
queue: {
Expand All @@ -90,6 +99,9 @@ export function AggregatorStack({ stack, app }) {
* 4th processor queue - piece/accept invocation
*/
const pieceAcceptQueueName = getResourceName('piece-accept-queue', stack.stage)
const pieceAcceptQueueDLQ = new Queue(stack, `${pieceAcceptQueueName}-dlq`, {
cdk: { queue: { retentionPeriod: Duration.days(14) } }
})
const pieceAcceptQueue = new Queue(stack, pieceAcceptQueueName)

/**
Expand All @@ -102,13 +114,17 @@ export function AggregatorStack({ stack, app }) {
aggregatorPieceStoreTable
]
},
deadLetterQueue: pieceQueueDLQ.cdk.queue,
cdk: {
eventSource: {
batchSize: 1
},
},
})

const aggregatorPieceStoreHandleInsertDLQ = new Queue(stack, `aggregator-piece-store-handle-insert-dlq`, {
cdk: { queue: { retentionPeriod: Duration.days(14) } }
})
/**
* On Piece store insert batch, buffer pieces together to resume buffer processing.
*/
Expand All @@ -125,6 +141,7 @@ export function AggregatorStack({ stack, app }) {
bufferQueue
]
},
deadLetterQueue: aggregatorPieceStoreHandleInsertDLQ.cdk.queue,
cdk: {
// https://docs.aws.amazon.com/cdk/api/v2/docs/aws-cdk-lib.aws_lambda_event_sources.DynamoEventSourceProps.html#filters
eventSource: {
Expand Down Expand Up @@ -173,6 +190,7 @@ export function AggregatorStack({ stack, app }) {
aggregateOfferQueue
]
},
deadLetterQueue: bufferQueueDLQ.cdk.queue,
cdk: {
eventSource: {
// as soon as we have 2, we can act fast and reduce to see if enough bytes
Expand All @@ -193,13 +211,20 @@ export function AggregatorStack({ stack, app }) {
aggregatorAggregateStoreTable
],
},
deadLetterQueue: aggregateOfferQueueDLQ.cdk.queue,
cdk: {
eventSource: {
batchSize: 1,
},
}
})

const aggregatorAggregateStoreHandleInsertToPieceAcceptDLQ = new Queue(stack, `aggregator-aggregate-store-handle-insert-to-piece-accept-dlq`, {
cdk: { queue: { retentionPeriod: Duration.days(14) } }
})
const aggregatorAggregateStoreHandleInsertToAggregateOfferDLQ = new Queue(stack, `aggregator-aggregate-store-handle-insert-to-aggregate-offer-dlq`, {
cdk: { queue: { retentionPeriod: Duration.days(14) } }
})
aggregatorAggregateStoreTable.addConsumers(stack, {
/**
* On Aggregate store insert, offer inserted aggregate for deal.
Expand All @@ -219,6 +244,7 @@ export function AggregatorStack({ stack, app }) {
pieceAcceptQueue
],
},
deadLetterQueue: aggregatorAggregateStoreHandleInsertToPieceAcceptDLQ.cdk.queue,
cdk: {
eventSource: {
batchSize: 1,
Expand Down Expand Up @@ -249,6 +275,7 @@ export function AggregatorStack({ stack, app }) {
aggregatorPrivateKey
]
},
deadLetterQueue: aggregatorAggregateStoreHandleInsertToAggregateOfferDLQ.cdk.queue,
cdk: {
eventSource: {
batchSize: 1,
Expand Down Expand Up @@ -280,13 +307,20 @@ export function AggregatorStack({ stack, app }) {
aggregatorInclusionStoreTable,
],
},
deadLetterQueue: pieceAcceptQueueDLQ.cdk.queue,
cdk: {
eventSource: {
batchSize: 1,
},
}
})

const aggregatorInclusionStoreHandleInsertToUpdateStateDLQ = new Queue(stack, `aggregator-inclusion-store-handle-insert-to-update-state-dlq`, {
cdk: { queue: { retentionPeriod: Duration.days(14) } }
})
const aggregatorInclusionStoreHandleInsertToPieceAcceptDLQ = new Queue(stack, `aggregator-inclusion-store-handle-insert-to-piece-accept-dlq`, {
cdk: { queue: { retentionPeriod: Duration.days(14) } }
})
aggregatorInclusionStoreTable.addConsumers(stack, {
/**
* On Inclusion store insert, piece table can be updated to reflect piece state.
Expand All @@ -299,6 +333,7 @@ export function AggregatorStack({ stack, app }) {
aggregatorPieceStoreTable,
]
},
deadLetterQueue: aggregatorInclusionStoreHandleInsertToUpdateStateDLQ.cdk.queue,
cdk: {
eventSource: {
batchSize: 1,
Expand Down Expand Up @@ -328,6 +363,7 @@ export function AggregatorStack({ stack, app }) {
aggregatorPrivateKey
]
},
deadLetterQueue: aggregatorInclusionStoreHandleInsertToPieceAcceptDLQ.cdk.queue,
cdk: {
eventSource: {
batchSize: 1,
Expand Down

0 comments on commit 87b151f

Please sign in to comment.