Skip to content

Commit

Permalink
Merge pull request #13 from KonstantinCodes/feature/message-decoder
Browse files Browse the repository at this point in the history
KafkaTransportTest
  • Loading branch information
KonstantinCodes authored Apr 14, 2020
2 parents 1316377 + d33b677 commit d32aff6
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 6 deletions.
10 changes: 8 additions & 2 deletions Messenger/KafkaTransport.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Koco\Kafka\Messenger;

use Koco\Kafka\RdKafka\RdKafkaFactory;
use function json_encode;
use Psr\Log\LoggerInterface;
use const RD_KAFKA_PARTITION_UA;
Expand All @@ -23,6 +24,9 @@ class KafkaTransport implements TransportInterface
/** @var KafkaMessageDecoderInterface */
private $decoder;

/** @var RdKafkaFactory */
private $rdKafkaFactory;

/** @var KafkaConf */
private $kafkaConf;

Expand Down Expand Up @@ -51,6 +55,7 @@ public function __construct(
LoggerInterface $logger,
SerializerInterface $serializer,
KafkaMessageDecoderInterface $decoder,
RdKafkaFactory $rdKafkaFactory,
KafkaConf $kafkaConf,
string $topicName,
int $flushTimeout,
Expand All @@ -60,6 +65,7 @@ public function __construct(
$this->logger = $logger;
$this->serializer = $serializer;
$this->decoder = $decoder;
$this->rdKafkaFactory = $rdKafkaFactory;
$this->kafkaConf = $kafkaConf;
$this->topicName = $topicName;
$this->timeoutMs = $timeoutMs;
Expand Down Expand Up @@ -158,7 +164,7 @@ private function getConsumer(): KafkaConsumer
return $this->consumer;
}

$this->consumer = new KafkaConsumer($this->kafkaConf);
$this->consumer = $this->rdKafkaFactory->createConsumer($this->kafkaConf);

return $this->consumer;
}
Expand All @@ -169,7 +175,7 @@ private function getProducer(): KafkaProducer
return $this->producer;
}

$this->producer = new KafkaProducer($this->kafkaConf);
$this->producer = $this->rdKafkaFactory->createProducer($this->kafkaConf);

return $this->producer;
}
Expand Down
2 changes: 2 additions & 0 deletions Messenger/KafkaTransportFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Koco\Kafka\Messenger;

