From 6d3b159198082fe974c6fde7a050010257d3f027 Mon Sep 17 00:00:00 2001 From: viktorprogger Date: Thu, 27 Apr 2023 15:10:24 +0600 Subject: [PATCH 1/3] Message publishing optimization --- src/Adapter.php | 21 ++++++++++++++---- .../MessageIdGeneratingMiddleware.php | 22 +++++++++++++++++++ 2 files changed, 39 insertions(+), 4 deletions(-) create mode 100644 src/Middleware/MessageIdGeneratingMiddleware.php diff --git a/src/Adapter.php b/src/Adapter.php index bc9e577..4fbf847 100644 --- a/src/Adapter.php +++ b/src/Adapter.php @@ -14,6 +14,8 @@ final class Adapter implements AdapterInterface { + private ?AMQPMessage $message = null; + public function __construct( private QueueProviderInterface $queueProvider, private MessageSerializerInterface $serializer, @@ -52,10 +54,12 @@ public function status(string $id): JobStatus public function push(MessageInterface $message): void { $payload = $this->serializer->serialize($message); - $amqpMessage = new AMQPMessage( - $payload, - array_merge(['message_id' => uniqid(more_entropy: true)], $this->queueProvider->getMessageProperties()) - ); + $amqpMessage = $this->getAmqpMessage(); + $amqpMessage->setBody($payload); + if ($message->getId() !== null) { + $amqpMessage->set('message_id', $message->getId()); + } + $exchangeSettings = $this->queueProvider->getExchangeSettings(); $this->queueProvider ->getChannel() @@ -115,4 +119,13 @@ public function withQueueProvider(QueueProviderInterface $queueProvider): self return $new; } + + private function getAmqpMessage(): AMQPMessage + { + if ($this->message === null) { + $this->message = new AMQPMessage('', $this->queueProvider->getMessageProperties()); + } + + return $this->message; + } } diff --git a/src/Middleware/MessageIdGeneratingMiddleware.php b/src/Middleware/MessageIdGeneratingMiddleware.php new file mode 100644 index 0000000..fee0257 --- /dev/null +++ b/src/Middleware/MessageIdGeneratingMiddleware.php @@ -0,0 +1,22 @@ +getMessage(); + if ($message->getId() === null) { + $message->setId(uniqid(more_entropy: true)); + } + + return $handler->handlePush($request); + } +} From 9dd70e5581b9c6774af2c42dd526e21215e94e1a Mon Sep 17 00:00:00 2001 From: viktorprogger Date: Thu, 27 Apr 2023 16:37:34 +0600 Subject: [PATCH 2/3] Message publishing optimization: bugfix --- src/Adapter.php | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/src/Adapter.php b/src/Adapter.php index 4fbf847..1a5cb60 100644 --- a/src/Adapter.php +++ b/src/Adapter.php @@ -56,9 +56,7 @@ public function push(MessageInterface $message): void $payload = $this->serializer->serialize($message); $amqpMessage = $this->getAmqpMessage(); $amqpMessage->setBody($payload); - if ($message->getId() !== null) { - $amqpMessage->set('message_id', $message->getId()); - } + $amqpMessage->set('message_id', $message->getId()); $exchangeSettings = $this->queueProvider->getExchangeSettings(); $this->queueProvider @@ -70,9 +68,6 @@ public function push(MessageInterface $message): void ->getQueueSettings() ->getName() ); - /** @var string $messageId */ - $messageId = $amqpMessage->get('message_id'); - $message->setId($messageId); } public function subscribe(callable $handlerCallback): void From 92b3a7e6c6abd8489c871a425bfcc0569d6fb0fd Mon Sep 17 00:00:00 2001 From: s1lver Date: Thu, 11 May 2023 15:26:12 +0300 Subject: [PATCH 3/3] Used middleware for generate id now --- tests/Unit/QueueTest.php | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tests/Unit/QueueTest.php b/tests/Unit/QueueTest.php index 7fb8d24..fc10383 100644 --- a/tests/Unit/QueueTest.php +++ b/tests/Unit/QueueTest.php @@ -8,6 +8,7 @@ use Yiisoft\Yii\Queue\AMQP\Adapter; use Yiisoft\Yii\Queue\AMQP\Exception\NotImplementedException; use Yiisoft\Yii\Queue\AMQP\MessageSerializer; +use Yiisoft\Yii\Queue\AMQP\Middleware\MessageIdGeneratingMiddleware; use Yiisoft\Yii\Queue\AMQP\QueueProvider; use Yiisoft\Yii\Queue\AMQP\Settings\Exchange as ExchangeSettings; use Yiisoft\Yii\Queue\AMQP\Settings\Queue as QueueSettings; @@ -33,10 +34,13 @@ public function testStatus(): void $message = new Message('ext-simple', null); $queue->push( $message, + new MessageIdGeneratingMiddleware() ); + $messageId = $message->getId(); + $this->assertNotNull($messageId); $this->expectException(NotImplementedException::class); - $adapter->status($message->getId()); + $adapter->status($messageId); } /**