Skip to content

Commit

Permalink
Merge pull request #5 from spatie/symfony-process
Browse files Browse the repository at this point in the history
Symfony process
  • Loading branch information
brendt authored Dec 24, 2017
2 parents 86a0a5f + e3593ee commit f6adaba
Show file tree
Hide file tree
Showing 22 changed files with 595 additions and 544 deletions.
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
build
composer.lock
docs
vendor
1 change: 0 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
language: php

php:
- 7.0
- 7.1

env:
Expand Down
103 changes: 60 additions & 43 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
# Asynchronous and parallel PHP with PCNTL
# Asynchronous and parallel PHP

[![Latest Version on Packagist](https://img.shields.io/packagist/v/spatie/async.svg?style=flat-square)](https://packagist.org/packages/spatie/async)
[![Build Status](https://img.shields.io/travis/spatie/async/master.svg?style=flat-square)](https://travis-ci.org/spatie/async)
[![Quality Score](https://img.shields.io/scrutinizer/g/spatie/async.svg?style=flat-square)](https://scrutinizer-ci.com/g/spatie/async)
[![StyleCI](https://styleci.io/repos/114228700/shield?branch=master)](https://styleci.io/repos/114228700)
[![Total Downloads](https://img.shields.io/packagist/dt/spatie/async.svg?style=flat-square)](https://packagist.org/packages/spatie/async)

This library provides a small and easy wrapper around PHP's PCNTL extension.
This library provides a small and easy wrapper around PHP's PCNTL extension.
It allows for running difference processes in parallel, with an easy-to-use API.

## Installation
Expand Down Expand Up @@ -47,34 +47,24 @@ A pool is configurable by the developer:
use Spatie\Async\Pool;

$pool = Pool::create()
->concurrency(20) // The maximum amount of processes which can run simultaneously.
->maximumExecutionTime(200); // The maximum amount of time a process may take to finish in seconds.
```

### Processes

You can just add closures to the pool, but in some cases you want a class to represent a process.
// The maximum amount of processes which can run simultaneously.
->concurrency(20)

```php
use Spatie\Async\Process;
// The maximum amount of time a process may take to finish in seconds.
->timeout(15)

class MyProcess extends Process
{
public function __construct()
{
// You can add your own dependencies.
}

public function execute()
{
// You can do whatever you like in here.
}
}
// Configure which autoloader sub processes should use.
->autoload(__DIR__ . '/../../vendor/autoload.php')

// Configure how long the loop should sleep before re-checking the process statuses in milliseconds.
->sleepTime(50000)
;
```

### Event listeners

When adding a process or a callable to a pool, you'll get an instance of `Process` returned.
When creating asynchronous processes, you'll get an instance of `ParallelProcess` returned.
You can add the following event hooks on a process.

```php
Expand Down Expand Up @@ -103,7 +93,7 @@ use Spatie\Async\Process;

$pool = Pool::create();

for ($i = 0; $i < 5; $i++) {
foreach (range(1, 5) as $i) {
$pool[] = async(function () {
usleep(random_int(10, 1000));

Expand All @@ -116,48 +106,75 @@ for ($i = 0; $i < 5; $i++) {
await($pool);
```

### Error handling

If an exception is thrown from within a child process, and not caught using the `->catch()` callback,
it will be thrown as `Spatie\Async\ParallelError` when calling `await()` or `$pool->wait()`.

## Behind the curtains

When using this package, you're probably wondering what's happening underneath the surface.

PHP has an extension called [PCNTL](http://php.net/manual/en/book.pcntl.php) which can spawn forks of its current process.
PCNTL directly uses your system's `fork` call to create a copy of the process, as a child process.

We're using the `symfony/process` component to create and manage child processes in PHP.
By creating child processes on the fly, we're able to execute PHP scripts in parallel.
This parallelism can improve performance significantly when dealing with multiple synchronous tasks,
This parallelism can improve performance significantly when dealing with multiple synchronous tasks,
which don't really need to wait for each other.
By giving these tasks a separate process to run on, the underlying operating system can take care of running them in parallel.

There's a caveat when dynamically spawning processes: you need to make sure that there won't be too many processes at once,
or the application might crash.
The `Pool` class provided by this package takes care of handling as many processes as you want
The `Pool` class provided by this package takes care of handling as many processes as you want
by scheduling and running them when it's possible.

That's the part that `async()` or `$pool->add()` do. Now let's look at what `await()` or `$pool->wait()` does.
That's the part that `async()` or `$pool->add()` does. Now let's look at what `await()` or `$pool->wait()` does.

When multiple processes are spawned, each can have a separate time to completion.
When multiple processes are spawned, each can have a separate time to completion.
One process might eg. have to wait for a HTTP call, while the other has to process large amounts of data.
Sometimes you also have points in your code which have to wait until the result of a process is returned.

This is why we have to wait at a certain point in time: for all processes on a pool to finish,
This is why we have to wait at a certain point in time: for all processes on a pool to finish,
so we can be sure it's safe to continue without accidentally killing the child processes which aren't done yet.

"Waiting" for all processes is done in a `while` loop, which will check the status of every process once in a while.
When a process is finished, its success event is triggered, which you can hook into with the `->then()` function.
Likewise, when a process fails or times out, the loop will update that process' status and move on.
Waiting for all processes is done by using a `while` loop, which will wait until all processes are finished.
Determining when a process is finished is done by using a listener on the `SIGCHLD` signal.
This signal is emitted when a child process is finished by the OS kernel.
As of PHP 7.1, there's much better support for listening and handling signals,
making this approach more performant than eg. using process forks or sockets for communication.
You can read more about it [here](https://wiki.php.net/rfc/async_signals).

When a process is finished, its success event is triggered, which you can hook into with the `->then()` function.
Likewise, when a process fails or times out, the loop will update that process' status and move on.
When all processes are finished, the while loop will see that there's nothing more to wait for, and stop.
This is the moment your parent process can continue to execute.

Because we're working with separate processes, we need a way of communication between the parent and child processes.
You might for example want to use the result generated by your child processes, in the parent process.
## Comparison to other libraries

There are two very well-known asynchronous libraries in PHP:

- [ReactPHP](https://github.com/reactphp)
- [Amp](https://github.com/amphp)

Both have their own take on parallel processing and offer a much wider range of features than this library.
Our implementation aims for better performance and ease of development, at the cost of a smaller feature set.

I've personally ran some benchmarks against both libraries, for which the code can be found [here](https://github.com/spatie/async-benchmark).
The benchmark consists of 30 iterations of executing the same script.
The script itself will spawn 30 child processes which will sleep for either 1, 2 or 3 seconds,
depending on their position in the queue.
This way there's no random element in the sleep time, though there is variation.
These are the results, plotting the executing time of every iteration, in seconds.

![Comaring spatie/async to Amp and ReactPHP](./docs/benchmarks.png)

Our package uses UNIX sockets for this communication.
Once a process is executed, we'll serialize its output and send it via a socket to the parent process,
who can handle it further in the while loop we spoke about earlier.
You can see that both Amp and our implementation are less performant than ReactPHP.
If you're looking for pure performance, ReactPHP might be a better choice.
Our package though has the benefit of a much simpler API, in our opinion.
We're also still improving this package, so chances are performance will be better in the future.

When a process throws an exception or fails, we can also catch that output and send it via the socket to the parent.
That's how you can also listen for unhandled exceptions thrown in a child process, and handle them yourself.
So when should you use this library?
The benchmarks show that we're in the same league as Amp and ReactPHP for performing processes in parallel.
Both other libraries offer a lot more functionality, though often at the cost of simplicity or performance.
This package aims to solve only part of the bigger picture, but tries to solve it in a performant and easy-to-use way.

## Testing

Expand Down Expand Up @@ -194,7 +211,7 @@ We publish all received postcards [on our company website](https://spatie.be/en/

Spatie is a webdesign agency based in Antwerp, Belgium. You'll find an overview of all our open source projects [on our website](https://spatie.be/opensource).

Does your business depend on our contributions? Reach out and support us on [Patreon](https://www.patreon.com/spatie).
Does your business depend on our contributions? Reach out and support us on [Patreon](https://www.patreon.com/spatie).
All pledges will be dedicated to allocating workforce on maintenance and new awesome stuff.

## License
Expand Down
5 changes: 4 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@
}
],
"require": {
"php": "^7.1"
"php": "^7.1",
"opis/closure": "^3.0",
"symfony/process": "^4.0"
},
"require-dev": {
"larapack/dd": "^1.1",
"phpunit/phpunit": "^6.0"
},
"autoload": {
Expand Down
Empty file removed config/.gitkeep
Empty file.
Binary file added docs/benchmarks.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
18 changes: 0 additions & 18 deletions src/CallableProcess.php

This file was deleted.

39 changes: 0 additions & 39 deletions src/Output/ErrorProcessOutput.php

This file was deleted.

47 changes: 0 additions & 47 deletions src/Output/ProcessOutput.php

This file was deleted.

32 changes: 32 additions & 0 deletions src/Output/SerializableException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
<?php

namespace Spatie\Async\Output;

use Throwable;

class SerializableException
{
/** @var string */
protected $class;

/** @var string */
protected $message;

/** @var string */
protected $trace;

public function __construct(Throwable $exception)
{
$this->class = get_class($exception);
$this->message = $exception->getMessage();
$this->trace = $exception->getTraceAsString();
}

public function asThrowable(): Throwable
{
/** @var Throwable $throwable */
$throwable = new $this->class($this->message);

return $throwable;
}
}
13 changes: 13 additions & 0 deletions src/ParallelError.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<?php

namespace Spatie\Async;

use Exception;

class ParallelError extends Exception
{
public static function fromException($exception): self
{
return new self($exception);
}
}
Loading

0 comments on commit f6adaba

Please sign in to comment.