From 4be854ae57b6f4b3c0e454f70e84c776364c8e2f Mon Sep 17 00:00:00 2001 From: dusterio Date: Tue, 11 Apr 2017 12:07:22 +1000 Subject: [PATCH] + Copy files over --- src/Integrations/LaravelServiceProvider.php | 104 ++++ src/Queue/ListenCommand.php | 93 +--- src/Queue/Listener.php | 207 +------- src/Queue/ListenerOptions.php | 14 +- src/Queue/NoJobsAvailable.php | 24 + src/Queue/QueueExceptionOccurred.php | 43 ++ src/Queue/Sleeping.php | 22 + src/Queue/WorkCommand.php | 130 +---- src/Queue/Worker.php | 559 +------------------- 9 files changed, 214 insertions(+), 982 deletions(-) create mode 100644 src/Integrations/LaravelServiceProvider.php create mode 100644 src/Queue/NoJobsAvailable.php create mode 100644 src/Queue/QueueExceptionOccurred.php create mode 100644 src/Queue/Sleeping.php diff --git a/src/Integrations/LaravelServiceProvider.php b/src/Integrations/LaravelServiceProvider.php new file mode 100644 index 0000000..dcad20d --- /dev/null +++ b/src/Integrations/LaravelServiceProvider.php @@ -0,0 +1,104 @@ +registerWorker(); + $this->registerListener(); + } + + /** + * Register the queue worker. + * + * @return void + */ + protected function registerWorker() + { + $this->registerWorkCommand(); + + $this->app->singleton('queue.worker', function ($app) { + return new Worker( + $app['queue'], $app['events'], + $app['Illuminate\Contracts\Debug\ExceptionHandler'] + ); + }); + } + + /** + * Register the queue worker console command. + * + * @return void + */ + protected function registerWorkCommand() + { + $this->app->singleton('command.queue.work', function ($app) { + return new WorkCommand($app['queue.worker']); + }); + + $this->commands('command.queue.work'); + } + + /** + * Register the queue listener. + * + * @return void + */ + protected function registerListener() + { + $this->registerListenCommand(); + + $this->app->singleton('queue.listener', function ($app) { + return new Listener($app->basePath()); + }); + } + + /** + * Register the queue listener console command. + * + * @return void + */ + protected function registerListenCommand() + { + $this->app->singleton('command.queue.listen', function ($app) { + return new ListenCommand($app['queue.listener']); + }); + + $this->commands('command.queue.listen'); + } + + /** + * @return void + */ + public function boot() + { + } + + /** + * Get the services provided by the provider. + * + * @return array + */ + public function provides() + { + return [ + 'queue.worker', 'queue.listener', + 'command.queue.work', 'command.queue.listen' + ]; + } +} diff --git a/src/Queue/ListenCommand.php b/src/Queue/ListenCommand.php index c98e25b..3998be0 100755 --- a/src/Queue/ListenCommand.php +++ b/src/Queue/ListenCommand.php @@ -1,89 +1,13 @@ setOutputHandler($this->listener = $listener); - } - - /** - * Execute the console command. - * - * @return void - */ - public function fire() - { - // We need to get the right queue for the connection which is set in the queue - // configuration file for the application. We will pull it based on the set - // connection being run for the queue operation currently being executed. - $queue = $this->getQueue( - $connection = $this->input->getArgument('connection') - ); - - $this->listener->listen( - $connection, $queue, $this->gatherOptions() - ); - } - - /** - * Get the name of the queue connection to listen on. - * - * @param string $connection - * @return string - */ - protected function getQueue($connection) - { - $connection = $connection ?: $this->laravel['config']['queue.default']; - - return $this->input->getOption('queue') ?: $this->laravel['config']->get( - "queue.connections.{$connection}.queue", 'default' - ); - } - /** * Get the listener options for the command. * @@ -99,19 +23,6 @@ protected function gatherOptions() ); } - /** - * Set the options on the queue listener. - * - * @param \Illuminate\Queue\Listener $listener - * @return void - */ - protected function setOutputHandler(Listener $listener) - { - $listener->setOutputHandler(function ($type, $line) { - $this->output->write($line); - }); - } - /** * Resolve a Symfony verbosity level back to its CLI parameter. * diff --git a/src/Queue/Listener.php b/src/Queue/Listener.php index 7d2f693..5adb6ce 100755 --- a/src/Queue/Listener.php +++ b/src/Queue/Listener.php @@ -1,120 +1,20 @@ commandPath = $commandPath; - $this->workerCommand = $this->buildCommandTemplate(); - } - - /** - * Build the environment specific worker command. - * - * @return string - */ - protected function buildCommandTemplate() - { - $command = 'queue:work %s --once --queue=%s --delay=%s --memory=%s --sleep=%s --tries=%s'; - - return "{$this->phpBinary()} {$this->artisanBinary()} {$command}"; - } - - /** - * Get the PHP binary. - * - * @return string - */ - protected function phpBinary() - { - return ProcessUtils::escapeArgument( - (new PhpExecutableFinder)->find(false) - ); - } - - /** - * Get the Artisan binary. - * - * @return string - */ - protected function artisanBinary() - { - return defined('ARTISAN_BINARY') - ? ProcessUtils::escapeArgument(ARTISAN_BINARY) - : 'artisan'; - } - - /** - * Listen to the given queue connection. - * - * @param string $connection - * @param string $queue - * @param \Illuminate\Queue\ListenerOptions $options - * @return void - */ - public function listen($connection, $queue, ListenerOptions $options) - { - $process = $this->makeProcess($connection, $queue, $options); - - while (true) { - $this->runProcess($process, $options->memory); - } - } - /** * Create a new Symfony process for the worker. * * @param string $connection * @param string $queue - * @param \Illuminate\Queue\ListenerOptions $options + * @param ListenerOptions $options * @return \Symfony\Component\Process\Process */ public function makeProcess($connection, $queue, ListenerOptions $options) @@ -144,114 +44,15 @@ public function makeProcess($connection, $queue, ListenerOptions $options) ); } - /** - * Add the environment option to the given command. - * - * @param string $command - * @param \Illuminate\Queue\ListenerOptions $options - * @return string - */ - protected function addEnvironment($command, ListenerOptions $options) - { - return $command.' --env='.ProcessUtils::escapeArgument($options->environment); - } - /** * Resolve a Symfony verbosity level back to its CLI parameter. * * @param string $command - * @param \Illuminate\Queue\ListenerOptions $options + * @param ListenerOptions $options * @return string */ protected function addVerbosity($command, ListenerOptions $options) { return $command.' -'.$options->verbosity; } - - /** - * Format the given command with the listener options. - * - * @param string $command - * @param string $connection - * @param string $queue - * @param \Illuminate\Queue\ListenerOptions $options - * @return string - */ - protected function formatCommand($command, $connection, $queue, ListenerOptions $options) - { - return sprintf( - $command, - ProcessUtils::escapeArgument($connection), - ProcessUtils::escapeArgument($queue), - $options->delay, $options->memory, - $options->sleep, $options->maxTries - ); - } - - /** - * Run the given process. - * - * @param \Symfony\Component\Process\Process $process - * @param int $memory - * @return void - */ - public function runProcess(Process $process, $memory) - { - $process->run(function ($type, $line) { - $this->handleWorkerOutput($type, $line); - }); - - // Once we have run the job we'll go check if the memory limit has been exceeded - // for the script. If it has, we will kill this script so the process manager - // will restart this with a clean slate of memory automatically on exiting. - if ($this->memoryExceeded($memory)) { - $this->stop(); - } - } - - /** - * Handle output from the worker process. - * - * @param int $type - * @param string $line - * @return void - */ - protected function handleWorkerOutput($type, $line) - { - if (isset($this->outputHandler)) { - call_user_func($this->outputHandler, $type, $line); - } - } - - /** - * Determine if the memory limit has been exceeded. - * - * @param int $memoryLimit - * @return bool - */ - public function memoryExceeded($memoryLimit) - { - return (memory_get_usage() / 1024 / 1024) >= $memoryLimit; - } - - /** - * Stop listening and bail out of the script. - * - * @return void - */ - public function stop() - { - die; - } - - /** - * Set the output handler callback. - * - * @param \Closure $outputHandler - * @return void - */ - public function setOutputHandler(Closure $outputHandler) - { - $this->outputHandler = $outputHandler; - } } diff --git a/src/Queue/ListenerOptions.php b/src/Queue/ListenerOptions.php index ebe4ca8..cc4c571 100644 --- a/src/Queue/ListenerOptions.php +++ b/src/Queue/ListenerOptions.php @@ -1,16 +1,9 @@ environment = $environment; $this->verbosity = $verbosity; - parent::__construct($delay, $memory, $timeout, $sleep, $maxTries, $force); + parent::__construct($environment, $delay, $memory, $timeout, $sleep, $maxTries, $force); } } diff --git a/src/Queue/NoJobsAvailable.php b/src/Queue/NoJobsAvailable.php new file mode 100644 index 0000000..a221ba5 --- /dev/null +++ b/src/Queue/NoJobsAvailable.php @@ -0,0 +1,24 @@ +connectionName = $connectionName; + } +} diff --git a/src/Queue/QueueExceptionOccurred.php b/src/Queue/QueueExceptionOccurred.php new file mode 100644 index 0000000..5aab38a --- /dev/null +++ b/src/Queue/QueueExceptionOccurred.php @@ -0,0 +1,43 @@ +exception = $exception; + $this->connectionName = $connectionName; + } + + /** + * @return string + */ + public function getMessage() + { + return Str::limit($this->exception->getMessage(), 128); + } +} diff --git a/src/Queue/Sleeping.php b/src/Queue/Sleeping.php new file mode 100644 index 0000000..72dc960 --- /dev/null +++ b/src/Queue/Sleeping.php @@ -0,0 +1,22 @@ +seconds = $seconds; + } +} diff --git a/src/Queue/WorkCommand.php b/src/Queue/WorkCommand.php index a0c37be..6311039 100644 --- a/src/Queue/WorkCommand.php +++ b/src/Queue/WorkCommand.php @@ -1,65 +1,15 @@ worker = $worker; - } - /** * Execute the console command. * @@ -77,7 +27,7 @@ public function fire() $this->listenForEvents(); $connection = $this->argument('connection') - ?: $this->laravel['config']['queue.default']; + ?: $this->laravel['config']['queue.default']; // We need to get the right queue for the connection which is set in the queue // configuration file for the application. We will pull it based on the set @@ -92,36 +42,6 @@ public function fire() ); } - /** - * Run the worker instance. - * - * @param string $connection - * @param string $queue - * @return array - */ - protected function runWorker($connection, $queue) - { - $this->worker->setCache($this->laravel['cache']->driver()); - - return $this->worker->{$this->option('once') ? 'runNextJob' : 'daemon'}( - $connection, $queue, $this->gatherWorkerOptions() - ); - } - - /** - * Gather all of the queue worker options as a single object. - * - * @return \Illuminate\Queue\WorkerOptions - */ - protected function gatherWorkerOptions() - { - return new WorkerOptions( - $this->option('delay'), $this->option('memory'), - $this->option('timeout'), $this->option('sleep'), - $this->option('tries'), $this->option('force') - ); - } - /** * Listen for the queue events in order to update the console output. * @@ -134,15 +54,15 @@ protected function listenForEvents() }); $this->laravel['events']->listen(JobProcessing::class, function ($event) { - $this->output->writeln('Popped a job from the queue: '.$event->job->resolveName(), OutputInterface::VERBOSITY_VERY_VERBOSE); + $this->output->writeln('Popped a job from the queue: ' . $event->job->resolveName(), OutputInterface::VERBOSITY_VERY_VERBOSE); }); $this->laravel['events']->listen(JobProcessed::class, function ($event) { - $this->output->writeln('['.Carbon::now()->format('Y-m-d H:i:s').'] Processed: '.$event->job->resolveName()); + $this->output->writeln('[' . Carbon::now()->format('Y-m-d H:i:s') . '] Processed: ' . $event->job->resolveName()); }); $this->laravel['events']->listen(JobFailed::class, function ($event) { - $this->output->writeln('['.Carbon::now()->format('Y-m-d H:i:s').'] Failed: '.$event->job->resolveName()); + $this->output->writeln('[' . Carbon::now()->format('Y-m-d H:i:s') . '] Failed: ' . $event->job->resolveName()); $this->logFailedJob($event); }); @@ -156,41 +76,5 @@ protected function listenForEvents() $this->output->writeln("Couldn't fetch a job from the queue. See the log file for more information.", OutputInterface::VERBOSITY_VERBOSE); }); } - - /** - * Store a failed job event. - * - * @param JobFailed $event - * @return void - */ - protected function logFailedJob(JobFailed $event) - { - $this->laravel['queue.failer']->log( - $event->connectionName, $event->job->getQueue(), - $event->job->getRawBody(), $event->exception - ); - } - - /** - * Get the queue name for the worker. - * - * @param string $connection - * @return string - */ - protected function getQueue($connection) - { - return $this->option('queue') ?: $this->laravel['config']->get( - "queue.connections.{$connection}.queue", 'default' - ); - } - - /** - * Determine if the worker should run in maintenance mode. - * - * @return bool - */ - protected function downForMaintenance() - { - return $this->option('force') ? false : $this->laravel->isDownForMaintenance(); - } } + diff --git a/src/Queue/Worker.php b/src/Queue/Worker.php index e00215c..432979b 100644 --- a/src/Queue/Worker.php +++ b/src/Queue/Worker.php @@ -1,227 +1,13 @@ events = $events; - $this->manager = $manager; - $this->exceptions = $exceptions; - } - - /** - * Listen to the given queue in a loop. - * - * @param string $connectionName - * @param string $queue - * @param \Illuminate\Queue\WorkerOptions $options - * @return void - */ - public function daemon($connectionName, $queue, WorkerOptions $options) - { - $this->listenForSignals(); - - $lastRestart = $this->getTimestampOfLastQueueRestart(); - - while (true) { - // Before reserving any jobs, we will make sure this queue is not paused and - // if it is we will just pause this worker for a given amount of time and - // make sure we do not need to kill this worker process off completely. - if (! $this->daemonShouldRun($options)) { - $this->pauseWorker($options, $lastRestart); - - continue; - } - - // First, we will attempt to get the next job off of the queue. We will also - // register the timeout handler and reset the alarm for this job so it is - // not stuck in a frozen state forever. Then, we can fire off this job. - $job = $this->getNextJob( - $this->manager->connection($connectionName), $queue - ); - - $this->registerTimeoutHandler($job, $options); - - // If the daemon should run (not in maintenance mode, etc.), then we can run - // fire off this job for processing. Otherwise, we will need to sleep the - // worker so no more jobs are processed until they should be processed. - if ($job) { - $this->runJob($job, $connectionName, $options); - } else { - $this->sleep($options->sleep); - } - - // Finally, we will check to see if we have exceeded our memory limits or if - // the queue should restart based on other indications. If so, we'll stop - // this worker and let whatever is "monitoring" it restart the process. - $this->stopIfNecessary($options, $lastRestart); - } - } - - /** - * Register the worker timeout handler (PHP 7.1+). - * - * @param \Illuminate\Contracts\Queue\Job|null $job - * @param WorkerOptions $options - * @return void - */ - protected function registerTimeoutHandler($job, WorkerOptions $options) - { - if ($options->timeout > 0 && $this->supportsAsyncSignals()) { - // We will register a signal handler for the alarm signal so that we can kill this - // process if it is running too long because it has frozen. This uses the async - // signals supported in recent versions of PHP to accomplish it conveniently. - pcntl_signal(SIGALRM, function () { - $this->kill(1); - }); - - pcntl_alarm($this->timeoutForJob($job, $options) + $options->sleep); - } - } - - /** - * Get the appropriate timeout for the given job. - * - * @param \Illuminate\Contracts\Queue\Job|null $job - * @param WorkerOptions $options - * @return int - */ - protected function timeoutForJob($job, WorkerOptions $options) - { - return $job && ! is_null($job->timeout()) ? $job->timeout() : $options->timeout; - } - - /** - * Determine if the daemon should process on this iteration. - * - * @param WorkerOptions $options - * @return bool - */ - protected function daemonShouldRun(WorkerOptions $options) - { - return ! (($this->manager->isDownForMaintenance() && ! $options->force) || - $this->paused || - $this->events->until(new Events\Looping) === false); - } - - /** - * Pause the worker for the current loop. - * - * @param WorkerOptions $options - * @param int $lastRestart - * @return void - */ - protected function pauseWorker(WorkerOptions $options, $lastRestart) - { - $this->sleep($options->sleep > 0 ? $options->sleep : 1); - - $this->stopIfNecessary($options, $lastRestart); - } - - /** - * Stop the process if necessary. - * - * @param WorkerOptions $options - * @param int $lastRestart - */ - protected function stopIfNecessary(WorkerOptions $options, $lastRestart) - { - if ($this->shouldQuit) { - $this->kill(); - } - - if ($this->memoryExceeded($options->memory)) { - $this->stop(12); - } elseif ($this->queueShouldRestart($lastRestart)) { - $this->stop(); - } - } - - /** - * Process the next job on the queue. - * - * @param string $connectionName - * @param string $queue - * @param \Illuminate\Queue\WorkerOptions $options - * @return void - */ - public function runNextJob($connectionName, $queue, WorkerOptions $options) - { - $job = $this->getNextJob( - $this->manager->connection($connectionName), $queue - ); - - // If we're able to pull a job off of the stack, we will process it and then return - // from this method. If there is no job on the queue, we will "sleep" the worker - // for the specified number of seconds, then keep processing jobs after sleep. - if ($job) { - return $this->runJob($job, $connectionName, $options); - } - - $this->sleep($options->sleep); - } - /** * Get the next job from the queue connection. * @@ -247,154 +33,6 @@ protected function getNextJob($connection, $queue) } } - /** - * Process the given job. - * - * @param \Illuminate\Contracts\Queue\Job $job - * @param string $connectionName - * @param \Illuminate\Queue\WorkerOptions $options - * @return void - */ - protected function runJob($job, $connectionName, WorkerOptions $options) - { - try { - return $this->process($connectionName, $job, $options); - } catch (Exception $e) { - $this->exceptions->report($e); - } catch (Throwable $e) { - $this->exceptions->report(new FatalThrowableError($e)); - } - } - - /** - * Process the given job from the queue. - * - * @param string $connectionName - * @param \Illuminate\Contracts\Queue\Job $job - * @param \Illuminate\Queue\WorkerOptions $options - * @return void - * - * @throws \Throwable - */ - public function process($connectionName, $job, WorkerOptions $options) - { - try { - // First we will raise the before job event and determine if the job has already ran - // over the its maximum attempt limit, which could primarily happen if the job is - // continually timing out and not actually throwing any exceptions from itself. - $this->raiseBeforeJobEvent($connectionName, $job); - - $this->markJobAsFailedIfAlreadyExceedsMaxAttempts( - $connectionName, $job, (int) $options->maxTries - ); - - // Here we will fire off the job and let it process. We will catch any exceptions so - // they can be reported to the developers logs, etc. Once the job is finished the - // proper events will be fired to let any listeners know this job has finished. - $job->fire(); - - $this->raiseAfterJobEvent($connectionName, $job); - } catch (Exception $e) { - $this->handleJobException($connectionName, $job, $options, $e); - } catch (Throwable $e) { - $this->handleJobException( - $connectionName, $job, $options, new FatalThrowableError($e) - ); - } - } - - /** - * Handle an exception that occurred while the job was running. - * - * @param string $connectionName - * @param \Illuminate\Contracts\Queue\Job $job - * @param \Illuminate\Queue\WorkerOptions $options - * @param \Exception $e - * @return void - * - * @throws \Exception - */ - protected function handleJobException($connectionName, $job, WorkerOptions $options, $e) - { - try { - // First, we will go ahead and mark the job as failed if it will exceed the maximum - // attempts it is allowed to run the next time we process it. If so we will just - // go ahead and mark it as failed now so we do not have to release this again. - $this->markJobAsFailedIfWillExceedMaxAttempts( - $connectionName, $job, (int) $options->maxTries, $e - ); - - $this->raiseExceptionOccurredJobEvent( - $connectionName, $job, $e - ); - } finally { - // If we catch an exception, we will attempt to release the job back onto the queue - // so it is not lost entirely. This'll let the job be retried at a later time by - // another listener (or this same one). We will re-throw this exception after. - if (! $job->isDeleted()) { - $job->release($options->delay); - } - } - - throw $e; - } - - /** - * Mark the given job as failed if it has exceeded the maximum allowed attempts. - * - * This will likely be because the job previously exceeded a timeout. - * - * @param string $connectionName - * @param \Illuminate\Contracts\Queue\Job $job - * @param int $maxTries - * @return void - */ - protected function markJobAsFailedIfAlreadyExceedsMaxAttempts($connectionName, $job, $maxTries) - { - $maxTries = ! is_null($job->maxTries()) ? $job->maxTries() : $maxTries; - - if ($maxTries === 0 || $job->attempts() <= $maxTries) { - return; - } - - $this->failJob($connectionName, $job, $e = new MaxAttemptsExceededException( - 'A queued job has been attempted too many times. The job may have previously timed out.' - )); - - throw $e; - } - - /** - * Mark the given job as failed if it has exceeded the maximum allowed attempts. - * - * @param string $connectionName - * @param \Illuminate\Contracts\Queue\Job $job - * @param int $maxTries - * @param \Exception $e - * @return void - */ - protected function markJobAsFailedIfWillExceedMaxAttempts($connectionName, $job, $maxTries, $e) - { - $maxTries = ! is_null($job->maxTries()) ? $job->maxTries() : $maxTries; - - if ($maxTries > 0 && $job->attempts() >= $maxTries) { - $this->failJob($connectionName, $job, $e); - } - } - - /** - * Mark the given job as failed and raise the relevant event. - * - * @param string $connectionName - * @param \Illuminate\Contracts\Queue\Job $job - * @param \Exception $e - * @return void - */ - protected function failJob($connectionName, $job, $e) - { - return FailingJob::handle($connectionName, $job, $e); - } - /** * Raise the before queue job event. * @@ -403,7 +41,7 @@ protected function failJob($connectionName, $job, $e) */ protected function raiseEmptyQueueEvent($connectionName) { - $this->events->fire(new Events\NoJobsAvailable( + $this->events->fire(new NoJobsAvailable( $connectionName )); } @@ -417,7 +55,7 @@ protected function raiseEmptyQueueEvent($connectionName) */ protected function raiseQueueExceptionOccurredEvent($connectionName, $e) { - $this->events->fire(new Events\QueueExceptionOccurred( + $this->events->fire(new QueueExceptionOccurred( $connectionName, $e )); } @@ -430,166 +68,11 @@ protected function raiseQueueExceptionOccurredEvent($connectionName, $e) */ protected function raiseSleepingEvent($seconds) { - $this->events->fire(new Events\Sleeping( + $this->events->fire(new Sleeping( $seconds )); } - /** - * Raise the before queue job event. - * - * @param string $connectionName - * @param \Illuminate\Contracts\Queue\Job $job - * @return void - */ - protected function raiseBeforeJobEvent($connectionName, $job) - { - $this->events->fire(new Events\JobProcessing( - $connectionName, $job - )); - } - - /** - * Raise the after queue job event. - * - * @param string $connectionName - * @param \Illuminate\Contracts\Queue\Job $job - * @return void - */ - protected function raiseAfterJobEvent($connectionName, $job) - { - $this->events->fire(new Events\JobProcessed( - $connectionName, $job - )); - } - - /** - * Raise the exception occurred queue job event. - * - * @param string $connectionName - * @param \Illuminate\Contracts\Queue\Job $job - * @param \Exception $e - * @return void - */ - protected function raiseExceptionOccurredJobEvent($connectionName, $job, $e) - { - $this->events->fire(new Events\JobExceptionOccurred( - $connectionName, $job, $e - )); - } - - /** - * Raise the failed queue job event. - * - * @param string $connectionName - * @param \Illuminate\Contracts\Queue\Job $job - * @param \Exception $e - * @return void - */ - protected function raiseFailedJobEvent($connectionName, $job, $e) - { - $this->events->fire(new Events\JobFailed( - $connectionName, $job, $e - )); - } - - /** - * Determine if the queue worker should restart. - * - * @param int|null $lastRestart - * @return bool - */ - protected function queueShouldRestart($lastRestart) - { - return $this->getTimestampOfLastQueueRestart() != $lastRestart; - } - - /** - * Get the last queue restart timestamp, or null. - * - * @return int|null - */ - protected function getTimestampOfLastQueueRestart() - { - if ($this->cache) { - return $this->cache->get('illuminate:queue:restart'); - } - } - - /** - * Enable async signals for the process. - * - * @return void - */ - protected function listenForSignals() - { - if ($this->supportsAsyncSignals()) { - pcntl_async_signals(true); - - pcntl_signal(SIGTERM, function () { - $this->shouldQuit = true; - }); - - pcntl_signal(SIGUSR2, function () { - $this->paused = true; - }); - - pcntl_signal(SIGCONT, function () { - $this->paused = false; - }); - } - } - - /** - * Determine if "async" signals are supported. - * - * @return bool - */ - protected function supportsAsyncSignals() - { - return version_compare(PHP_VERSION, '7.1.0') >= 0 && - extension_loaded('pcntl'); - } - - /** - * Determine if the memory limit has been exceeded. - * - * @param int $memoryLimit - * @return bool - */ - public function memoryExceeded($memoryLimit) - { - return (memory_get_usage() / 1024 / 1024) >= $memoryLimit; - } - - /** - * Stop listening and bail out of the script. - * - * @param int $status - * @return void - */ - public function stop($status = 0) - { - $this->events->fire(new Events\WorkerStopping); - - exit($status); - } - - /** - * Kill the process. - * - * @param int $status - * @return void - */ - public function kill($status = 0) - { - if (extension_loaded('posix')) { - posix_kill(getmypid(), SIGKILL); - } - - exit($status); - } - /** * Sleep the script for a given number of seconds. * @@ -601,36 +84,4 @@ public function sleep($seconds) $this->raiseSleepingEvent($seconds); sleep($seconds); } - - /** - * Set the cache repository implementation. - * - * @param \Illuminate\Contracts\Cache\Repository $cache - * @return void - */ - public function setCache(CacheContract $cache) - { - $this->cache = $cache; - } - - /** - * Get the queue manager instance. - * - * @return \Illuminate\Queue\QueueManager - */ - public function getManager() - { - return $this->manager; - } - - /** - * Set the queue manager instance. - * - * @param \Illuminate\Queue\QueueManager $manager - * @return void - */ - public function setManager(QueueManager $manager) - { - $this->manager = $manager; - } }