Skip to content

Commit

Permalink
Merge pull request #3 from andsm/subscribe
Browse files Browse the repository at this point in the history
Removed re-subscription
  • Loading branch information
KonstantinCodes authored Oct 6, 2019
2 parents 7ef9678 + 4716da4 commit 3f559f9
Showing 1 changed file with 9 additions and 2 deletions.
11 changes: 9 additions & 2 deletions Messenger/KafkaTransport.php
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ class KafkaTransport implements TransportInterface
/** @var bool */
private $commitAsync;

/** @var bool */
private $subscribed;

public function __construct(
LoggerInterface $logger,
SerializerInterface $serializer,
Expand All @@ -52,15 +55,19 @@ public function __construct(
$this->topicName = $topicName;
$this->timeoutMs = $timeoutMs;
$this->commitAsync = $commitAsync;
$this->subscribed = false;
}

public function get(): iterable
{
$consumer = $this->getConsumer();

$consumer->subscribe([$this->topicName]);
if (false === $this->subscribed) {
$consumer->subscribe([$this->topicName]);
$this->logger->info('Partition assignment...');

$this->logger->info('Partition assignment...');
$this->subscribed = true;
}

$message = $consumer->consume($this->timeoutMs);
switch ($message->err) {
Expand Down

0 comments on commit 3f559f9

Please sign in to comment.