diff --git a/packages/core/test/aggregator-events.test.js b/packages/core/test/aggregator-events.test.js index dc51b9a..1671b0d 100644 --- a/packages/core/test/aggregator-events.test.js +++ b/packages/core/test/aggregator-events.test.js @@ -8,7 +8,7 @@ import { decode as JSONdecode } from '@ipld/dag-json' import { tesWorkflowWithMultipleQueues as test } from './helpers/context.js' import { getMockService, getConnection } from '@web3-storage/filecoin-api/test/context/service' -import { createDynamodDb, createS3, createQueue } from './helpers/resources.js' +import { createDynamodDb, createS3, createSQS, createQueue } from './helpers/resources.js' import { getStores, getQueues } from './helpers/aggregator-context.js' /** @@ -17,8 +17,27 @@ import { getStores, getQueues } from './helpers/aggregator-context.js' const queueNames = ['pieceQueue', 'bufferQueue', 'aggregateOfferQueue', 'pieceAcceptQueue'] -test.before(async t => { +test.before(async (t) => { await delay(1000) + + const { client: sqsClient } = await createSQS() + const { client: s3Client, stop: s3Stop } = await createS3({ port: 9000 }) + const { client: dynamoClient, stop: dynamoStop} = await createDynamodDb() + + Object.assign(t.context, { + s3: s3Client, + dynamoClient, + sqsClient, + stop: async () => { + await s3Stop() + await dynamoStop() + } + }) +}) + +test.beforeEach(async t => { + await delay(1000) + /** @type {Record} */ const queues = {} // /** @type {import('@aws-sdk/client-sqs').Message[]} */ @@ -26,51 +45,40 @@ test.before(async t => { const queuedMessages = new Map() for (const name of queueNames) { - const sqs = await createQueue() + const { queueUrl, queueName } = await createQueue(t.context.sqsClient) queuedMessages.set(name, []) + const queueConsumer = Consumer.create({ - queueUrl: sqs.queueUrl, - sqs: sqs.client, + queueUrl: queueUrl, + sqs: t.context.sqsClient, handleMessage: (message) => { // @ts-expect-error may not have body const decodedBytes = fromString(message.Body) const decodedMessage = JSONdecode(decodedBytes) const messages = queuedMessages.get(name) || [] messages.push(decodedMessage) - queuedMessages.set(name, messages) return Promise.resolve() } }) queues[name] = { - sqsClient: sqs.client, - queueName: sqs.queueName, - queueUrl: sqs.queueUrl, + queueName: queueName, + queueUrl: queueUrl, queueConsumer, } } - const dynamo = await createDynamodDb() + for (const [, q] of Object.entries(queues)) { + q.queueConsumer.start() + await pWaitFor(() => q.queueConsumer.isRunning) + } Object.assign(t.context, { - s3: (await createS3()).client, - dynamoClient: dynamo.client, queues, queuedMessages }) }) -test.beforeEach(async t => { - await delay(1000) - for (const name of queueNames) { - t.context.queuedMessages.set(name, []) - } - for (const [, q] of Object.entries(t.context.queues)) { - q.queueConsumer.start() - await pWaitFor(() => q.queueConsumer.isRunning) - } -}) - test.afterEach(async t => { for (const [, q] of Object.entries(t.context.queues)) { q.queueConsumer.stop() @@ -79,7 +87,7 @@ test.afterEach(async t => { }) test.after(async t => { - await delay(1000) + await t.context.stop() }) for (const [title, unit] of Object.entries(filecoinApiTest.events.aggregator)) { diff --git a/packages/core/test/aggregator-service.test.js b/packages/core/test/aggregator-service.test.js index b715228..2b16c2e 100644 --- a/packages/core/test/aggregator-service.test.js +++ b/packages/core/test/aggregator-service.test.js @@ -7,7 +7,7 @@ import { fromString } from 'uint8arrays/from-string' import { decode as JSONdecode } from '@ipld/dag-json' import { tesWorkflowWithMultipleQueues as test } from './helpers/context.js' -import { createDynamodDb, createS3, createQueue } from './helpers/resources.js' +import { createDynamodDb, createS3, createSQS, createQueue } from './helpers/resources.js' import { getStores, getQueues } from './helpers/aggregator-context.js' /** @@ -18,6 +18,25 @@ const queueNames = ['pieceQueue', 'bufferQueue', 'aggregateOfferQueue', 'pieceAc test.before(async (t) => { await delay(1000) + + const { client: sqsClient } = await createSQS() + const { client: s3Client, stop: s3Stop } = await createS3({ port: 9000 }) + const { client: dynamoClient, stop: dynamoStop} = await createDynamodDb() + + Object.assign(t.context, { + s3: s3Client, + dynamoClient, + sqsClient, + stop: async () => { + await s3Stop() + await dynamoStop() + } + }) +}) + +test.beforeEach(async t => { + await delay(1000) + /** @type {Record} */ const queues = {} // /** @type {import('@aws-sdk/client-sqs').Message[]} */ @@ -25,11 +44,12 @@ test.before(async (t) => { const queuedMessages = new Map() for (const name of queueNames) { - const sqs = await createQueue() + const { queueUrl, queueName } = await createQueue(t.context.sqsClient) queuedMessages.set(name, []) + const queueConsumer = Consumer.create({ - queueUrl: sqs.queueUrl, - sqs: sqs.client, + queueUrl: queueUrl, + sqs: t.context.sqsClient, handleMessage: (message) => { // @ts-expect-error may not have body const decodedBytes = fromString(message.Body) @@ -41,34 +61,23 @@ test.before(async (t) => { }) queues[name] = { - sqsClient: sqs.client, - queueName: sqs.queueName, - queueUrl: sqs.queueUrl, + queueName: queueName, + queueUrl: queueUrl, queueConsumer, } } - const dynamo = await createDynamodDb() + for (const [, q] of Object.entries(queues)) { + q.queueConsumer.start() + await pWaitFor(() => q.queueConsumer.isRunning) + } Object.assign(t.context, { - s3: (await createS3()).client, - dynamoClient: dynamo.client, queues, queuedMessages }) }) -test.beforeEach(async t => { - await delay(1000) - for (const name of queueNames) { - t.context.queuedMessages.set(name, []) - } - for (const [, q] of Object.entries(t.context.queues)) { - q.queueConsumer.start() - await pWaitFor(() => q.queueConsumer.isRunning) - } -}) - test.afterEach(async t => { for (const [, q] of Object.entries(t.context.queues)) { q.queueConsumer.stop() @@ -77,7 +86,7 @@ test.afterEach(async t => { }) test.after(async t => { - await delay(1000) + await t.context.stop() }) for (const [title, unit] of Object.entries(filecoinApiTest.service.aggregator)) { diff --git a/packages/core/test/deal-tracker-service.test.js b/packages/core/test/deal-tracker-service.test.js index b873aee..5b06fcf 100644 --- a/packages/core/test/deal-tracker-service.test.js +++ b/packages/core/test/deal-tracker-service.test.js @@ -9,14 +9,18 @@ import { testService as test } from './helpers/context.js' import { createDynamodDb, createTable } from './helpers/resources.js' test.before(async (t) => { - const dynamo = await createDynamodDb() + const { client: dynamoClient, stop: dynamoStop} = await createDynamodDb() Object.assign(t.context, { - dynamoClient: dynamo.client, + dynamoClient, + stop: async () => { + await dynamoStop() + } }) }) test.after(async t => { + await t.context.stop() await delay(1000) }) diff --git a/packages/core/test/dealer-events.test.js b/packages/core/test/dealer-events.test.js index 26b5810..d4ae89c 100644 --- a/packages/core/test/dealer-events.test.js +++ b/packages/core/test/dealer-events.test.js @@ -11,15 +11,21 @@ import { getMockService, getConnection } from '@web3-storage/filecoin-api/test/c import { createDynamodDb, createTable, createS3, createBucket } from '@w3filecoin/core/test/helpers/resources.js' test.before(async (t) => { - const dynamo = await createDynamodDb() + const { client: s3Client, stop: s3Stop } = await createS3({ port: 9000 }) + const { client: dynamoClient, stop: dynamoStop} = await createDynamodDb() Object.assign(t.context, { - s3: (await createS3()).client, - dynamoClient: dynamo.client, + s3: s3Client, + dynamoClient, + stop: async () => { + await s3Stop() + await dynamoStop() + } }) }) test.after(async t => { + await t.context.stop() await delay(1000) }) diff --git a/packages/core/test/dealer-service.test.js b/packages/core/test/dealer-service.test.js index 8df70ac..9ccef8c 100644 --- a/packages/core/test/dealer-service.test.js +++ b/packages/core/test/dealer-service.test.js @@ -11,15 +11,21 @@ import { getMockService, getConnection } from '@web3-storage/filecoin-api/test/c import { createDynamodDb, createTable, createS3, createBucket } from './helpers/resources.js' test.before(async (t) => { - const dynamo = await createDynamodDb() + const { client: s3Client, stop: s3Stop } = await createS3({ port: 9000 }) + const { client: dynamoClient, stop: dynamoStop} = await createDynamodDb() Object.assign(t.context, { - s3: (await createS3()).client, - dynamoClient: dynamo.client, + s3: s3Client, + dynamoClient, + stop: async () => { + await s3Stop() + await dynamoStop() + } }) }) test.after(async t => { + await t.context.stop() await delay(1000) }) diff --git a/packages/core/test/helpers/aggregator-context.js b/packages/core/test/helpers/aggregator-context.js index b12a8f6..e92f152 100644 --- a/packages/core/test/helpers/aggregator-context.js +++ b/packages/core/test/helpers/aggregator-context.js @@ -61,24 +61,24 @@ export async function getStores (ctx) { */ export function getQueues (ctx) { return { - pieceQueue: createPieceQueueClient(ctx.queues.pieceQueue.sqsClient, + pieceQueue: createPieceQueueClient(ctx.sqsClient, { queueUrl: ctx.queues.pieceQueue.queueUrl } ), - bufferQueue: createBufferQueueClient(ctx.queues.bufferQueue.sqsClient, + bufferQueue: createBufferQueueClient(ctx.sqsClient, { queueUrl: ctx.queues.bufferQueue.queueUrl, // testing is not FIFO QUEUE disableMessageGroupId: true } ), - aggregateOfferQueue: createAggregateOfferQueueClient(ctx.queues.aggregateOfferQueue.sqsClient, + aggregateOfferQueue: createAggregateOfferQueueClient(ctx.sqsClient, { queueUrl: ctx.queues.aggregateOfferQueue.queueUrl, // testing is not FIFO QUEUE disableMessageGroupId: true } ), - pieceAcceptQueue: createPieceAcceptQueueClient(ctx.queues.pieceAcceptQueue.sqsClient, + pieceAcceptQueue: createPieceAcceptQueueClient(ctx.sqsClient, { queueUrl: ctx.queues.pieceAcceptQueue.queueUrl } ), } diff --git a/packages/core/test/helpers/context.js b/packages/core/test/helpers/context.js index 9f41b67..9b3ed0c 100644 --- a/packages/core/test/helpers/context.js +++ b/packages/core/test/helpers/context.js @@ -2,7 +2,6 @@ import anyTest from 'ava' /** * @typedef {object} QueueContext - * @property {import('@aws-sdk/client-sqs').SQSClient} sqsClient * @property {string} queueName * @property {string} queueUrl * @property {import('sqs-consumer').Consumer} queueConsumer @@ -15,16 +14,20 @@ import anyTest from 'ava' * @property {import('@aws-sdk/client-s3').S3Client} s3 * * @typedef {object} MultipleQueueContext + * @property {import('@aws-sdk/client-sqs').SQSClient} sqsClient * @property {Record} queues * @property {Map} queuedMessages * + * @typedef {object} Stoppable + * @property {() => Promise} stop + * * @typedef {import('ava').TestFn} Test - * @typedef {import('ava').TestFn} TestService - * @typedef {import('ava').TestFn} TestStore + * @typedef {import('ava').TestFn} TestService + * @typedef {import('ava').TestFn} TestStore * @typedef {import('ava').TestFn} TestQueue * @typedef {import('ava').TestFn} TestWorkflow * @typedef {import('ava').TestFn} TestDealTracker - * @typedef {import('ava').TestFn} TestWorkflowWithMultipleQueues + * @typedef {import('ava').TestFn} TestWorkflowWithMultipleQueues */ // eslint-disable-next-line unicorn/prefer-export-from diff --git a/packages/core/test/helpers/resources.js b/packages/core/test/helpers/resources.js index 4d16762..5046e3a 100644 --- a/packages/core/test/helpers/resources.js +++ b/packages/core/test/helpers/resources.js @@ -25,7 +25,8 @@ export async function createDynamodDb(opts = {}) { region, endpoint }), - endpoint + endpoint, + stop: () => dbContainer.stop(), } } @@ -140,6 +141,7 @@ export async function createS3(opts = {}) { return { client: new S3Client(clientOpts), clientOpts, + stop: () => minio.stop(), } } @@ -158,7 +160,7 @@ export async function createBucket(s3) { * @param {number} [opts.port] * @param {string} [opts.region] */ -export async function createQueue(opts = {}) { +export const createSQS = async (opts = {}) => { const region = opts.region || 'us-west-2' const port = opts.port || 9324 @@ -167,25 +169,35 @@ export async function createQueue(opts = {}) { .withExposedPorts(port) .start() ) - const endpoint = `http://${queue.getHost()}:${queue.getMappedPort(port)}` const client = new SQSClient({ region, endpoint }) - const accountId = '000000000000' + + return { + client, + stop: () => queue.stop() + } +} + +/** + * @param {import('@aws-sdk/client-sqs').SQSClient} sqs + */ +export async function createQueue (sqs) { const id = customAlphabet('1234567890abcdefghijklmnopqrstuvwxyz', 10) const QueueName = id() - await pRetry(() => - client.send(new CreateQueueCommand({ + const res = await pRetry(() => + sqs.send(new CreateQueueCommand({ QueueName, })) ) + if (!res.QueueUrl) throw new Error('missing queue URL') + return { - client, - queueName: QueueName, - queueUrl: `${endpoint}/${accountId}/${QueueName}` + queueUrl: res.QueueUrl, + queueName: QueueName } }