Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Message publishing optimization #75

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
22 changes: 15 additions & 7 deletions src/Adapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

final class Adapter implements AdapterInterface
{
private ?AMQPMessage $message = null;

public function __construct(
private QueueProviderInterface $queueProvider,
private MessageSerializerInterface $serializer,
Expand Down Expand Up @@ -52,10 +54,10 @@ public function status(string $id): JobStatus
public function push(MessageInterface $message): void
{
$payload = $this->serializer->serialize($message);
$amqpMessage = new AMQPMessage(
$payload,
array_merge(['message_id' => uniqid(more_entropy: true)], $this->queueProvider->getMessageProperties())
);
$amqpMessage = $this->getAmqpMessage();
$amqpMessage->setBody($payload);
$amqpMessage->set('message_id', $message->getId());

$exchangeSettings = $this->queueProvider->getExchangeSettings();
$this->queueProvider
->getChannel()
Expand All @@ -66,9 +68,6 @@ public function push(MessageInterface $message): void
->getQueueSettings()
->getName()
);
/** @var string $messageId */
$messageId = $amqpMessage->get('message_id');
$message->setId($messageId);
}

public function subscribe(callable $handlerCallback): void
Expand Down Expand Up @@ -117,4 +116,13 @@ public function withQueueProvider(QueueProviderInterface $queueProvider): self

return $new;
}

private function getAmqpMessage(): AMQPMessage
{
if ($this->message === null) {
$this->message = new AMQPMessage('', $this->queueProvider->getMessageProperties());
}

return $this->message;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Scrutinizer review

The expression return $this->message could return the type null which is incompatible with the type-hinted return PhpAmqpLib\Message\AMQPMessage. Consider adding an additional type-check to rule them out.

}
}
22 changes: 22 additions & 0 deletions src/Middleware/MessageIdGeneratingMiddleware.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
<?php

declare(strict_types=1);

namespace Yiisoft\Yii\Queue\AMQP\Middleware;

use Yiisoft\Yii\Queue\Middleware\Push\MessageHandlerPushInterface;
use Yiisoft\Yii\Queue\Middleware\Push\MiddlewarePushInterface;
use Yiisoft\Yii\Queue\Middleware\Push\PushRequest;

final class MessageIdGeneratingMiddleware implements MiddlewarePushInterface
{
public function processPush(PushRequest $request, MessageHandlerPushInterface $handler): PushRequest
{
$message = $request->getMessage();
if ($message->getId() === null) {
$message->setId(uniqid(more_entropy: true));
}

return $handler->handlePush($request);
}
}
6 changes: 5 additions & 1 deletion tests/Unit/QueueTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use Yiisoft\Yii\Queue\AMQP\Exception\NotImplementedException;
use Yiisoft\Yii\Queue\AMQP\MessageSerializer;
use Yiisoft\Yii\Queue\AMQP\MessageSerializerInterface;
use Yiisoft\Yii\Queue\AMQP\Middleware\MessageIdGeneratingMiddleware;
use Yiisoft\Yii\Queue\AMQP\QueueProvider;
use Yiisoft\Yii\Queue\AMQP\QueueProviderInterface;
use Yiisoft\Yii\Queue\AMQP\Settings\Exchange as ExchangeSettings;
Expand Down Expand Up @@ -37,11 +38,14 @@ public function testStatus(): void
$message = new Message('ext-simple', null);
$queue->push(
$message,
new MessageIdGeneratingMiddleware()
);

$messageId = $message->getId();
$this->assertNotNull($messageId);
$this->expectException(NotImplementedException::class);
$this->expectExceptionMessage("Status check is not supported by the adapter $adapterClass.");
$adapter->status($message->getId());
$adapter->status($messageId);
}

/**
Expand Down
Loading