diff --git a/src/Projection/MariaDbProjectionManager.php b/src/Projection/MariaDbProjectionManager.php index 37bfea6..63d6af6 100644 --- a/src/Projection/MariaDbProjectionManager.php +++ b/src/Projection/MariaDbProjectionManager.php @@ -94,6 +94,7 @@ public function createProjection( $options[PdoEventStoreProjector::OPTION_CACHE_SIZE] ?? PdoEventStoreProjector::DEFAULT_CACHE_SIZE, $options[PdoEventStoreProjector::OPTION_PERSIST_BLOCK_SIZE] ?? PdoEventStoreProjector::DEFAULT_PERSIST_BLOCK_SIZE, $options[PdoEventStoreProjector::OPTION_SLEEP] ?? PdoEventStoreProjector::DEFAULT_SLEEP, + $options[PdoEventStoreProjector::OPTION_LOAD_COUNT] ?? PdoEventStoreProjector::DEFAULT_LOAD_COUNT, $options[PdoEventStoreProjector::OPTION_PCNTL_DISPATCH] ?? PdoEventStoreProjector::DEFAULT_PCNTL_DISPATCH, $options[PdoEventStoreProjector::OPTION_UPDATE_LOCK_THRESHOLD] ?? PdoEventStoreProjector::DEFAULT_UPDATE_LOCK_THRESHOLD, $options[PdoEventStoreProjector::OPTION_GAP_DETECTION] ?? null @@ -115,6 +116,7 @@ public function createReadModelProjection( $options[PdoEventStoreReadModelProjector::OPTION_LOCK_TIMEOUT_MS] ?? PdoEventStoreReadModelProjector::DEFAULT_LOCK_TIMEOUT_MS, $options[PdoEventStoreReadModelProjector::OPTION_PERSIST_BLOCK_SIZE] ?? PdoEventStoreReadModelProjector::DEFAULT_PERSIST_BLOCK_SIZE, $options[PdoEventStoreReadModelProjector::OPTION_SLEEP] ?? PdoEventStoreReadModelProjector::DEFAULT_SLEEP, + $options[PdoEventStoreReadModelProjector::OPTION_LOAD_COUNT] ?? PdoEventStoreReadModelProjector::DEFAULT_LOAD_COUNT, $options[PdoEventStoreReadModelProjector::OPTION_PCNTL_DISPATCH] ?? PdoEventStoreReadModelProjector::DEFAULT_PCNTL_DISPATCH, $options[PdoEventStoreReadModelProjector::OPTION_UPDATE_LOCK_THRESHOLD] ?? PdoEventStoreReadModelProjector::DEFAULT_UPDATE_LOCK_THRESHOLD, $options[PdoEventStoreReadModelProjector::OPTION_GAP_DETECTION] ?? null diff --git a/src/Projection/MySqlProjectionManager.php b/src/Projection/MySqlProjectionManager.php index fa35b30..e64e7eb 100644 --- a/src/Projection/MySqlProjectionManager.php +++ b/src/Projection/MySqlProjectionManager.php @@ -95,6 +95,7 @@ public function createProjection( $options[PdoEventStoreProjector::OPTION_CACHE_SIZE] ?? PdoEventStoreProjector::DEFAULT_CACHE_SIZE, $options[PdoEventStoreProjector::OPTION_PERSIST_BLOCK_SIZE] ?? PdoEventStoreProjector::DEFAULT_PERSIST_BLOCK_SIZE, $options[PdoEventStoreProjector::OPTION_SLEEP] ?? PdoEventStoreProjector::DEFAULT_SLEEP, + $options[PdoEventStoreProjector::OPTION_LOAD_COUNT] ?? PdoEventStoreProjector::DEFAULT_LOAD_COUNT, $options[PdoEventStoreProjector::OPTION_PCNTL_DISPATCH] ?? PdoEventStoreProjector::DEFAULT_PCNTL_DISPATCH, $options[PdoEventStoreProjector::OPTION_UPDATE_LOCK_THRESHOLD] ?? PdoEventStoreProjector::DEFAULT_UPDATE_LOCK_THRESHOLD, $options[PdoEventStoreProjector::OPTION_GAP_DETECTION] ?? null @@ -116,6 +117,7 @@ public function createReadModelProjection( $options[PdoEventStoreReadModelProjector::OPTION_LOCK_TIMEOUT_MS] ?? PdoEventStoreReadModelProjector::DEFAULT_LOCK_TIMEOUT_MS, $options[PdoEventStoreReadModelProjector::OPTION_PERSIST_BLOCK_SIZE] ?? PdoEventStoreReadModelProjector::DEFAULT_PERSIST_BLOCK_SIZE, $options[PdoEventStoreReadModelProjector::OPTION_SLEEP] ?? PdoEventStoreReadModelProjector::DEFAULT_SLEEP, + $options[PdoEventStoreReadModelProjector::OPTION_LOAD_COUNT] ?? PdoEventStoreReadModelProjector::DEFAULT_LOAD_COUNT, $options[PdoEventStoreReadModelProjector::OPTION_PCNTL_DISPATCH] ?? PdoEventStoreReadModelProjector::DEFAULT_PCNTL_DISPATCH, $options[PdoEventStoreReadModelProjector::OPTION_UPDATE_LOCK_THRESHOLD] ?? PdoEventStoreReadModelProjector::DEFAULT_UPDATE_LOCK_THRESHOLD, $options[PdoEventStoreReadModelProjector::OPTION_GAP_DETECTION] ?? null diff --git a/src/Projection/PdoEventStoreProjector.php b/src/Projection/PdoEventStoreProjector.php index 7a228b2..0b0e3c8 100644 --- a/src/Projection/PdoEventStoreProjector.php +++ b/src/Projection/PdoEventStoreProjector.php @@ -40,6 +40,8 @@ final class PdoEventStoreProjector implements Projector { public const OPTION_GAP_DETECTION = 'gap_detection'; + public const OPTION_LOAD_COUNT = 'load_count'; + public const DEFAULT_LOAD_COUNT = null; use PostgresHelper { quoteIdent as pgQuoteIdent; @@ -136,6 +138,11 @@ final class PdoEventStoreProjector implements Projector */ private $sleep; + /** + * @var int|null + */ + private $loadCount; + /** * @var bool */ @@ -186,6 +193,7 @@ public function __construct( int $cacheSize, int $persistBlockSize, int $sleep, + int $loadCount = null, bool $triggerPcntlSignalDispatch = false, int $updateLockThreshold = 0, GapDetection $gapDetection = null @@ -203,6 +211,7 @@ public function __construct( $this->cachedStreamNames = new ArrayCache($cacheSize); $this->persistBlockSize = $persistBlockSize; $this->sleep = $sleep; + $this->loadCount = $loadCount; $this->status = ProjectionStatus::IDLE(); $this->triggerPcntlSignalDispatch = $triggerPcntlSignalDispatch; $this->updateLockThreshold = $updateLockThreshold; @@ -526,7 +535,7 @@ public function run(bool $keepRunning = true): void foreach ($this->streamPositions as $streamName => $position) { try { - $eventStreams[$streamName] = $this->eventStore->load(new StreamName($streamName), $position + 1, null, $this->metadataMatcher); + $eventStreams[$streamName] = $this->eventStore->load(new StreamName($streamName), $position + 1, $this->loadCount, $this->metadataMatcher); } catch (Exception\StreamNotFound $e) { // ignore continue; diff --git a/src/Projection/PdoEventStoreReadModelProjector.php b/src/Projection/PdoEventStoreReadModelProjector.php index a45fd18..836abc2 100644 --- a/src/Projection/PdoEventStoreReadModelProjector.php +++ b/src/Projection/PdoEventStoreReadModelProjector.php @@ -37,6 +37,8 @@ final class PdoEventStoreReadModelProjector implements ReadModelProjector { public const OPTION_GAP_DETECTION = 'gap_detection'; + public const OPTION_LOAD_COUNT = 'load_count'; + public const DEFAULT_LOAD_COUNT = null; use PostgresHelper { quoteIdent as pgQuoteIdent; @@ -132,6 +134,11 @@ final class PdoEventStoreReadModelProjector implements ReadModelProjector */ private $sleep; + /** + * @var int|null + */ + private $loadCount; + /** * @var bool */ @@ -177,6 +184,7 @@ public function __construct( int $lockTimeoutMs, int $persistBlockSize, int $sleep, + int $loadCount = null, bool $triggerPcntlSignalDispatch = false, int $updateLockThreshold = 0, GapDetection $gapDetection = null @@ -194,6 +202,7 @@ public function __construct( $this->lockTimeoutMs = $lockTimeoutMs; $this->persistBlockSize = $persistBlockSize; $this->sleep = $sleep; + $this->loadCount = $loadCount; $this->status = ProjectionStatus::IDLE(); $this->triggerPcntlSignalDispatch = $triggerPcntlSignalDispatch; $this->updateLockThreshold = $updateLockThreshold; @@ -489,7 +498,7 @@ public function run(bool $keepRunning = true): void foreach ($this->streamPositions as $streamName => $position) { try { - $eventStreams[$streamName] = $this->eventStore->load(new StreamName($streamName), $position + 1, null, $this->metadataMatcher); + $eventStreams[$streamName] = $this->eventStore->load(new StreamName($streamName), $position + 1, $this->loadCount, $this->metadataMatcher); } catch (Exception\StreamNotFound $e) { // ignore continue; diff --git a/src/Projection/PostgresProjectionManager.php b/src/Projection/PostgresProjectionManager.php index 6250487..81cd347 100644 --- a/src/Projection/PostgresProjectionManager.php +++ b/src/Projection/PostgresProjectionManager.php @@ -97,6 +97,7 @@ public function createProjection( $options[PdoEventStoreProjector::OPTION_CACHE_SIZE] ?? PdoEventStoreProjector::DEFAULT_CACHE_SIZE, $options[PdoEventStoreProjector::OPTION_PERSIST_BLOCK_SIZE] ?? PdoEventStoreProjector::DEFAULT_PERSIST_BLOCK_SIZE, $options[PdoEventStoreProjector::OPTION_SLEEP] ?? PdoEventStoreProjector::DEFAULT_SLEEP, + $options[PdoEventStoreProjector::OPTION_LOAD_COUNT] ?? PdoEventStoreProjector::DEFAULT_LOAD_COUNT, $options[PdoEventStoreProjector::OPTION_PCNTL_DISPATCH] ?? PdoEventStoreProjector::DEFAULT_PCNTL_DISPATCH, $options[PdoEventStoreProjector::OPTION_UPDATE_LOCK_THRESHOLD] ?? PdoEventStoreProjector::DEFAULT_UPDATE_LOCK_THRESHOLD, $options[PdoEventStoreProjector::OPTION_GAP_DETECTION] ?? null @@ -118,6 +119,7 @@ public function createReadModelProjection( $options[PdoEventStoreReadModelProjector::OPTION_LOCK_TIMEOUT_MS] ?? PdoEventStoreReadModelProjector::DEFAULT_LOCK_TIMEOUT_MS, $options[PdoEventStoreReadModelProjector::OPTION_PERSIST_BLOCK_SIZE] ?? PdoEventStoreReadModelProjector::DEFAULT_PERSIST_BLOCK_SIZE, $options[PdoEventStoreReadModelProjector::OPTION_SLEEP] ?? PdoEventStoreReadModelProjector::DEFAULT_SLEEP, + $options[PdoEventStoreReadModelProjector::OPTION_LOAD_COUNT] ?? PdoEventStoreReadModelProjector::DEFAULT_LOAD_COUNT, $options[PdoEventStoreReadModelProjector::OPTION_PCNTL_DISPATCH] ?? PdoEventStoreReadModelProjector::DEFAULT_PCNTL_DISPATCH, $options[PdoEventStoreReadModelProjector::OPTION_UPDATE_LOCK_THRESHOLD] ?? PdoEventStoreReadModelProjector::DEFAULT_UPDATE_LOCK_THRESHOLD, $options[PdoEventStoreReadModelProjector::OPTION_GAP_DETECTION] ?? null