Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Experimental] Stream Store #620

Merged
merged 15 commits into from
Jul 29, 2024
10 changes: 8 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,17 @@ test: phpunit
benchmark: vendor ## run benchmarks
DB_URL=sqlite3:///:memory: vendor/bin/phpbench run tests/Benchmark --report=default

.PHONY: benchmark-diff-test
benchmark-diff-test: vendor ## run benchmarks
.PHONY: benchmark-base
benchmark-base: vendor ## run benchmarks
DB_URL=sqlite3:///:memory: vendor/bin/phpbench run tests/Benchmark --revs=1 --report=default --progress=none --tag=base

.PHONY: benchmark-diff
benchmark-diff: vendor ## run benchmarks
DB_URL=sqlite3:///:memory: vendor/bin/phpbench run tests/Benchmark --revs=1 --report=diff --progress=none --ref=base

.PHONY: benchmark-diff-test
benchmark-diff-test: benchmark-base benchmark-diff ## run benchmarks

.PHONY: dev
dev: static test ## run dev tools

Expand Down
72 changes: 71 additions & 1 deletion baseline.xml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<?xml version="1.0" encoding="UTF-8"?>
<files psalm-version="5.23.1@8471a896ccea3526b26d082f4461eeea467f10a4">
<files psalm-version="5.25.0@01a8eb06b9e9cc6cfb6a320bf9fb14331919d505">
<file src="src/Aggregate/AggregateRootBehaviour.php">
<UnsafeInstantiation>
<code><![CDATA[new static()]]></code>
Expand Down Expand Up @@ -86,6 +86,11 @@
<code><![CDATA[$dateTimeType->convertToPHPValue($data['recorded_on'], $platform)]]></code>
</MixedArgument>
</file>
<file src="src/Store/StreamDoctrineDbalStoreStream.php">
<ArgumentTypeCoercion>
<code><![CDATA[$data['playhead'] === null ? null : (int)$data['playhead']]]></code>
</ArgumentTypeCoercion>
</file>
<file src="src/Subscription/Store/DoctrineSubscriptionStore.php">
<MixedArgument>
<code><![CDATA[$context]]></code>
Expand Down Expand Up @@ -131,6 +136,14 @@
<code><![CDATA[$store]]></code>
</MissingConstructor>
</file>
<file src="tests/Benchmark/SimpleSetupStreamStoreBench.php">
<MissingConstructor>
<code><![CDATA[$multipleEventsId]]></code>
<code><![CDATA[$repository]]></code>
<code><![CDATA[$singleEventId]]></code>
<code><![CDATA[$store]]></code>
</MissingConstructor>
</file>
<file src="tests/Benchmark/SnapshotsBench.php">
<MissingConstructor>
<code><![CDATA[$adapter]]></code>
Expand Down Expand Up @@ -320,6 +333,63 @@
)]]></code>
</InternalMethod>
</file>
<file src="tests/Unit/Store/StreamDoctrineDbalStoreTest.php">
<DeprecatedMethod>
<code><![CDATA[addMethods]]></code>
</DeprecatedMethod>
<InternalMethod>
<code><![CDATA[new DefaultSelectSQLBuilder(
$abstractPlatform->reveal(),
'FOR UPDATE',
'SKIP LOCKED',
)]]></code>
<code><![CDATA[new DefaultSelectSQLBuilder(
$abstractPlatform->reveal(),
'FOR UPDATE',
'SKIP LOCKED',
)]]></code>
<code><![CDATA[new DefaultSelectSQLBuilder(
$abstractPlatform->reveal(),
'FOR UPDATE',
'SKIP LOCKED',
)]]></code>
<code><![CDATA[new DefaultSelectSQLBuilder(
$abstractPlatform->reveal(),
'FOR UPDATE',
'SKIP LOCKED',
)]]></code>
<code><![CDATA[new DefaultSelectSQLBuilder(
$abstractPlatform->reveal(),
'FOR UPDATE',
'SKIP LOCKED',
)]]></code>
<code><![CDATA[new DefaultSelectSQLBuilder(
$abstractPlatform->reveal(),
'FOR UPDATE',
'SKIP LOCKED',
)]]></code>
<code><![CDATA[new DefaultSelectSQLBuilder(
$abstractPlatform->reveal(),
'FOR UPDATE',
'SKIP LOCKED',
)]]></code>
<code><![CDATA[new DefaultSelectSQLBuilder(
$abstractPlatform->reveal(),
'FOR UPDATE',
'SKIP LOCKED',
)]]></code>
<code><![CDATA[new DefaultSelectSQLBuilder(
$abstractPlatform->reveal(),
'FOR UPDATE',
'SKIP LOCKED',
)]]></code>
<code><![CDATA[new DefaultSelectSQLBuilder(
$abstractPlatform->reveal(),
'FOR UPDATE',
'SKIP LOCKED',
)]]></code>
</InternalMethod>
</file>
<file src="tests/Unit/Subscription/Engine/DefaultSubscriptionEngineTest.php">
<PossiblyUndefinedArrayOffset>
<code><![CDATA[$update1]]></code>
Expand Down
6 changes: 6 additions & 0 deletions deptrac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ deptrac:
collectors:
- type: directory
value: src/Subscription/.*
- name: Test
collectors:
- type: directory
value: src/Test/.*

