Skip to content

Commit

Permalink
Merge pull request #242 from unixslayer/master
Browse files Browse the repository at this point in the history
enable to set amount of event being loaded in single run
  • Loading branch information
prolic authored Sep 8, 2022
2 parents 695a2f4 + 3882c81 commit 0a29d32
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 2 deletions.
2 changes: 2 additions & 0 deletions src/Projection/MariaDbProjectionManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ public function createProjection(
$options[PdoEventStoreProjector::OPTION_CACHE_SIZE] ?? PdoEventStoreProjector::DEFAULT_CACHE_SIZE,
$options[PdoEventStoreProjector::OPTION_PERSIST_BLOCK_SIZE] ?? PdoEventStoreProjector::DEFAULT_PERSIST_BLOCK_SIZE,
$options[PdoEventStoreProjector::OPTION_SLEEP] ?? PdoEventStoreProjector::DEFAULT_SLEEP,
$options[PdoEventStoreProjector::OPTION_LOAD_COUNT] ?? PdoEventStoreProjector::DEFAULT_LOAD_COUNT,
$options[PdoEventStoreProjector::OPTION_PCNTL_DISPATCH] ?? PdoEventStoreProjector::DEFAULT_PCNTL_DISPATCH,
$options[PdoEventStoreProjector::OPTION_UPDATE_LOCK_THRESHOLD] ?? PdoEventStoreProjector::DEFAULT_UPDATE_LOCK_THRESHOLD,
$options[PdoEventStoreProjector::OPTION_GAP_DETECTION] ?? null
Expand All @@ -115,6 +116,7 @@ public function createReadModelProjection(
$options[PdoEventStoreReadModelProjector::OPTION_LOCK_TIMEOUT_MS] ?? PdoEventStoreReadModelProjector::DEFAULT_LOCK_TIMEOUT_MS,
$options[PdoEventStoreReadModelProjector::OPTION_PERSIST_BLOCK_SIZE] ?? PdoEventStoreReadModelProjector::DEFAULT_PERSIST_BLOCK_SIZE,
$options[PdoEventStoreReadModelProjector::OPTION_SLEEP] ?? PdoEventStoreReadModelProjector::DEFAULT_SLEEP,
$options[PdoEventStoreReadModelProjector::OPTION_LOAD_COUNT] ?? PdoEventStoreReadModelProjector::DEFAULT_LOAD_COUNT,
$options[PdoEventStoreReadModelProjector::OPTION_PCNTL_DISPATCH] ?? PdoEventStoreReadModelProjector::DEFAULT_PCNTL_DISPATCH,
$options[PdoEventStoreReadModelProjector::OPTION_UPDATE_LOCK_THRESHOLD] ?? PdoEventStoreReadModelProjector::DEFAULT_UPDATE_LOCK_THRESHOLD,
$options[PdoEventStoreReadModelProjector::OPTION_GAP_DETECTION] ?? null
Expand Down
2 changes: 2 additions & 0 deletions src/Projection/MySqlProjectionManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ public function createProjection(
$options[PdoEventStoreProjector::OPTION_CACHE_SIZE] ?? PdoEventStoreProjector::DEFAULT_CACHE_SIZE,
$options[PdoEventStoreProjector::OPTION_PERSIST_BLOCK_SIZE] ?? PdoEventStoreProjector::DEFAULT_PERSIST_BLOCK_SIZE,
$options[PdoEventStoreProjector::OPTION_SLEEP] ?? PdoEventStoreProjector::DEFAULT_SLEEP,
$options[PdoEventStoreProjector::OPTION_LOAD_COUNT] ?? PdoEventStoreProjector::DEFAULT_LOAD_COUNT,
$options[PdoEventStoreProjector::OPTION_PCNTL_DISPATCH] ?? PdoEventStoreProjector::DEFAULT_PCNTL_DISPATCH,
$options[PdoEventStoreProjector::OPTION_UPDATE_LOCK_THRESHOLD] ?? PdoEventStoreProjector::DEFAULT_UPDATE_LOCK_THRESHOLD,
$options[PdoEventStoreProjector::OPTION_GAP_DETECTION] ?? null
Expand All @@ -116,6 +117,7 @@ public function createReadModelProjection(
$options[PdoEventStoreReadModelProjector::OPTION_LOCK_TIMEOUT_MS] ?? PdoEventStoreReadModelProjector::DEFAULT_LOCK_TIMEOUT_MS,
$options[PdoEventStoreReadModelProjector::OPTION_PERSIST_BLOCK_SIZE] ?? PdoEventStoreReadModelProjector::DEFAULT_PERSIST_BLOCK_SIZE,
$options[PdoEventStoreReadModelProjector::OPTION_SLEEP] ?? PdoEventStoreReadModelProjector::DEFAULT_SLEEP,
$options[PdoEventStoreReadModelProjector::OPTION_LOAD_COUNT] ?? PdoEventStoreReadModelProjector::DEFAULT_LOAD_COUNT,
$options[PdoEventStoreReadModelProjector::OPTION_PCNTL_DISPATCH] ?? PdoEventStoreReadModelProjector::DEFAULT_PCNTL_DISPATCH,
$options[PdoEventStoreReadModelProjector::OPTION_UPDATE_LOCK_THRESHOLD] ?? PdoEventStoreReadModelProjector::DEFAULT_UPDATE_LOCK_THRESHOLD,
$options[PdoEventStoreReadModelProjector::OPTION_GAP_DETECTION] ?? null
Expand Down
11 changes: 10 additions & 1 deletion src/Projection/PdoEventStoreProjector.php
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
final class PdoEventStoreProjector implements Projector
{
public const OPTION_GAP_DETECTION = 'gap_detection';
public const OPTION_LOAD_COUNT = 'load_count';
public const DEFAULT_LOAD_COUNT = null;

use PostgresHelper {
quoteIdent as pgQuoteIdent;
Expand Down Expand Up @@ -136,6 +138,11 @@ final class PdoEventStoreProjector implements Projector
*/
private $sleep;

/**
* @var int|null
*/
private $loadCount;

/**
* @var bool
*/
Expand Down Expand Up @@ -186,6 +193,7 @@ public function __construct(
int $cacheSize,
int $persistBlockSize,
int $sleep,
int $loadCount = null,
bool $triggerPcntlSignalDispatch = false,
int $updateLockThreshold = 0,
GapDetection $gapDetection = null
Expand All @@ -203,6 +211,7 @@ public function __construct(
$this->cachedStreamNames = new ArrayCache($cacheSize);
$this->persistBlockSize = $persistBlockSize;
$this->sleep = $sleep;
$this->loadCount = $loadCount;
$this->status = ProjectionStatus::IDLE();
$this->triggerPcntlSignalDispatch = $triggerPcntlSignalDispatch;
$this->updateLockThreshold = $updateLockThreshold;
Expand Down Expand Up @@ -526,7 +535,7 @@ public function run(bool $keepRunning = true): void

foreach ($this->streamPositions as $streamName => $position) {
try {
$eventStreams[$streamName] = $this->eventStore->load(new StreamName($streamName), $position + 1, null, $this->metadataMatcher);
$eventStreams[$streamName] = $this->eventStore->load(new StreamName($streamName), $position + 1, $this->loadCount, $this->metadataMatcher);
} catch (Exception\StreamNotFound $e) {
// ignore
continue;
Expand Down
11 changes: 10 additions & 1 deletion src/Projection/PdoEventStoreReadModelProjector.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
final class PdoEventStoreReadModelProjector implements ReadModelProjector
{
public const OPTION_GAP_DETECTION = 'gap_detection';
public const OPTION_LOAD_COUNT = 'load_count';
public const DEFAULT_LOAD_COUNT = null;

use PostgresHelper {
quoteIdent as pgQuoteIdent;
Expand Down Expand Up @@ -132,6 +134,11 @@ final class PdoEventStoreReadModelProjector implements ReadModelProjector
*/
private $sleep;

/**
* @var int|null
*/
private $loadCount;

/**
* @var bool
*/
Expand Down Expand Up @@ -177,6 +184,7 @@ public function __construct(
int $lockTimeoutMs,
int $persistBlockSize,
int $sleep,
int $loadCount = null,
bool $triggerPcntlSignalDispatch = false,
int $updateLockThreshold = 0,
GapDetection $gapDetection = null
Expand All @@ -194,6 +202,7 @@ public function __construct(
$this->lockTimeoutMs = $lockTimeoutMs;
$this->persistBlockSize = $persistBlockSize;
$this->sleep = $sleep;
$this->loadCount = $loadCount;
$this->status = ProjectionStatus::IDLE();
$this->triggerPcntlSignalDispatch = $triggerPcntlSignalDispatch;
$this->updateLockThreshold = $updateLockThreshold;
Expand Down Expand Up @@ -489,7 +498,7 @@ public function run(bool $keepRunning = true): void

foreach ($this->streamPositions as $streamName => $position) {
try {
$eventStreams[$streamName] = $this->eventStore->load(new StreamName($streamName), $position + 1, null, $this->metadataMatcher);
$eventStreams[$streamName] = $this->eventStore->load(new StreamName($streamName), $position + 1, $this->loadCount, $this->metadataMatcher);
} catch (Exception\StreamNotFound $e) {
// ignore
continue;
Expand Down
2 changes: 2 additions & 0 deletions src/Projection/PostgresProjectionManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ public function createProjection(
$options[PdoEventStoreProjector::OPTION_CACHE_SIZE] ?? PdoEventStoreProjector::DEFAULT_CACHE_SIZE,
$options[PdoEventStoreProjector::OPTION_PERSIST_BLOCK_SIZE] ?? PdoEventStoreProjector::DEFAULT_PERSIST_BLOCK_SIZE,
$options[PdoEventStoreProjector::OPTION_SLEEP] ?? PdoEventStoreProjector::DEFAULT_SLEEP,
$options[PdoEventStoreProjector::OPTION_LOAD_COUNT] ?? PdoEventStoreProjector::DEFAULT_LOAD_COUNT,
$options[PdoEventStoreProjector::OPTION_PCNTL_DISPATCH] ?? PdoEventStoreProjector::DEFAULT_PCNTL_DISPATCH,
$options[PdoEventStoreProjector::OPTION_UPDATE_LOCK_THRESHOLD] ?? PdoEventStoreProjector::DEFAULT_UPDATE_LOCK_THRESHOLD,
$options[PdoEventStoreProjector::OPTION_GAP_DETECTION] ?? null
Expand All @@ -118,6 +119,7 @@ public function createReadModelProjection(
$options[PdoEventStoreReadModelProjector::OPTION_LOCK_TIMEOUT_MS] ?? PdoEventStoreReadModelProjector::DEFAULT_LOCK_TIMEOUT_MS,
$options[PdoEventStoreReadModelProjector::OPTION_PERSIST_BLOCK_SIZE] ?? PdoEventStoreReadModelProjector::DEFAULT_PERSIST_BLOCK_SIZE,
$options[PdoEventStoreReadModelProjector::OPTION_SLEEP] ?? PdoEventStoreReadModelProjector::DEFAULT_SLEEP,
$options[PdoEventStoreReadModelProjector::OPTION_LOAD_COUNT] ?? PdoEventStoreReadModelProjector::DEFAULT_LOAD_COUNT,
$options[PdoEventStoreReadModelProjector::OPTION_PCNTL_DISPATCH] ?? PdoEventStoreReadModelProjector::DEFAULT_PCNTL_DISPATCH,
$options[PdoEventStoreReadModelProjector::OPTION_UPDATE_LOCK_THRESHOLD] ?? PdoEventStoreReadModelProjector::DEFAULT_UPDATE_LOCK_THRESHOLD,
$options[PdoEventStoreReadModelProjector::OPTION_GAP_DETECTION] ?? null
Expand Down

0 comments on commit 0a29d32

Please sign in to comment.