-
Notifications
You must be signed in to change notification settings - Fork 14
/
Copy pathCalculateRootJobStatusProcessor.php
81 lines (65 loc) · 2.03 KB
/
CalculateRootJobStatusProcessor.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
<?php
namespace Enqueue\JobQueue;
use Enqueue\Client\CommandSubscriberInterface;
use Enqueue\Client\ProducerInterface;
use Enqueue\Consumption\Result;
use Enqueue\JobQueue\Doctrine\JobStorage;
use Enqueue\Util\JSON;
use Interop\Queue\Context;
use Interop\Queue\Message;
use Interop\Queue\Processor;
use Psr\Log\LoggerInterface;
/**
 * Queue processor subscribed to the Commands::CALCULATE_ROOT_JOB_STATUS command.
 *
 * For every message it reads the "jobId" field from the JSON body, loads the
 * job through JobStorage, hands it to CalculateRootJobStatusService::calculate()
 * and, when that call returns a truthy value, sends a Topics::ROOT_JOB_STOPPED
 * event carrying the id of the job's root job.
 */
class CalculateRootJobStatusProcessor implements Processor, CommandSubscriberInterface
{
    /**
     * @var JobStorage
     */
    private $jobStorage;

    /**
     * @var CalculateRootJobStatusService
     */
    private $calculateRootJobStatusService;

    /**
     * @var ProducerInterface
     */
    private $producer;

    /**
     * @var LoggerInterface
     */
    private $logger;

    /**
     * @param JobStorage                    $jobStorage                 used to look jobs up by id
     * @param CalculateRootJobStatusService $calculateRootJobStatusCase service invoked with the loaded job
     * @param ProducerInterface             $producer                   used to send the ROOT_JOB_STOPPED event
     * @param LoggerInterface               $logger                     receives a critical record for every rejected message
     */
    public function __construct(
        JobStorage $jobStorage,
        CalculateRootJobStatusService $calculateRootJobStatusCase,
        ProducerInterface $producer,
        LoggerInterface $logger,
    ) {
        $this->jobStorage = $jobStorage;
        $this->calculateRootJobStatusService = $calculateRootJobStatusCase;
        $this->producer = $producer;
        $this->logger = $logger;
    }

    /**
     * Handles a single "calculate root job status" message.
     *
     * The message is rejected (and a critical log record is written) when its
     * body cannot be decoded, when it carries no "jobId", or when no job with
     * that id exists. Otherwise the message is acknowledged.
     *
     * @param Message $message message whose body is a JSON document with a "jobId" key
     * @param Context $context not used by this processor
     *
     * @return string Result::REJECT for an unusable message, Result::ACK otherwise
     */
    public function process(Message $message, Context $context)
    {
        $body = $message->getBody();

        try {
            $data = JSON::decode($body);
        } catch (\InvalidArgumentException $e) {
            // NOTE(review): JSON::decode is expected to throw \InvalidArgumentException
            // on malformed input - confirm against Enqueue\Util\JSON. Such a body can
            // never become valid, so reject it like any other invalid message instead
            // of letting the exception escape the processor.
            $this->logger->critical(
                sprintf('Got invalid message. body: "%s"', $body),
                ['exception' => $e]
            );

            return Result::REJECT;
        }

        // A body that decodes to a scalar or null cannot carry a job id either.
        if (!is_array($data) || !isset($data['jobId'])) {
            $this->logger->critical(sprintf('Got invalid message. body: "%s"', $body));

            return Result::REJECT;
        }

        $job = $this->jobStorage->findJobById($data['jobId']);
        if (!$job) {
            $this->logger->critical(sprintf('Job was not found. id: "%s"', $data['jobId']));

            return Result::REJECT;
        }

        $isRootJobStopped = $this->calculateRootJobStatusService->calculate($job);

        if ($isRootJobStopped) {
            // NOTE(review): assumes getRootJob() returns a non-null job for every job
            // that reaches this point - confirm for messages that reference a root job.
            $this->producer->sendEvent(Topics::ROOT_JOB_STOPPED, [
                'jobId' => $job->getRootJob()->getId(),
            ]);
        }

        return Result::ACK;
    }

    /**
     * Names the command this processor is subscribed to.
     *
     * @return string Commands::CALCULATE_ROOT_JOB_STATUS
     */
    public static function getSubscribedCommand()
    {
        return Commands::CALCULATE_ROOT_JOB_STATUS;
    }
}