Skip to content

Commit

Permalink
Merge pull request #26 from spatie/synchronous-fallback
Browse files Browse the repository at this point in the history
Synchronous fallback
  • Loading branch information
brendt authored Jan 29, 2018
2 parents f7060e2 + 313d421 commit 00f73ab
Show file tree
Hide file tree
Showing 13 changed files with 300 additions and 125 deletions.
24 changes: 4 additions & 20 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -172,29 +172,13 @@ $pool = Pool::create()
;
```

### Asynchronous support at runtime
### Synchronous fallback

The `Pool` class has a static method `isSupported` you can call to check whether your platform is able to run asynchronous processes. You can use this check in combination with a `Task` to be able to easily run the same code synchronous or asynchronous.
If the required extensions (`pctnl` and `posix`) are not installed in your current PHP runtime, the `Pool` will automatically fallback to synchronous execution of tasks.

```php
$pool = Pool::create();
The `Pool` class has a static method `isSupported` you can call to check whether your platform is able to run asynchronous processes.

foreach ($things as $thing) {
$task = new MyTask($thing);

if (! Pool::isSupported()) {
$task->execute();

continue;
}

$pool->add($task);
}

if (Pool::isSupported()) {
$pool->wait();
}
```
If you're using a `Task` to run processes, only the `execute` method of those tasks will be called when running in synchronous mode.

## Behind the curtains

Expand Down
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
},
"require-dev": {
"larapack/dd": "^1.1",
"phpunit/phpunit": "^6.0"
"phpunit/phpunit": "^6.0",
"symfony/stopwatch": "^4.0"
},
"autoload": {
"files": [
Expand Down
2 changes: 1 addition & 1 deletion src/ParallelError.php → src/Output/ParallelError.php
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<?php

namespace Spatie\Async;
namespace Spatie\Async\Output;

use Exception;

Expand Down
64 changes: 43 additions & 21 deletions src/Pool.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,33 @@

use ArrayAccess;
use InvalidArgumentException;
use Spatie\Async\Process\Runnable;
use Spatie\Async\Runtime\ParentRuntime;
use Spatie\Async\Process\ParallelProcess;
use Spatie\Async\Process\SynchronousProcess;

class Pool implements ArrayAccess
{
public static $forceSynchronous = false;

protected $concurrency = 20;
protected $tasksPerProcess = 1;
protected $timeout = 300;
protected $sleepTime = 50000;

/** @var \Spatie\Async\ParallelProcess[] */
/** @var \Spatie\Async\Process\Runnable[] */
protected $queue = [];

/** @var \Spatie\Async\ParallelProcess[] */
/** @var \Spatie\Async\Process\Runnable[] */
protected $inProgress = [];

/** @var \Spatie\Async\ParallelProcess[] */
/** @var \Spatie\Async\Process\Runnable[] */
protected $finished = [];

/** @var \Spatie\Async\ParallelProcess[] */
/** @var \Spatie\Async\Process\Runnable[] */
protected $failed = [];

/** @var \Spatie\Async\ParallelProcess[] */
/** @var \Spatie\Async\Process\Runnable[] */
protected $timeouts = [];

protected $results = [];
Expand All @@ -49,7 +54,10 @@ public static function create()

public static function isSupported(): bool
{
return function_exists('pcntl_async_signals') && function_exists('posix_kill');
return
function_exists('pcntl_async_signals')
&& function_exists('posix_kill')
&& ! self::$forceSynchronous;
}

public function concurrency(int $concurrency): self
Expand Down Expand Up @@ -96,18 +104,18 @@ public function notify()
}

/**
* @param \Spatie\Async\ParallelProcess|callable $process
* @param \Spatie\Async\Process\Runnable|callable $process
*
* @return \Spatie\Async\ParallelProcess
* @return \Spatie\Async\Process\Runnable
*/
public function add($process): ParallelProcess
public function add($process): Runnable
{
if (! is_callable($process) && ! $process instanceof ParallelProcess) {
if (! is_callable($process) && ! $process instanceof Runnable) {
throw new InvalidArgumentException('The process passed to Pool::add should be callable.');
}

if (! $process instanceof ParallelProcess) {
$process = ParentRuntime::createChildProcess($process);
if (! $process instanceof Runnable) {
$process = ParentRuntime::createProcess($process);
}

$this->putInQueue($process);
Expand All @@ -122,6 +130,10 @@ public function wait(): array
if ($process->getCurrentExecutionTime() > $this->timeout) {
$this->markAsTimedOut($process);
}

if ($process instanceof SynchronousProcess) {
$this->markAsFinished($process);
}
}

if (! $this->inProgress) {
Expand All @@ -134,16 +146,18 @@ public function wait(): array
return $this->results;
}

public function putInQueue(ParallelProcess $process)
public function putInQueue(Runnable $process)
{
$this->queue[$process->getId()] = $process;

$this->notify();
}

public function putInProgress(ParallelProcess $process)
public function putInProgress(Runnable $process)
{
$process->getProcess()->setTimeout($this->timeout);
if ($process instanceof ParallelProcess) {
$process->getProcess()->setTimeout($this->timeout);
}

$process->start();

Expand All @@ -152,7 +166,7 @@ public function putInProgress(ParallelProcess $process)
$this->inProgress[$process->getPid()] = $process;
}

public function markAsFinished(ParallelProcess $process)
public function markAsFinished(Runnable $process)
{
unset($this->inProgress[$process->getPid()]);

Expand All @@ -163,7 +177,7 @@ public function markAsFinished(ParallelProcess $process)
$this->finished[$process->getPid()] = $process;
}

public function markAsTimedOut(ParallelProcess $process)
public function markAsTimedOut(Runnable $process)
{
unset($this->inProgress[$process->getPid()]);

Expand All @@ -174,7 +188,7 @@ public function markAsTimedOut(ParallelProcess $process)
$this->timeouts[$process->getPid()] = $process;
}

public function markAsFailed(ParallelProcess $process)
public function markAsFailed(Runnable $process)
{
unset($this->inProgress[$process->getPid()]);

Expand Down Expand Up @@ -208,23 +222,31 @@ public function offsetUnset($offset)
}

/**
* @return \Spatie\Async\ParallelProcess[]
* @return \Spatie\Async\Process\Runnable[]
*/
public function getQueue(): array
{
return $this->queue;
}

/**
* @return \Spatie\Async\Process\Runnable[]
*/
public function getFinished(): array
{
return $this->finished;
}

/**
* @return \Spatie\Async\ParallelProcess[]
* @return \Spatie\Async\Process\Runnable[]
*/
public function getFailed(): array
{
return $this->failed;
}

/**
* @return \Spatie\Async\ParallelProcess[]
* @return \Spatie\Async\Process\Runnable[]
*/
public function getTimeouts(): array
{
Expand Down
6 changes: 5 additions & 1 deletion src/PoolStatus.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Spatie\Async;

use Spatie\Async\Process\ParallelProcess;
use Spatie\Async\Output\SerializableException;

class PoolStatus
Expand All @@ -28,11 +29,14 @@ protected function lines(string ...$lines): string

protected function summaryToString(): string
{
$queue = $this->pool->getQueue();
$finished = $this->pool->getFinished();
$failed = $this->pool->getFailed();
$timeouts = $this->pool->getTimeouts();

return 'finished: '.count($finished)
return
'queue: '.count($queue)
.' - finished: '.count($finished)
.' - failed: '.count($failed)
.' - timeout: '.count($timeouts);
}
Expand Down
77 changes: 9 additions & 68 deletions src/ParallelProcess.php → src/Process/ParallelProcess.php
Original file line number Diff line number Diff line change
@@ -1,58 +1,36 @@
<?php

namespace Spatie\Async;
namespace Spatie\Async\Process;

use Throwable;
use Spatie\Async\Output\ParallelError;
use Symfony\Component\Process\Process;
use Spatie\Async\Output\SerializableException;

class ParallelProcess
class ParallelProcess implements Runnable
{
protected $process;
protected $id;
protected $pid;

protected $successCallbacks = [];
protected $errorCallbacks = [];
protected $timeoutCallbacks = [];

protected $output;
protected $errorOutput;

protected $startTime;

public function __construct(Process $process, string $id)
use ProcessCallbacks;

public function __construct(Process $process, int $id)
{
$this->process = $process;
$this->id = $id;
}

public static function create(Process $process, string $id): self
public static function create(Process $process, int $id): self
{
return new self($process, $id);
}

public function then(callable $callback): self
{
$this->successCallbacks[] = $callback;

return $this;
}

public function catch(callable $callback): self
{
$this->errorCallbacks[] = $callback;

return $this;
}

public function timeout(callable $callback): self
{
$this->timeoutCallbacks[] = $callback;

return $this;
}

public function start(): self
{
$this->startTime = microtime(true);
Expand Down Expand Up @@ -121,12 +99,12 @@ public function getProcess(): Process
return $this->process;
}

public function getId(): string
public function getId(): int
{
return $this->id;
}

public function getPid(): ?string
public function getPid(): ?int
{
return $this->pid;
}
Expand All @@ -136,43 +114,6 @@ public function getCurrentExecutionTime(): float
return microtime(true) - $this->startTime;
}

public function triggerSuccess()
{
if ($this->getErrorOutput()) {
$this->triggerError();

return;
}

$output = $this->getOutput();

foreach ($this->successCallbacks as $callback) {
call_user_func_array($callback, [$output]);
}

return $output;
}

public function triggerError()
{
$exception = $this->resolveErrorOutput();

foreach ($this->errorCallbacks as $callback) {
call_user_func_array($callback, [$exception]);
}

if (! $this->errorCallbacks) {
throw $exception;
}
}

public function triggerTimeout()
{
foreach ($this->timeoutCallbacks as $callback) {
call_user_func_array($callback, []);
}
}

protected function resolveErrorOutput(): Throwable
{
$exception = $this->getErrorOutput();
Expand Down
Loading

0 comments on commit 00f73ab

Please sign in to comment.