use Koco\Kafka\RdKafka\RdKafkaFactory;
use function explode;
use Psr\Log\LoggerInterface;
use const RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS;
Expand Down Expand Up @@ -94,6 +95,7 @@ public function createTransport(string $dsn, array $options, SerializerInterface
$this->logger,
$serializer,
$options['decoder'] ?? new KafkaMessageJsonDecoder(),
new RdKafkaFactory(),
$conf,
$options['topic']['name'],
$options['flushTimeout'] ?? 10000,
Expand Down
20 changes: 20 additions & 0 deletions RdKafka/RdKafkaFactory.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<?php

namespace Koco\Kafka\RdKafka;

use RdKafka\Conf;
use RdKafka\KafkaConsumer;
use RdKafka\Producer as KafkaProducer;

class RdKafkaFactory
{
public function createConsumer(Conf $conf): KafkaConsumer
{
return new KafkaConsumer($conf);
}

public function createProducer(Conf $conf): KafkaProducer
{
return new KafkaProducer($conf);
}
}
102 changes: 98 additions & 4 deletions Tests/Unit/Messenger/KafkaTransportTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,60 @@
namespace Koco\Kafka\Tests\Unit\Messenger;

use Koco\Kafka\Messenger\KafkaMessageDecoderInterface;
use Koco\Kafka\Messenger\KafkaMessageJsonDecoder;
use Koco\Kafka\Messenger\KafkaMessageStamp;
use Koco\Kafka\Messenger\KafkaTransport;
use Koco\Kafka\RdKafka\RdKafkaFactory;
use PHPUnit\Framework\MockObject\MockObject;
use PHPUnit\Framework\TestCase;
use Psr\Log\LoggerInterface;
use RdKafka\Conf as KafkaConf;
use RdKafka\KafkaConsumer;
use RdKafka\Message;
use RdKafka\Producer as KafkaProducer;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;

class KafkaTransportTest extends TestCase
{
/** @var LoggerInterface */
/** @var MockObject|LoggerInterface */
private $mockLogger;

/** @var SerializerInterface */
/** @var MockObject|SerializerInterface */
private $mockSerializer;

/** @var KafkaMessageDecoderInterface */
/** @var MockObject|KafkaMessageDecoderInterface */
private $mockDecoder;

/** @var MockObject|KafkaConsumer */
private $mockRdKafkaConsumer;

/** @var MockObject|KafkaProducer */
private $mockRdKafkaProducer;

/** @var MockObject|RdKafkaFactory */
private $mockRdKafkaFactory;

protected function setUp(): void
{
$this->mockLogger = $this->createMock(LoggerInterface::class);

$this->mockSerializer = $this->createMock(SerializerInterface::class);

$this->mockDecoder = $this->createMock(KafkaMessageDecoderInterface::class);

// RdKafka
$this->mockRdKafkaFactory = $this->createMock(RdKafkaFactory::class);

$this->mockRdKafkaConsumer = $this->createMock(KafkaConsumer::class);
$this->mockRdKafkaFactory
->method('createConsumer')
->willReturn($this->mockRdKafkaConsumer);

$this->mockRdKafkaProducer = $this->createMock(KafkaProducer::class);
$this->mockRdKafkaFactory
->method('createProducer')
->willReturn($this->mockRdKafkaProducer);
}

public function testConstruct()
Expand All @@ -35,6 +65,7 @@ public function testConstruct()
$this->mockLogger,
$this->mockSerializer,
$this->mockDecoder,
new RdKafkaFactory(),
new KafkaConf(),
'test',
10000,
Expand All @@ -44,4 +75,67 @@ public function testConstruct()

$this->assertInstanceOf(TransportInterface::class, $transport);
}

public function testGet()
{
$this->mockRdKafkaConsumer
->method('subscribe')
->willReturn(true);

$testMessage = new Message;
$testMessage->err = RD_KAFKA_RESP_ERR_NO_ERROR;
$testMessage->topic_name = 'test';
$testMessage->partition = 0;
$testMessage->payload = json_encode([
'body' => 'test',
'headers' => 'asdf'
]);
$testMessage->offset = 0;
$testMessage->timestamp = 1586861356;

$this->mockRdKafkaConsumer
->method('consume')
->willReturn($testMessage);

$this->mockDecoder->expects($this->once())
->method('decode')
->with($testMessage)
->willReturn(['body' => 'test', 'headers' => 'asdf']);

$this->mockSerializer->expects($this->once())
->method('decode')
->with(['body' => 'test', 'headers' => 'asdf'])
->willReturn(new Envelope(new TestMessage()));

$transport = new KafkaTransport(
$this->mockLogger,
$this->mockSerializer,
$this->mockDecoder,
$this->mockRdKafkaFactory,
new KafkaConf(),
'test',
10000,
10000,
false
);

$receivedMessages = $transport->get();
$this->assertArrayHasKey(0, $receivedMessages);

/** @var Envelope $receivedMessage */
$receivedMessage = $receivedMessages[0];
$this->assertInstanceOf(Envelope::class, $receivedMessage);
$this->assertInstanceOf(TestMessage::class, $receivedMessage->getMessage());

$stamps = $receivedMessage->all();
$this->assertCount(1, $stamps);
$this->assertArrayHasKey(KafkaMessageStamp::class, $stamps);

$kafkaMessageStamps = $stamps[KafkaMessageStamp::class];
$this->assertCount(1, $kafkaMessageStamps);

/** @var KafkaMessageStamp $kafkaMessageStamp */
$kafkaMessageStamp = $kafkaMessageStamps[0];
$this->assertEquals($testMessage, $kafkaMessageStamp->getMessage());
}
}
8 changes: 8 additions & 0 deletions Tests/Unit/Messenger/TestMessage.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
<?php

namespace Koco\Kafka\Tests\Unit\Messenger;

class TestMessage
{
public $data;
}

0 comments on commit d32aff6

Please sign in to comment.