Skip to content

Commit

Permalink
add stream store
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidBadura committed Jul 24, 2024
1 parent e2232ef commit 767302d
Show file tree
Hide file tree
Showing 12 changed files with 1,110 additions and 37 deletions.
7 changes: 4 additions & 3 deletions src/Console/OutputStyle.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use Patchlevel\EventSourcing\Serializer\Encoder\Encoder;
use Patchlevel\EventSourcing\Serializer\EventSerializer;
use Patchlevel\EventSourcing\Store\ArchivedHeader;
use Patchlevel\EventSourcing\Store\StreamHeader;
use Patchlevel\EventSourcing\Store\StreamStartHeader;
use Symfony\Component\Console\Style\SymfonyStyle;
use Throwable;
Expand Down Expand Up @@ -47,7 +48,8 @@ public function message(

$customHeaders = array_filter(
$message->headers(),
static fn ($header) => !$header instanceof AggregateHeader
static fn ($header) => !$header instanceof StreamHeader
&& !$header instanceof AggregateHeader
&& !$header instanceof ArchivedHeader
&& !$header instanceof StreamStartHeader,
);
Expand All @@ -59,8 +61,7 @@ public function message(
$this->title($data->name);
$this->horizontalTable(
[
'aggregateName',
'aggregateId',
'stream',
'playhead',
'recordedOn',
'streamStart',
Expand Down
46 changes: 34 additions & 12 deletions src/Message/Translator/RecalculatePlayheadTranslator.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@
namespace Patchlevel\EventSourcing\Message\Translator;

use Patchlevel\EventSourcing\Aggregate\AggregateHeader;
use Patchlevel\EventSourcing\Message\HeaderNotFound;
use Patchlevel\EventSourcing\Message\Message;
use Patchlevel\EventSourcing\Store\StreamHeader;

use function array_key_exists;
use function sprintf;

final class RecalculatePlayheadTranslator implements Translator
{
Expand All @@ -17,14 +20,37 @@ final class RecalculatePlayheadTranslator implements Translator
/** @return list<Message> */
public function __invoke(Message $message): array
{
$header = $message->header(AggregateHeader::class);
$playhead = $this->nextPlayhead($header->aggregateName, $header->aggregateId);
try {
$header = $message->header(AggregateHeader::class);
} catch (HeaderNotFound) {
try {
$header = $message->header(StreamHeader::class);
} catch (HeaderNotFound) {
return [$message];
}
}

if ($header instanceof StreamHeader) {
$stream = $header->streamName;
} else {
$stream = sprintf('%s-%s', $header->aggregateName, $header->aggregateId);
}

$playhead = $this->nextPlayhead($stream);

if ($header->playhead === $playhead) {
return [$message];
}

$header = $message->header(AggregateHeader::class);
if ($header instanceof StreamHeader) {
return [
$message->withHeader(new StreamHeader(
$header->streamName,
$playhead,
$header->recordedOn,
)),
];
}

return [
$message->withHeader(new AggregateHeader(
Expand All @@ -42,18 +68,14 @@ public function reset(): void
}

/** @return positive-int */

Check failure on line 70 in src/Message/Translator/RecalculatePlayheadTranslator.php

View workflow job for this annotation

GitHub Actions / Static Analysis by Psalm (locked, 8.3, ubuntu-latest)

MoreSpecificReturnType

src/Message/Translator/RecalculatePlayheadTranslator.php:70:17: MoreSpecificReturnType: The declared return type 'int<1, max>' for Patchlevel\EventSourcing\Message\Translator\RecalculatePlayheadTranslator::nextPlayhead is more specific than the inferred return type 'float|int' (see https://psalm.dev/070)
private function nextPlayhead(string $aggregateName, string $aggregateId): int
private function nextPlayhead(string $stream): int
{
if (!array_key_exists($aggregateName, $this->index)) {
$this->index[$aggregateName] = [];
}

if (!array_key_exists($aggregateId, $this->index[$aggregateName])) {
$this->index[$aggregateName][$aggregateId] = 1;
if (!array_key_exists($stream, $this->index)) {
$this->index[$stream] = 1;

Check failure on line 74 in src/Message/Translator/RecalculatePlayheadTranslator.php

View workflow job for this annotation

GitHub Actions / Static Analysis by PHPStan (locked, 8.3, ubuntu-latest)

Property Patchlevel\EventSourcing\Message\Translator\RecalculatePlayheadTranslator::$index (array<string, array<string, int<1, max>>>) does not accept non-empty-array<string, 1|array<string, int<1, max>>>.

Check failure on line 74 in src/Message/Translator/RecalculatePlayheadTranslator.php

View workflow job for this annotation

GitHub Actions / Static Analysis by Psalm (locked, 8.3, ubuntu-latest)

InvalidPropertyAssignmentValue

src/Message/Translator/RecalculatePlayheadTranslator.php:74:13: InvalidPropertyAssignmentValue: $this->index with declared type 'array<string, array<string, int<1, max>>>' cannot be assigned type 'non-empty-array<string, 1|array<string, int<1, max>>>' (see https://psalm.dev/145)
} else {
$this->index[$aggregateName][$aggregateId]++;
$this->index[$stream]++;

Check failure on line 76 in src/Message/Translator/RecalculatePlayheadTranslator.php

View workflow job for this annotation

GitHub Actions / Static Analysis by PHPStan (locked, 8.3, ubuntu-latest)

Cannot use ++ on array<string, int<1, max>>.

Check failure on line 76 in src/Message/Translator/RecalculatePlayheadTranslator.php

View workflow job for this annotation

GitHub Actions / Static Analysis by Psalm (locked, 8.3, ubuntu-latest)

InvalidOperand

src/Message/Translator/RecalculatePlayheadTranslator.php:76:13: InvalidOperand: Cannot add an array to a non-array 1 (see https://psalm.dev/058)

Check failure on line 76 in src/Message/Translator/RecalculatePlayheadTranslator.php

View workflow job for this annotation

GitHub Actions / Static Analysis by Psalm (locked, 8.3, ubuntu-latest)

InvalidPropertyAssignmentValue

src/Message/Translator/RecalculatePlayheadTranslator.php:76:13: InvalidPropertyAssignmentValue: $this->index with declared type 'array<string, array<string, int<1, max>>>' cannot be assigned type 'non-empty-array<string, array<string, int<1, max>>|float|int>' (see https://psalm.dev/145)
}

return $this->index[$aggregateName][$aggregateId];
return $this->index[$stream];

Check failure on line 79 in src/Message/Translator/RecalculatePlayheadTranslator.php

View workflow job for this annotation

GitHub Actions / Static Analysis by Psalm (locked, 8.3, ubuntu-latest)

LessSpecificReturnStatement

src/Message/Translator/RecalculatePlayheadTranslator.php:79:16: LessSpecificReturnStatement: The type 'float|int' is more general than the declared return type 'int<1, max>' for Patchlevel\EventSourcing\Message\Translator\RecalculatePlayheadTranslator::nextPlayhead (see https://psalm.dev/129)
}
}
14 changes: 13 additions & 1 deletion src/Message/Translator/UntilEventTranslator.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@

use DateTimeImmutable;
use Patchlevel\EventSourcing\Aggregate\AggregateHeader;
use Patchlevel\EventSourcing\Message\HeaderNotFound;
use Patchlevel\EventSourcing\Message\Message;
use Patchlevel\EventSourcing\Store\StreamHeader;

final class UntilEventTranslator implements Translator
{
Expand All @@ -18,7 +20,17 @@ public function __construct(
/** @return list<Message> */
public function __invoke(Message $message): array
{
$recordedOn = $message->header(AggregateHeader::class)->recordedOn;
try {
$header = $message->header(AggregateHeader::class);
} catch (HeaderNotFound) {
try {
$header = $message->header(StreamHeader::class);
} catch (HeaderNotFound) {
return [$message];
}
}

$recordedOn = $header->recordedOn;

if ($recordedOn < $this->until) {
return [$message];
Expand Down
2 changes: 2 additions & 0 deletions src/Metadata/Message/MessageHeaderRegistry.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use Patchlevel\EventSourcing\Aggregate\AggregateHeader;
use Patchlevel\EventSourcing\Debug\Trace\TraceHeader;
use Patchlevel\EventSourcing\Store\ArchivedHeader;
use Patchlevel\EventSourcing\Store\StreamHeader;
use Patchlevel\EventSourcing\Store\StreamStartHeader;

use function array_flip;
Expand Down Expand Up @@ -73,6 +74,7 @@ public function headerNames(): array
public static function createWithInternalHeaders(array $headerNameToClassMap = []): self
{
$internalHeaders = [
'stream' => StreamHeader::class,
'aggregate' => AggregateHeader::class,
'trace' => TraceHeader::class,
'archived' => ArchivedHeader::class,
Expand Down
90 changes: 70 additions & 20 deletions src/Repository/DefaultRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
use Patchlevel\EventSourcing\Store\Criteria\CriteriaBuilder;
use Patchlevel\EventSourcing\Store\Store;
use Patchlevel\EventSourcing\Store\Stream;
use Patchlevel\EventSourcing\Store\StreamDoctrineDbalStore;
use Patchlevel\EventSourcing\Store\StreamHeader;
use Patchlevel\EventSourcing\Store\UniqueConstraintViolation;
use Psr\Clock\ClockInterface;
use Psr\Log\LoggerInterface;
Expand All @@ -43,6 +45,8 @@ final class DefaultRepository implements Repository
/** @var WeakMap<T, bool> */
private WeakMap $aggregateIsValid;

private bool $useStreamHeader;

/** @param AggregateRootMetadata<T> $metadata */
public function __construct(
private Store $store,
Expand All @@ -56,6 +60,7 @@ public function __construct(
$this->clock = $clock ?? new SystemClock();
$this->logger = $logger ?? new NullLogger();
$this->aggregateIsValid = new WeakMap();
$this->useStreamHeader = $store instanceof StreamDoctrineDbalStore;
}

/** @return T */
Expand Down Expand Up @@ -103,11 +108,18 @@ public function load(AggregateRootId $id): AggregateRoot
}
}

$criteria = (new CriteriaBuilder())
->aggregateName($this->metadata->name)
->aggregateId($id->toString())
->archived(false)
->build();
if ($this->useStreamHeader) {
$criteria = (new CriteriaBuilder())
->streamName($this->streamName($this->metadata->name, $id->toString()))
->archived(false)
->build();
} else {
$criteria = (new CriteriaBuilder())
->aggregateName($this->metadata->name)
->aggregateId($id->toString())
->archived(false)
->build();
}

$stream = null;

Expand All @@ -128,10 +140,19 @@ public function load(AggregateRootId $id): AggregateRoot
throw new AggregateNotFound($this->metadata->className, $id);
}

$aggregateHeader = $firstMessage->header(AggregateHeader::class);
if ($this->useStreamHeader) {
$playhead = $firstMessage->header(StreamHeader::class)->playhead;

if ($playhead === null) {
throw new AggregateNotFound($this->metadata->className, $id);
}
} else {
$playhead = $firstMessage->header(AggregateHeader::class)->playhead;
}

$aggregate = $this->metadata->className::createFromEvents(
$this->unpack($stream),
$aggregateHeader->playhead - 1,
$playhead - 1,
);

if ($this->snapshotStore && $this->metadata->snapshot) {
Expand All @@ -156,10 +177,16 @@ public function load(AggregateRootId $id): AggregateRoot

public function has(AggregateRootId $id): bool
{
$criteria = (new CriteriaBuilder())
->aggregateName($this->metadata->name)
->aggregateId($id->toString())
->build();
if ($this->useStreamHeader) {
$criteria = (new CriteriaBuilder())
->streamName($this->streamName($this->metadata->name, $id->toString()))
->build();
} else {
$criteria = (new CriteriaBuilder())
->aggregateName($this->metadata->name)
->aggregateId($id->toString())
->build();
}

return $this->store->count($criteria) > 0;
}
Expand Down Expand Up @@ -217,15 +244,26 @@ public function save(AggregateRoot $aggregate): void

$aggregateName = $this->metadata->name;

$useStreamHeader = $this->useStreamHeader;

$messages = array_map(
static function (object $event) use ($aggregateName, $aggregateId, &$playhead, $messageDecorator, $clock) {
$message = Message::create($event)
->withHeader(new AggregateHeader(
static function (object $event) use ($aggregateName, $aggregateId, &$playhead, $messageDecorator, $clock, $useStreamHeader) {
if ($useStreamHeader) {
$header = new StreamHeader(
sprintf('%s-%s', $aggregateName, $aggregateId),
++$playhead,
$clock->now(),
);
} else {
$header = new AggregateHeader(
$aggregateName,
$aggregateId,
++$playhead,
$clock->now(),
));
);
}

$message = Message::create($event)->withHeader($header);

if ($messageDecorator) {
return $messageDecorator($message);
Expand Down Expand Up @@ -291,11 +329,18 @@ private function loadFromSnapshot(string $aggregateClass, AggregateRootId $id):

$aggregate = $this->snapshotStore->load($aggregateClass, $id);

$criteria = (new CriteriaBuilder())
->aggregateName($this->metadata->name)
->aggregateId($id->toString())
->fromPlayhead($aggregate->playhead())
->build();
if ($this->useStreamHeader) {
$criteria = (new CriteriaBuilder())
->streamName($this->streamName($this->metadata->name, $id->toString()))
->fromPlayhead($aggregate->playhead())
->build();
} else {
$criteria = (new CriteriaBuilder())
->aggregateName($this->metadata->name)
->aggregateId($id->toString())
->fromPlayhead($aggregate->playhead())
->build();
}

$stream = null;

Expand Down Expand Up @@ -369,4 +414,9 @@ private function unpack(Stream $stream): Traversable
yield $message->event();
}
}

private function streamName(string $aggregateName, string $aggregateId): string
{
return sprintf('%s-%s', $aggregateName, $aggregateId);
}
}
12 changes: 12 additions & 0 deletions src/Store/Criteria/CriteriaBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,20 @@

final class CriteriaBuilder
{
private string|null $streamName = null;
private string|null $aggregateName = null;
private string|null $aggregateId = null;
private int|null $fromIndex = null;
private int|null $fromPlayhead = null;
private bool|null $archived = null;

public function streamName(string|null $streamName): self
{
$this->streamName = $streamName;

return $this;
}

public function aggregateName(string|null $aggregateName): self
{
$this->aggregateName = $aggregateName;
Expand Down Expand Up @@ -51,6 +59,10 @@ public function build(): Criteria
{
$criteria = [];

if ($this->streamName !== null) {
$criteria[] = new StreamCriterion($this->streamName);
}

if ($this->aggregateName !== null) {
$criteria[] = new AggregateNameCriterion($this->aggregateName);
}
Expand Down
13 changes: 13 additions & 0 deletions src/Store/Criteria/StreamCriterion.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Store\Criteria;

final class StreamCriterion
{
public function __construct(
public readonly string $streamName,
) {
}
}
Loading

0 comments on commit 767302d

Please sign in to comment.