ruleset:
Aggregate:
Expand Down Expand Up @@ -175,7 +179,9 @@ deptrac:
Store:
- Aggregate
- Attribute
- Clock
- Message
- Metadata
- Schema
- Serializer
Test:
117 changes: 96 additions & 21 deletions docs/pages/store.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ Each message contains an event and the associated headers.
More information about the message can be found [here](message.md).

The store is optimized to efficiently store and load events for aggregates.
We currently only offer one [doctrine dbal](https://www.doctrine-project.org/projects/dbal.html) store.

## Create DBAL connection

Expand All @@ -29,8 +28,14 @@ $connection = DriverManager::getConnection(

## Configure Store

We currently offer two stores, both based on the [doctrine dbal](https://www.doctrine-project.org/projects/dbal.html) library.
The default store is the `DoctrineDbalStore` and the new experimental store is the `StreamDoctrineDbalStore`.

### DoctrineDbalStore

This is the current default store for event sourcing.
You can create a store with the `DoctrineDbalStore` class.
The store needs a dbal connection, an event serializer, an aggregate registry and some options.
The store needs a dbal connection, an event serializer and has some optional parameters like options.

```php
use Doctrine\DBAL\Connection;
Expand All @@ -41,21 +46,17 @@ use Patchlevel\EventSourcing\Store\DoctrineDbalStore;
$store = new DoctrineDbalStore(
$connection,
DefaultEventSerializer::createFromPaths(['src/Event']),
null,
[/** options */],
);
```
Following options are available in `DoctrineDbalStore`:

| Option | Type | Default | Description |
|-------------------|------------------|------------|----------------------------------------------|
| table_name | string | eventstore | The name of the table in the database |
| aggregate_id_type | "uuid"|"string" | uuid | The type of the `aggregate_id` column |
| locking | bool | true | If the store should use locking for writing |
| lock_id | int | 133742 | The id of the lock |
| lock_timeout | int | -1 | The timeout of the lock. -1 means no timeout |

## Schema
| Option | Type | Default | Description |
|-------------------|-----------------|------------|----------------------------------------------|
| table_name | string | eventstore | The name of the table in the database |
| aggregate_id_type | "uuid"/"string" | uuid | The type of the `aggregate_id` column |
| locking | bool | true | If the store should use locking for writing |
| lock_id | int | 133742 | The id of the lock |
| lock_timeout | int | -1 | The timeout of the lock. -1 means no timeout |
DavidBadura marked this conversation as resolved.
Show resolved Hide resolved

The table structure of the `DoctrineDbalStore` looks like this:

Expand All @@ -72,13 +73,59 @@ The table structure of the `DoctrineDbalStore` looks like this:
| archived | bool | If the event is archived |
| custom_headers | json | Custom headers for the event |

With the help of the `SchemaDirector`, the database structure can be created, updated and deleted.

!!! note

The default type of the `aggregate_id` column is `uuid` if the database supports it and `string` if not.
You can change the type with the `aggregate_id_type` to `string` if you want use custom id.

### StreamDoctrineDbalStore

We offer a new experimental store called `StreamDoctrineDbalStore`.
This store is decoupled from the aggregate and can be used to store events from other sources.
The difference to the `DoctrineDbalStore` is that the `StreamDoctrineDbalStore` merge the aggregate id
and the aggregate name into one column named `stream`. Additionally, the column `playhead` is nullable.
This store introduces two new methods `streams` and `remove`.

The store needs a dbal connection, an event serializer and has some optional parameters like options.

```php
use Doctrine\DBAL\Connection;
use Patchlevel\EventSourcing\Serializer\DefaultEventSerializer;
use Patchlevel\EventSourcing\Store\StreamDoctrineDbalStore;

/** @var Connection $connection */
$store = new StreamDoctrineDbalStore(
$connection,
DefaultEventSerializer::createFromPaths(['src/Event']),
);
```
Following options are available in `StreamDoctrineDbalStore`:

| Option | Type | Default | Description |
|-------------------|-----------------|-------------|----------------------------------------------|
| table_name | string | event_store | The name of the table in the database |
| locking | bool | true | If the store should use locking for writing |
| lock_id | int | 133742 | The id of the lock |
| lock_timeout | int | -1 | The timeout of the lock. -1 means no timeout |

The table structure of the `StreamDoctrineDbalStore` looks like this:

| Column | Type | Description |
|------------------|----------|--------------------------------------------------|
| id | bigint | The index of the whole stream (autoincrement) |
| stream | string | The name of the stream |
| playhead | ?int | The current playhead of the aggregate |
| event | string | The name of the event |
| payload | json | The payload of the event |
| recorded_on | datetime | The date when the event was recorded |
| new_stream_start | bool | If the event is the first event of the aggregate |
| archived | bool | If the event is archived |
| custom_headers | json | Custom headers for the event |

## Schema

With the help of the `SchemaDirector`, the database structure can be created, updated and deleted.

!!! tip

You can also use doctrine migration to create and keep your schema in sync.
Expand All @@ -92,11 +139,11 @@ Additionally, it implements the `DryRunSchemaDirector` interface, to show the sq
```php
use Doctrine\DBAL\Connection;
use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector;
use Patchlevel\EventSourcing\Store\DoctrineDbalStore;
use Patchlevel\EventSourcing\Store\Store;

/**
* @var Connection $connection
* @var DoctrineDbalStore $store
* @var Store $store
*/
$schemaDirector = new DoctrineSchemaDirector(
$connection,
Expand Down Expand Up @@ -179,13 +226,13 @@ use Doctrine\Migrations\DependencyFactory;
use Doctrine\Migrations\Provider\SchemaProvider;
use Patchlevel\EventSourcing\Schema\DoctrineMigrationSchemaProvider;
use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector;
use Patchlevel\EventSourcing\Store\DoctrineDbalStore;
use Patchlevel\EventSourcing\Store\Store;

// event sourcing schema director configuration

/**
* @var Connection $connection
* @var DoctrineDbalStore $store
* @var Store $store
*/
$schemaDirector = new DoctrineSchemaDirector(
$connection,
Expand Down Expand Up @@ -355,11 +402,39 @@ $store->save(...$messages);

Use transactional method if you want call multiple save methods in a transaction.

### Delete & Update
### Update

It is not possible to delete or update events.
It is not possible to update events.
In event sourcing, the events are immutable.

### Remove

You can remove a stream with the `remove` method.

```php
use Patchlevel\EventSourcing\Store\StreamStore;

/** @var StreamStore $store */
$store->remove('profile-*');
```
!!! note

The method is only available in the `StreamStore` like `StreamDoctrineDbalStore`.

### List Streams

You can list all streams with the `streams` method.

```php
use Patchlevel\EventSourcing\Store\StreamStore;

/** @var StreamStore $store */
$streams = $store->streams(); // ['profile-1', 'profile-2', 'profile-3']
```
!!! note

The method is only available in the `StreamStore` like `StreamDoctrineDbalStore`.

### Transaction

There is also the possibility of executing a function in a transaction.
Expand Down
4 changes: 2 additions & 2 deletions phpbench.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
"partition": "subject_name",
"cols":
{
"time-diff": "percent_diff(partition['result_time_avg'][1], partition['result_time_avg'][0])"
"time-diff": "percent_diff(coalesce(partition['result_time_avg']?[1], 0), coalesce(partition['result_time_avg']?[0], 0))"
DavidBadura marked this conversation as resolved.
Show resolved Hide resolved
}
},
"memory":
Expand All @@ -61,7 +61,7 @@
"partition": "subject_name",
"cols":
{
"memory-diff": "percent_diff(partition['result_mem_peak'][1], partition['result_mem_peak'][0])"
"memory-diff": "percent_diff(coalesce(partition['result_mem_peak']?[1], 0), coalesce(partition['result_mem_peak']?[0], 0))"
}
}
}
Expand Down
10 changes: 10 additions & 0 deletions phpstan-baseline.neon
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,16 @@ parameters:
count: 1
path: src/Store/DoctrineDbalStoreStream.php

-
message: "#^Parameter \\#2 \\$playhead of class Patchlevel\\\\EventSourcing\\\\Store\\\\StreamHeader constructor expects int\\<1, max\\>\\|null, int\\|null given\\.$#"
count: 1
path: src/Store/StreamDoctrineDbalStoreStream.php

-
message: "#^Ternary operator condition is always true\\.$#"
count: 1
path: src/Store/StreamDoctrineDbalStoreStream.php

-
message: "#^Parameter \\#3 \\$errorContext of class Patchlevel\\\\EventSourcing\\\\Subscription\\\\SubscriptionError constructor expects array\\<int, array\\{class\\: class\\-string, message\\: string, code\\: int\\|string, file\\: string, line\\: int, trace\\: array\\<int, array\\{file\\?\\: string, line\\?\\: int, function\\?\\: string, class\\?\\: string, type\\?\\: string, args\\?\\: array\\}\\>\\}\\>\\|null, mixed given\\.$#"
count: 1
Expand Down
18 changes: 18 additions & 0 deletions src/Aggregate/InvalidAggregateStreamName.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Aggregate;

use RuntimeException;

use function sprintf;

/** @experimental */
final class InvalidAggregateStreamName extends RuntimeException
{
public function __construct(string $stream)
{
parent::__construct(sprintf('Invalid aggregate stream name "%s". Expected format is "[aggregateName]-[aggregateId]".', $stream));
}
}
Loading
Loading