From 2286d7b7a12f513ddc3c7da90d45fc72bde95a40 Mon Sep 17 00:00:00 2001 From: dusterio Date: Tue, 11 Apr 2017 11:52:12 +1000 Subject: [PATCH] + First files --- .gitignore | 3 + composer.json | 28 ++ phpunit.xml.dist | 24 ++ src/Queue/ListenCommand.php | 127 +++++++ src/Queue/Listener.php | 257 ++++++++++++++ src/Queue/ListenerOptions.php | 40 +++ src/Queue/WorkCommand.php | 196 +++++++++++ src/Queue/Worker.php | 636 ++++++++++++++++++++++++++++++++++ 8 files changed, 1311 insertions(+) create mode 100644 .gitignore create mode 100644 composer.json create mode 100644 phpunit.xml.dist create mode 100755 src/Queue/ListenCommand.php create mode 100755 src/Queue/Listener.php create mode 100644 src/Queue/ListenerOptions.php create mode 100644 src/Queue/WorkCommand.php create mode 100644 src/Queue/Worker.php diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9912bd5 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +composer.lock +vendor/ +.idea/ diff --git a/composer.json b/composer.json new file mode 100644 index 0000000..0671dbd --- /dev/null +++ b/composer.json @@ -0,0 +1,28 @@ +{ + "name": "dusterio/laravel-verbose", + "type": "library", + "description": "Package that adds verbosity to Laravel/Lumen built-in console commands", + "keywords": ["php","laravel","verbosity","verbose","console"], + "homepage": "https://github.com/dusterio/laravel-verbose", + "license": "MIT", + "authors": [ + { + "name": "Denis Mysenko", + "email": "denis@mysenko.com", + "homepage": "https://www.mysenko.com" + } + ], + "require": { + "php": ">=5.5.0", + "laravel/framework": "5.4.*" + }, + "require-dev": { + "phpunit/phpunit": "3.7.*", + "codeclimate/php-test-reporter": "dev-master" + }, + "autoload": { + "psr-4": { + "Dusterio\\LaravelVerbose\\": "src/" + } + } +} diff --git a/phpunit.xml.dist b/phpunit.xml.dist new file mode 100644 index 0000000..8174f12 --- /dev/null +++ b/phpunit.xml.dist @@ -0,0 +1,24 @@ + + + + + tests/ + + + + + + src/ + + + diff --git a/src/Queue/ListenCommand.php b/src/Queue/ListenCommand.php new file mode 100755 index 0000000..c98e25b --- /dev/null +++ b/src/Queue/ListenCommand.php @@ -0,0 +1,127 @@ +setOutputHandler($this->listener = $listener); + } + + /** + * Execute the console command. + * + * @return void + */ + public function fire() + { + // We need to get the right queue for the connection which is set in the queue + // configuration file for the application. We will pull it based on the set + // connection being run for the queue operation currently being executed. + $queue = $this->getQueue( + $connection = $this->input->getArgument('connection') + ); + + $this->listener->listen( + $connection, $queue, $this->gatherOptions() + ); + } + + /** + * Get the name of the queue connection to listen on. + * + * @param string $connection + * @return string + */ + protected function getQueue($connection) + { + $connection = $connection ?: $this->laravel['config']['queue.default']; + + return $this->input->getOption('queue') ?: $this->laravel['config']->get( + "queue.connections.{$connection}.queue", 'default' + ); + } + + /** + * Get the listener options for the command. + * + * @return \Illuminate\Queue\ListenerOptions + */ + protected function gatherOptions() + { + return new ListenerOptions( + $this->option('env'), $this->option('delay'), + $this->option('memory'), $this->option('timeout'), + $this->option('sleep'), $this->option('tries'), + $this->option('force'), $this->resolveVerbosityParameter() + ); + } + + /** + * Set the options on the queue listener. + * + * @param \Illuminate\Queue\Listener $listener + * @return void + */ + protected function setOutputHandler(Listener $listener) + { + $listener->setOutputHandler(function ($type, $line) { + $this->output->write($line); + }); + } + + /** + * Resolve a Symfony verbosity level back to its CLI parameter. + * + * @return string|null + */ + private function resolveVerbosityParameter() + { + $currentVerbosity = $this->output->getVerbosity(); + $parameter = array_search($currentVerbosity, $this->verbosityMap); + + return $parameter ?: null; + } +} diff --git a/src/Queue/Listener.php b/src/Queue/Listener.php new file mode 100755 index 0000000..7d2f693 --- /dev/null +++ b/src/Queue/Listener.php @@ -0,0 +1,257 @@ +commandPath = $commandPath; + $this->workerCommand = $this->buildCommandTemplate(); + } + + /** + * Build the environment specific worker command. + * + * @return string + */ + protected function buildCommandTemplate() + { + $command = 'queue:work %s --once --queue=%s --delay=%s --memory=%s --sleep=%s --tries=%s'; + + return "{$this->phpBinary()} {$this->artisanBinary()} {$command}"; + } + + /** + * Get the PHP binary. + * + * @return string + */ + protected function phpBinary() + { + return ProcessUtils::escapeArgument( + (new PhpExecutableFinder)->find(false) + ); + } + + /** + * Get the Artisan binary. + * + * @return string + */ + protected function artisanBinary() + { + return defined('ARTISAN_BINARY') + ? ProcessUtils::escapeArgument(ARTISAN_BINARY) + : 'artisan'; + } + + /** + * Listen to the given queue connection. + * + * @param string $connection + * @param string $queue + * @param \Illuminate\Queue\ListenerOptions $options + * @return void + */ + public function listen($connection, $queue, ListenerOptions $options) + { + $process = $this->makeProcess($connection, $queue, $options); + + while (true) { + $this->runProcess($process, $options->memory); + } + } + + /** + * Create a new Symfony process for the worker. + * + * @param string $connection + * @param string $queue + * @param \Illuminate\Queue\ListenerOptions $options + * @return \Symfony\Component\Process\Process + */ + public function makeProcess($connection, $queue, ListenerOptions $options) + { + $command = $this->workerCommand; + + // If the environment is set, we will append it to the command string so the + // workers will run under the specified environment. Otherwise, they will + // just run under the production environment which is not always right. + if (isset($options->environment)) { + $command = $this->addEnvironment($command, $options); + } + + if (isset($options->verbosity)) { + $command = $this->addVerbosity($command, $options); + } + + // Next, we will just format out the worker commands with all of the various + // options available for the command. This will produce the final command + // line that we will pass into a Symfony process object for processing. + $command = $this->formatCommand( + $command, $connection, $queue, $options + ); + + return new Process( + $command, $this->commandPath, null, null, $options->timeout + ); + } + + /** + * Add the environment option to the given command. + * + * @param string $command + * @param \Illuminate\Queue\ListenerOptions $options + * @return string + */ + protected function addEnvironment($command, ListenerOptions $options) + { + return $command.' --env='.ProcessUtils::escapeArgument($options->environment); + } + + /** + * Resolve a Symfony verbosity level back to its CLI parameter. + * + * @param string $command + * @param \Illuminate\Queue\ListenerOptions $options + * @return string + */ + protected function addVerbosity($command, ListenerOptions $options) + { + return $command.' -'.$options->verbosity; + } + + /** + * Format the given command with the listener options. + * + * @param string $command + * @param string $connection + * @param string $queue + * @param \Illuminate\Queue\ListenerOptions $options + * @return string + */ + protected function formatCommand($command, $connection, $queue, ListenerOptions $options) + { + return sprintf( + $command, + ProcessUtils::escapeArgument($connection), + ProcessUtils::escapeArgument($queue), + $options->delay, $options->memory, + $options->sleep, $options->maxTries + ); + } + + /** + * Run the given process. + * + * @param \Symfony\Component\Process\Process $process + * @param int $memory + * @return void + */ + public function runProcess(Process $process, $memory) + { + $process->run(function ($type, $line) { + $this->handleWorkerOutput($type, $line); + }); + + // Once we have run the job we'll go check if the memory limit has been exceeded + // for the script. If it has, we will kill this script so the process manager + // will restart this with a clean slate of memory automatically on exiting. + if ($this->memoryExceeded($memory)) { + $this->stop(); + } + } + + /** + * Handle output from the worker process. + * + * @param int $type + * @param string $line + * @return void + */ + protected function handleWorkerOutput($type, $line) + { + if (isset($this->outputHandler)) { + call_user_func($this->outputHandler, $type, $line); + } + } + + /** + * Determine if the memory limit has been exceeded. + * + * @param int $memoryLimit + * @return bool + */ + public function memoryExceeded($memoryLimit) + { + return (memory_get_usage() / 1024 / 1024) >= $memoryLimit; + } + + /** + * Stop listening and bail out of the script. + * + * @return void + */ + public function stop() + { + die; + } + + /** + * Set the output handler callback. + * + * @param \Closure $outputHandler + * @return void + */ + public function setOutputHandler(Closure $outputHandler) + { + $this->outputHandler = $outputHandler; + } +} diff --git a/src/Queue/ListenerOptions.php b/src/Queue/ListenerOptions.php new file mode 100644 index 0000000..ebe4ca8 --- /dev/null +++ b/src/Queue/ListenerOptions.php @@ -0,0 +1,40 @@ +environment = $environment; + $this->verbosity = $verbosity; + + parent::__construct($delay, $memory, $timeout, $sleep, $maxTries, $force); + } +} diff --git a/src/Queue/WorkCommand.php b/src/Queue/WorkCommand.php new file mode 100644 index 0000000..a0c37be --- /dev/null +++ b/src/Queue/WorkCommand.php @@ -0,0 +1,196 @@ +worker = $worker; + } + + /** + * Execute the console command. + * + * @return void + */ + public function fire() + { + if ($this->downForMaintenance() && $this->option('once')) { + return $this->worker->sleep($this->option('sleep')); + } + + // We'll listen to the processed and failed events so we can write information + // to the console as jobs are processed, which will let the developer watch + // which jobs are coming through a queue and be informed on its progress. + $this->listenForEvents(); + + $connection = $this->argument('connection') + ?: $this->laravel['config']['queue.default']; + + // We need to get the right queue for the connection which is set in the queue + // configuration file for the application. We will pull it based on the set + // connection being run for the queue operation currently being executed. + $queue = $this->getQueue($connection); + + $this->output->writeln("Using connection: {$connection}", OutputInterface::VERBOSITY_VERBOSE); + $this->output->writeln("Using queue: {$queue}", OutputInterface::VERBOSITY_VERBOSE); + + $this->runWorker( + $connection, $queue + ); + } + + /** + * Run the worker instance. + * + * @param string $connection + * @param string $queue + * @return array + */ + protected function runWorker($connection, $queue) + { + $this->worker->setCache($this->laravel['cache']->driver()); + + return $this->worker->{$this->option('once') ? 'runNextJob' : 'daemon'}( + $connection, $queue, $this->gatherWorkerOptions() + ); + } + + /** + * Gather all of the queue worker options as a single object. + * + * @return \Illuminate\Queue\WorkerOptions + */ + protected function gatherWorkerOptions() + { + return new WorkerOptions( + $this->option('delay'), $this->option('memory'), + $this->option('timeout'), $this->option('sleep'), + $this->option('tries'), $this->option('force') + ); + } + + /** + * Listen for the queue events in order to update the console output. + * + * @return void + */ + protected function listenForEvents() + { + $this->laravel['events']->listen(NoJobsAvailable::class, function () { + $this->output->writeln('The queue seems to be empty.', OutputInterface::VERBOSITY_VERBOSE); + }); + + $this->laravel['events']->listen(JobProcessing::class, function ($event) { + $this->output->writeln('Popped a job from the queue: '.$event->job->resolveName(), OutputInterface::VERBOSITY_VERY_VERBOSE); + }); + + $this->laravel['events']->listen(JobProcessed::class, function ($event) { + $this->output->writeln('['.Carbon::now()->format('Y-m-d H:i:s').'] Processed: '.$event->job->resolveName()); + }); + + $this->laravel['events']->listen(JobFailed::class, function ($event) { + $this->output->writeln('['.Carbon::now()->format('Y-m-d H:i:s').'] Failed: '.$event->job->resolveName()); + + $this->logFailedJob($event); + }); + + $this->laravel['events']->listen(Sleeping::class, function ($event) { + $this->output->writeln("Sleeping for {$event->seconds} seconds.", OutputInterface::VERBOSITY_VERY_VERBOSE); + }); + + $this->laravel['events']->listen(QueueExceptionOccurred::class, function ($event) { + $this->output->writeln($event->getMessage(), OutputInterface::VERBOSITY_VERY_VERBOSE); + $this->output->writeln("Couldn't fetch a job from the queue. See the log file for more information.", OutputInterface::VERBOSITY_VERBOSE); + }); + } + + /** + * Store a failed job event. + * + * @param JobFailed $event + * @return void + */ + protected function logFailedJob(JobFailed $event) + { + $this->laravel['queue.failer']->log( + $event->connectionName, $event->job->getQueue(), + $event->job->getRawBody(), $event->exception + ); + } + + /** + * Get the queue name for the worker. + * + * @param string $connection + * @return string + */ + protected function getQueue($connection) + { + return $this->option('queue') ?: $this->laravel['config']->get( + "queue.connections.{$connection}.queue", 'default' + ); + } + + /** + * Determine if the worker should run in maintenance mode. + * + * @return bool + */ + protected function downForMaintenance() + { + return $this->option('force') ? false : $this->laravel->isDownForMaintenance(); + } +} diff --git a/src/Queue/Worker.php b/src/Queue/Worker.php new file mode 100644 index 0000000..e00215c --- /dev/null +++ b/src/Queue/Worker.php @@ -0,0 +1,636 @@ +events = $events; + $this->manager = $manager; + $this->exceptions = $exceptions; + } + + /** + * Listen to the given queue in a loop. + * + * @param string $connectionName + * @param string $queue + * @param \Illuminate\Queue\WorkerOptions $options + * @return void + */ + public function daemon($connectionName, $queue, WorkerOptions $options) + { + $this->listenForSignals(); + + $lastRestart = $this->getTimestampOfLastQueueRestart(); + + while (true) { + // Before reserving any jobs, we will make sure this queue is not paused and + // if it is we will just pause this worker for a given amount of time and + // make sure we do not need to kill this worker process off completely. + if (! $this->daemonShouldRun($options)) { + $this->pauseWorker($options, $lastRestart); + + continue; + } + + // First, we will attempt to get the next job off of the queue. We will also + // register the timeout handler and reset the alarm for this job so it is + // not stuck in a frozen state forever. Then, we can fire off this job. + $job = $this->getNextJob( + $this->manager->connection($connectionName), $queue + ); + + $this->registerTimeoutHandler($job, $options); + + // If the daemon should run (not in maintenance mode, etc.), then we can run + // fire off this job for processing. Otherwise, we will need to sleep the + // worker so no more jobs are processed until they should be processed. + if ($job) { + $this->runJob($job, $connectionName, $options); + } else { + $this->sleep($options->sleep); + } + + // Finally, we will check to see if we have exceeded our memory limits or if + // the queue should restart based on other indications. If so, we'll stop + // this worker and let whatever is "monitoring" it restart the process. + $this->stopIfNecessary($options, $lastRestart); + } + } + + /** + * Register the worker timeout handler (PHP 7.1+). + * + * @param \Illuminate\Contracts\Queue\Job|null $job + * @param WorkerOptions $options + * @return void + */ + protected function registerTimeoutHandler($job, WorkerOptions $options) + { + if ($options->timeout > 0 && $this->supportsAsyncSignals()) { + // We will register a signal handler for the alarm signal so that we can kill this + // process if it is running too long because it has frozen. This uses the async + // signals supported in recent versions of PHP to accomplish it conveniently. + pcntl_signal(SIGALRM, function () { + $this->kill(1); + }); + + pcntl_alarm($this->timeoutForJob($job, $options) + $options->sleep); + } + } + + /** + * Get the appropriate timeout for the given job. + * + * @param \Illuminate\Contracts\Queue\Job|null $job + * @param WorkerOptions $options + * @return int + */ + protected function timeoutForJob($job, WorkerOptions $options) + { + return $job && ! is_null($job->timeout()) ? $job->timeout() : $options->timeout; + } + + /** + * Determine if the daemon should process on this iteration. + * + * @param WorkerOptions $options + * @return bool + */ + protected function daemonShouldRun(WorkerOptions $options) + { + return ! (($this->manager->isDownForMaintenance() && ! $options->force) || + $this->paused || + $this->events->until(new Events\Looping) === false); + } + + /** + * Pause the worker for the current loop. + * + * @param WorkerOptions $options + * @param int $lastRestart + * @return void + */ + protected function pauseWorker(WorkerOptions $options, $lastRestart) + { + $this->sleep($options->sleep > 0 ? $options->sleep : 1); + + $this->stopIfNecessary($options, $lastRestart); + } + + /** + * Stop the process if necessary. + * + * @param WorkerOptions $options + * @param int $lastRestart + */ + protected function stopIfNecessary(WorkerOptions $options, $lastRestart) + { + if ($this->shouldQuit) { + $this->kill(); + } + + if ($this->memoryExceeded($options->memory)) { + $this->stop(12); + } elseif ($this->queueShouldRestart($lastRestart)) { + $this->stop(); + } + } + + /** + * Process the next job on the queue. + * + * @param string $connectionName + * @param string $queue + * @param \Illuminate\Queue\WorkerOptions $options + * @return void + */ + public function runNextJob($connectionName, $queue, WorkerOptions $options) + { + $job = $this->getNextJob( + $this->manager->connection($connectionName), $queue + ); + + // If we're able to pull a job off of the stack, we will process it and then return + // from this method. If there is no job on the queue, we will "sleep" the worker + // for the specified number of seconds, then keep processing jobs after sleep. + if ($job) { + return $this->runJob($job, $connectionName, $options); + } + + $this->sleep($options->sleep); + } + + /** + * Get the next job from the queue connection. + * + * @param \Illuminate\Contracts\Queue\Queue $connection + * @param string $queue + * @return \Illuminate\Contracts\Queue\Job|null + */ + protected function getNextJob($connection, $queue) + { + try { + foreach (explode(',', $queue) as $queue) { + if (! is_null($job = $connection->pop($queue))) { + return $job; + } + } + + $this->raiseEmptyQueueEvent($connection); + } catch (Exception $e) { + $this->raiseQueueExceptionOccurredEvent($connection, $e); + $this->exceptions->report($e); + } catch (Throwable $e) { + $this->exceptions->report(new FatalThrowableError($e)); + } + } + + /** + * Process the given job. + * + * @param \Illuminate\Contracts\Queue\Job $job + * @param string $connectionName + * @param \Illuminate\Queue\WorkerOptions $options + * @return void + */ + protected function runJob($job, $connectionName, WorkerOptions $options) + { + try { + return $this->process($connectionName, $job, $options); + } catch (Exception $e) { + $this->exceptions->report($e); + } catch (Throwable $e) { + $this->exceptions->report(new FatalThrowableError($e)); + } + } + + /** + * Process the given job from the queue. + * + * @param string $connectionName + * @param \Illuminate\Contracts\Queue\Job $job + * @param \Illuminate\Queue\WorkerOptions $options + * @return void + * + * @throws \Throwable + */ + public function process($connectionName, $job, WorkerOptions $options) + { + try { + // First we will raise the before job event and determine if the job has already ran + // over the its maximum attempt limit, which could primarily happen if the job is + // continually timing out and not actually throwing any exceptions from itself. + $this->raiseBeforeJobEvent($connectionName, $job); + + $this->markJobAsFailedIfAlreadyExceedsMaxAttempts( + $connectionName, $job, (int) $options->maxTries + ); + + // Here we will fire off the job and let it process. We will catch any exceptions so + // they can be reported to the developers logs, etc. Once the job is finished the + // proper events will be fired to let any listeners know this job has finished. + $job->fire(); + + $this->raiseAfterJobEvent($connectionName, $job); + } catch (Exception $e) { + $this->handleJobException($connectionName, $job, $options, $e); + } catch (Throwable $e) { + $this->handleJobException( + $connectionName, $job, $options, new FatalThrowableError($e) + ); + } + } + + /** + * Handle an exception that occurred while the job was running. + * + * @param string $connectionName + * @param \Illuminate\Contracts\Queue\Job $job + * @param \Illuminate\Queue\WorkerOptions $options + * @param \Exception $e + * @return void + * + * @throws \Exception + */ + protected function handleJobException($connectionName, $job, WorkerOptions $options, $e) + { + try { + // First, we will go ahead and mark the job as failed if it will exceed the maximum + // attempts it is allowed to run the next time we process it. If so we will just + // go ahead and mark it as failed now so we do not have to release this again. + $this->markJobAsFailedIfWillExceedMaxAttempts( + $connectionName, $job, (int) $options->maxTries, $e + ); + + $this->raiseExceptionOccurredJobEvent( + $connectionName, $job, $e + ); + } finally { + // If we catch an exception, we will attempt to release the job back onto the queue + // so it is not lost entirely. This'll let the job be retried at a later time by + // another listener (or this same one). We will re-throw this exception after. + if (! $job->isDeleted()) { + $job->release($options->delay); + } + } + + throw $e; + } + + /** + * Mark the given job as failed if it has exceeded the maximum allowed attempts. + * + * This will likely be because the job previously exceeded a timeout. + * + * @param string $connectionName + * @param \Illuminate\Contracts\Queue\Job $job + * @param int $maxTries + * @return void + */ + protected function markJobAsFailedIfAlreadyExceedsMaxAttempts($connectionName, $job, $maxTries) + { + $maxTries = ! is_null($job->maxTries()) ? $job->maxTries() : $maxTries; + + if ($maxTries === 0 || $job->attempts() <= $maxTries) { + return; + } + + $this->failJob($connectionName, $job, $e = new MaxAttemptsExceededException( + 'A queued job has been attempted too many times. The job may have previously timed out.' + )); + + throw $e; + } + + /** + * Mark the given job as failed if it has exceeded the maximum allowed attempts. + * + * @param string $connectionName + * @param \Illuminate\Contracts\Queue\Job $job + * @param int $maxTries + * @param \Exception $e + * @return void + */ + protected function markJobAsFailedIfWillExceedMaxAttempts($connectionName, $job, $maxTries, $e) + { + $maxTries = ! is_null($job->maxTries()) ? $job->maxTries() : $maxTries; + + if ($maxTries > 0 && $job->attempts() >= $maxTries) { + $this->failJob($connectionName, $job, $e); + } + } + + /** + * Mark the given job as failed and raise the relevant event. + * + * @param string $connectionName + * @param \Illuminate\Contracts\Queue\Job $job + * @param \Exception $e + * @return void + */ + protected function failJob($connectionName, $job, $e) + { + return FailingJob::handle($connectionName, $job, $e); + } + + /** + * Raise the before queue job event. + * + * @param string $connectionName + * @return void + */ + protected function raiseEmptyQueueEvent($connectionName) + { + $this->events->fire(new Events\NoJobsAvailable( + $connectionName + )); + } + + /** + * Raise the before queue job event. + * + * @param string $connectionName + * @param \Exception $e + * @return void + */ + protected function raiseQueueExceptionOccurredEvent($connectionName, $e) + { + $this->events->fire(new Events\QueueExceptionOccurred( + $connectionName, $e + )); + } + + /** + * Raise the before queue job event. + * + * @param int $seconds + * @return void + */ + protected function raiseSleepingEvent($seconds) + { + $this->events->fire(new Events\Sleeping( + $seconds + )); + } + + /** + * Raise the before queue job event. + * + * @param string $connectionName + * @param \Illuminate\Contracts\Queue\Job $job + * @return void + */ + protected function raiseBeforeJobEvent($connectionName, $job) + { + $this->events->fire(new Events\JobProcessing( + $connectionName, $job + )); + } + + /** + * Raise the after queue job event. + * + * @param string $connectionName + * @param \Illuminate\Contracts\Queue\Job $job + * @return void + */ + protected function raiseAfterJobEvent($connectionName, $job) + { + $this->events->fire(new Events\JobProcessed( + $connectionName, $job + )); + } + + /** + * Raise the exception occurred queue job event. + * + * @param string $connectionName + * @param \Illuminate\Contracts\Queue\Job $job + * @param \Exception $e + * @return void + */ + protected function raiseExceptionOccurredJobEvent($connectionName, $job, $e) + { + $this->events->fire(new Events\JobExceptionOccurred( + $connectionName, $job, $e + )); + } + + /** + * Raise the failed queue job event. + * + * @param string $connectionName + * @param \Illuminate\Contracts\Queue\Job $job + * @param \Exception $e + * @return void + */ + protected function raiseFailedJobEvent($connectionName, $job, $e) + { + $this->events->fire(new Events\JobFailed( + $connectionName, $job, $e + )); + } + + /** + * Determine if the queue worker should restart. + * + * @param int|null $lastRestart + * @return bool + */ + protected function queueShouldRestart($lastRestart) + { + return $this->getTimestampOfLastQueueRestart() != $lastRestart; + } + + /** + * Get the last queue restart timestamp, or null. + * + * @return int|null + */ + protected function getTimestampOfLastQueueRestart() + { + if ($this->cache) { + return $this->cache->get('illuminate:queue:restart'); + } + } + + /** + * Enable async signals for the process. + * + * @return void + */ + protected function listenForSignals() + { + if ($this->supportsAsyncSignals()) { + pcntl_async_signals(true); + + pcntl_signal(SIGTERM, function () { + $this->shouldQuit = true; + }); + + pcntl_signal(SIGUSR2, function () { + $this->paused = true; + }); + + pcntl_signal(SIGCONT, function () { + $this->paused = false; + }); + } + } + + /** + * Determine if "async" signals are supported. + * + * @return bool + */ + protected function supportsAsyncSignals() + { + return version_compare(PHP_VERSION, '7.1.0') >= 0 && + extension_loaded('pcntl'); + } + + /** + * Determine if the memory limit has been exceeded. + * + * @param int $memoryLimit + * @return bool + */ + public function memoryExceeded($memoryLimit) + { + return (memory_get_usage() / 1024 / 1024) >= $memoryLimit; + } + + /** + * Stop listening and bail out of the script. + * + * @param int $status + * @return void + */ + public function stop($status = 0) + { + $this->events->fire(new Events\WorkerStopping); + + exit($status); + } + + /** + * Kill the process. + * + * @param int $status + * @return void + */ + public function kill($status = 0) + { + if (extension_loaded('posix')) { + posix_kill(getmypid(), SIGKILL); + } + + exit($status); + } + + /** + * Sleep the script for a given number of seconds. + * + * @param int $seconds + * @return void + */ + public function sleep($seconds) + { + $this->raiseSleepingEvent($seconds); + sleep($seconds); + } + + /** + * Set the cache repository implementation. + * + * @param \Illuminate\Contracts\Cache\Repository $cache + * @return void + */ + public function setCache(CacheContract $cache) + { + $this->cache = $cache; + } + + /** + * Get the queue manager instance. + * + * @return \Illuminate\Queue\QueueManager + */ + public function getManager() + { + return $this->manager; + } + + /** + * Set the queue manager instance. + * + * @param \Illuminate\Queue\QueueManager $manager + * @return void + */ + public function setManager(QueueManager $manager) + { + $this->manager = $manager; + } +}