Skip to content

Commit

Permalink
Merge pull request #763 from bcnmy/fix/add-rabbitmq-try-catch-handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
TheDivic authored Jan 24, 2025
2 parents 32ceffb + c67c0dc commit 6ce29ac
Show file tree
Hide file tree
Showing 9 changed files with 154 additions and 84 deletions.
3 changes: 2 additions & 1 deletion config/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -1140,7 +1140,8 @@
"adminAddresses": [],
"callGasLimitMarkup": {
"100": 20000,
"88888": 20000
"88888": 20000,
"81457": 50000
},
"hardcodedGasLimits": {
"84532": {
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ services:
- mongo-data:/data/db

rabbitmq:
image: heidiks/rabbitmq-delayed-message-exchange:3.8.3-management
image: heidiks/rabbitmq-delayed-message-exchange:3.13.0-management
container_name: "rabbitmq"
healthcheck:
test: rabbitmq-diagnostics -q ping
Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
"@types/crypto-js": "^4.1.1",
"@types/express-prometheus-middleware": "^1.2.1",
"@types/node": "^22.10.1",
"amqplib": "^0.10.3",
"amqplib": "^0.10.5",
"async-mutex": "^0.4.0",
"axios": "^1.1.2",
"axios-retry": "^4.5.0",
Expand Down Expand Up @@ -66,7 +66,7 @@
"@biconomy/sdk": "^0.0.29",
"@eslint/js": "^9.13.0",
"@rhinestone/module-sdk": "^0.1.32",
"@types/amqplib": "^0.8.2",
"@types/amqplib": "^0.10.6",
"@types/big.js": "^6.1.5",
"@types/consolidate": "^0.14.1",
"@types/cors": "^2.8.12",
Expand Down
4 changes: 2 additions & 2 deletions src/common/interface/index.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { ConsumeMessage } from "amqplib";
import { ConsumeMessage, Connection } from "amqplib";

export interface IQueue<TransactionMessageType> {
chainId: number;
connect(): Promise<void>;
connect(connection: Connection): Promise<void>;
publish(arg0: TransactionMessageType): Promise<boolean>;
consume(onMessageReceived: () => void): Promise<boolean>;
ack(arg0: ConsumeMessage): Promise<void>;
Expand Down
92 changes: 61 additions & 31 deletions src/common/queue/BundlerTransactionQueue.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import amqp, { Channel, ConsumeMessage } from "amqplib";
import nodeconfig from "config";
import { Channel, ConsumeMessage, Connection} from "amqplib";
import { logger } from "../logger";
import { SendUserOperation, TransactionType } from "../types";
import { IQueue } from "./interface/IQueue";
Expand All @@ -10,8 +9,7 @@ const log = logger.child({
module: module.filename.split("/").slice(-4).join("/"),
});

const queueUrl =
process.env.BUNDLER_QUEUE_URL || nodeconfig.get<string>("queueUrl");


export class BundlerTransactionQueue implements IQueue<SendUserOperation> {
readonly chainId: number;
Expand All @@ -36,13 +34,25 @@ export class BundlerTransactionQueue implements IQueue<SendUserOperation> {
this.queueName = `relayer_queue_${this.chainId}_type_${this.transactionType}`;
}

async connect() {
const connection = await amqp.connect(queueUrl);
if (!this.channel) {
this.channel = await connection.createChannel();
this.channel.assertExchange(this.exchangeName, this.exchangeType, {
durable: true,
});
async connect(connection: Connection) {
const _log = log.child({
chainId: this.chainId,
transactionType: this.transactionType,
queueName: this.queueName,
exchangeKey: this.exchangeKey,
});

try {
if (!this.channel) {
this.channel = await connection.createChannel();
this.channel.assertExchange(this.exchangeName, this.exchangeType, {
durable: true,
}).catch((err) => {
_log.error({ err }, `BundlerTransactionQueue:: assertExchange() failed`);
});
}
} catch (err) {
_log.error({ err }, `BundlerTransactionQueue:: Error while connecting to the queue`);
}
}

Expand All @@ -54,52 +64,72 @@ export class BundlerTransactionQueue implements IQueue<SendUserOperation> {
transactionId: data.transactionId,
});

_log.info(`Publishing data to retry queue`);
_log.info(`BundlerTransactionQueue:: Publishing data to retry queue`);

if (shouldDiscardStaleMessage(this.chainId, data, Date.now())) {
_log.warn(`Discarding message because it's stale`);
try {
if (shouldDiscardStaleMessage(this.chainId, data, Date.now())) {
_log.warn(`BundlerTransactionQueue:: Discarding message because it's stale`);
return true;
}

this.channel.publish(
this.exchangeName,
key,
Buffer.from(customJSONStringify(data)),
{
persistent: true,
},
);
return true;
} catch (err) {
_log.error({ err }, `BundlerTransactionQueue:: Error while publishing the data to the queue`);
return false;
}

this.channel.publish(
this.exchangeName,
key,
Buffer.from(customJSONStringify(data)),
{
persistent: true,
},
);
return true;
}

async consume(onMessageReceived: () => void) {
this.channel.prefetch(this.prefetch);

const _log = log.child({
chainId: this.chainId,
transactionType: this.transactionType,
queueName: this.queueName,
exchangeKey: this.exchangeKey,
});

_log.info(`Setting up consumer for queue`);
this.channel.prefetch(this.prefetch).catch((err) => {
_log.error({ err }, `BundlerTransactionQueue:: Error while prefetching`);
});

_log.info(`BundlerTransactionQueue:: Setting up consumer for queue`);
try {
// setup a consumer
const queue = await this.channel.assertQueue(this.queueName);

this.channel.bindQueue(queue.queue, this.exchangeName, this.exchangeKey);
this.channel.bindQueue(queue.queue, this.exchangeName, this.exchangeKey).catch((err) => {
_log.error({ err }, `BundlerTransactionQueue:: Error while binding queue`);
});

_log.info(`Waiting for transactions...`);
_log.info(`BundlerTransactionQueue:: Waiting for transactions...`);
await this.channel.consume(queue.queue, onMessageReceived);

return true;
} catch (err) {
_log.error({ err }, `Error while consuming queue`);
_log.error({ err }, `BundlerTransactionQueue:: Error while consuming queue`);
return false;
}
}

async ack(data: ConsumeMessage) {
this.channel.ack(data);
const _log = log.child({
chainId: this.chainId,
transactionType: this.transactionType,
queueName: this.queueName,
exchangeKey: this.exchangeKey,
});

try {
this.channel.ack(data);
} catch (err) {
_log.error({ err }, `BundlerTransactionQueue:: Error while acknowledging `);
}
}
}
104 changes: 68 additions & 36 deletions src/common/queue/RetryTransactionHandlerQueue.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import amqp, { Channel, ConsumeMessage, Replies } from "amqplib";
import { Channel, ConsumeMessage, Replies, Connection } from "amqplib";
import nodeconfig from "config";
import { logger } from "../logger";
import { IQueue } from "./interface/IQueue";
Expand Down Expand Up @@ -48,16 +48,28 @@ export class RetryTransactionHandlerQueue
: 30_000;
}

async connect() {
const connection = await amqp.connect(this.queueUrl);
if (!this.channel) {
this.channel = await connection.createChannel();
this.channel.assertExchange(this.exchangeName, this.exchangeType, {
durable: true,
arguments: {
"x-delayed-type": "direct",
},
});
async connect(connection: Connection) {
const _log = log.child({
chainId: this.chainId,
queueName: this.queueName,
exchangeName: this.exchangeName,
nodePathIndex: this.nodePathIndex,
});

try {
if (!this.channel) {
this.channel = await connection.createChannel();
this.channel.assertExchange(this.exchangeName, this.exchangeType, {
durable: true,
arguments: {
"x-delayed-type": "direct",
},
}).catch((err) => {
_log.error({ err }, `RetryTransactionHandlerQueue:: Error while calling assertExchange()`);
});
}
} catch (err) {
_log.error({ err }, `RetryTransactionHandlerQueue:: Error while connecting to the queue`);
}
}

Expand All @@ -69,28 +81,33 @@ export class RetryTransactionHandlerQueue
transactionId: data.transactionId,
});

_log.info({ data }, `Publishing data to retry queue`);

if (shouldDiscardStaleMessage(this.chainId, data, Date.now())) {
_log.warn(`Discarding message because it's stale`);
return true;
}
_log.info({ data }, `RetryTransactionHandlerQueue:: Publishing data to retry queue`);

if (this.channel) {
this.channel.publish(
this.exchangeName,
key,
Buffer.from(customJSONStringify(data)),
{
persistent: true,
headers: {
"x-delay": this.retryTransactionInterval,
try {
if (shouldDiscardStaleMessage(this.chainId, data, Date.now())) {
_log.warn(`RetryTransactionHandlerQueue:: Discarding message because it's stale`);
return true;
}

if (this.channel) {
this.channel.publish(
this.exchangeName,
key,
Buffer.from(customJSONStringify(data)),
{
persistent: true,
headers: {
"x-delay": this.retryTransactionInterval,
},
},
},
);
return true;
);
return true;
}
return false;
} catch (err) {
_log.error({ err }, `RetryTransactionHandlerQueue:: Error while publishing the data to the queue`);
return false;
}
return false;
}

async consume(onMessageReceived: () => void) {
Expand All @@ -101,34 +118,49 @@ export class RetryTransactionHandlerQueue
nodePathIndex: this.nodePathIndex,
});

_log.info(`Setting up consumer for retry transaction queue`);
this.channel.prefetch(1);
_log.info(`RetryTransactionHandlerQueue:: Setting up consumer for retry transaction queue`);
this.channel.prefetch(1).catch((err) => {
_log.error({ err }, `RetryTransactionHandlerQueue:: Error while prefetching`);
});
try {
// setup a consumer
const retryTransactionQueue: Replies.AssertQueue =
await this.channel.assertQueue(`${this.queueName}_${this.chainId}`);
const key = `retry_chainid.${this.chainId}_${this.nodePathIndex}`;

_log.info(`Waiting for retry transactions`);
_log.info(`RetryTransactionHandlerQueue:: Waiting for retry transactions`);

this.channel.bindQueue(
retryTransactionQueue.queue,
this.exchangeName,
key,
);
).catch((err) => {
_log.error({ err }, `RetryTransactionHandlerQueue:: Error while binging queue`);
});
await this.channel.consume(
retryTransactionQueue.queue,
onMessageReceived,
);

return true;
} catch (err) {
log.error({ err }, `Error while consuming retry transaction queue`);
_log.error({ err }, `RetryTransactionHandlerQueue:: Error while consuming retry transaction queue`);
return false;
}
}

async ack(data: ConsumeMessage) {
this.channel.ack(data);
const _log = log.child({
chainId: this.chainId,
queueName: this.queueName,
exchangeName: this.exchangeName,
nodePathIndex: this.nodePathIndex,
});

try {
this.channel.ack(data);
} catch (err) {
_log.error({ err }, `RetryTransactionHandlerQueue:: Error while acknowledging message`);
}
}
}
4 changes: 2 additions & 2 deletions src/common/queue/interface/IQueue.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { ConsumeMessage } from "amqplib";
import { ConsumeMessage, Connection } from "amqplib";

export interface IQueue<TransactionMessageType> {
chainId: number;
connect(): Promise<void>;
connect(connection: Connection): Promise<void>;
publish(arg0: TransactionMessageType): Promise<boolean>;
consume(onMessageReceived: () => void): Promise<boolean>;
ack(arg0: ConsumeMessage): Promise<void>;
Expand Down
Loading

0 comments on commit 6ce29ac

Please sign in to comment.