From ec26096864824dafb87d65f66b37f182af73ffda Mon Sep 17 00:00:00 2001
From: Evan Purkhiser
Date: Fri, 3 May 2024 14:38:21 -0400
Subject: [PATCH] feat(crons): Add max-workers option (#70261)

Follow up to GH-64825
---
 src/sentry/consumers/__init__.py                  |  6 ++++++
 src/sentry/monitors/consumers/monitor_consumer.py | 14 +++++++++++---
 .../monitors/consumers/test_monitor_consumer.py   |  6 +++++-
 3 files changed, 22 insertions(+), 4 deletions(-)

diff --git a/src/sentry/consumers/__init__.py b/src/sentry/consumers/__init__.py
index 0756ee23f1ac83..bb7d6d1e0f0999 100644
--- a/src/sentry/consumers/__init__.py
+++ b/src/sentry/consumers/__init__.py
@@ -124,6 +124,12 @@ def ingest_monitors_options() -> list[click.Option]:
             default=1,
             help="Maximum time spent batching check-ins to batch before processing in parallel.",
         ),
+        click.Option(
+            ["--max-workers", "max_workers"],
+            type=int,
+            default=None,
+            help="The maximum number of threads to spawn in parallel mode.",
+        ),
     ]
 
     return options
diff --git a/src/sentry/monitors/consumers/monitor_consumer.py b/src/sentry/monitors/consumers/monitor_consumer.py
index a61fe91a82f0b4..79470956a0a8ce 100644
--- a/src/sentry/monitors/consumers/monitor_consumer.py
+++ b/src/sentry/monitors/consumers/monitor_consumer.py
@@ -929,6 +929,11 @@ class StoreMonitorCheckInStrategyFactory(ProcessingStrategyFactory[KafkaPayload]
     Does the consumer process unrelated check-ins in parallel?
     """
 
+    max_workers: int | None = None
+    """
+    Number of Executor workers to use when running in parallel
+    """
+
     max_batch_size = 500
     """
     How many messages will be batched at once when in parallel mode.
@@ -944,6 +949,7 @@ def __init__(
         mode: Literal["parallel", "serial"] | None = None,
         max_batch_size: int | None = None,
         max_batch_time: int | None = None,
+        max_workers: int | None = None,
     ) -> None:
         if mode == "parallel":
             self.parallel = True
@@ -952,13 +958,15 @@ def __init__(
             self.max_batch_size = max_batch_size
         if max_batch_time is not None:
             self.max_batch_time = max_batch_time
+        if max_workers is not None:
+            self.max_workers = max_workers
 
     def shutdown(self) -> None:
         if self.parallel_executor:
             self.parallel_executor.shutdown()
 
-    def create_paralell_worker(self, commit: Commit) -> ProcessingStrategy[KafkaPayload]:
-        self.parallel_executor = ThreadPoolExecutor()
+    def create_parallel_worker(self, commit: Commit) -> ProcessingStrategy[KafkaPayload]:
+        self.parallel_executor = ThreadPoolExecutor(max_workers=self.max_workers)
 
         batch_processor = RunTask(
             function=partial(process_batch, self.parallel_executor),
@@ -982,6 +990,6 @@ def create_with_partitions(
         partitions: Mapping[Partition, int],
     ) -> ProcessingStrategy[KafkaPayload]:
         if self.parallel:
-            return self.create_paralell_worker(commit)
+            return self.create_parallel_worker(commit)
         else:
             return self.create_synchronous_worker(commit)
diff --git a/tests/sentry/monitors/consumers/test_monitor_consumer.py b/tests/sentry/monitors/consumers/test_monitor_consumer.py
index cb0b152a13b4fa..4fc5b278b288ab 100644
--- a/tests/sentry/monitors/consumers/test_monitor_consumer.py
+++ b/tests/sentry/monitors/consumers/test_monitor_consumer.py
@@ -164,7 +164,11 @@ def test_parallel(self, process_checkin_group) -> None:
         Validates that the consumer in parallel mode correctly groups check-ins
         into groups by their monitor slug / environment
         """
-        factory = StoreMonitorCheckInStrategyFactory(mode="parallel", max_batch_size=4)
+        factory = StoreMonitorCheckInStrategyFactory(
+            mode="parallel",
+            max_batch_size=4,
+            max_workers=1,
+        )
         commit = mock.Mock()
         consumer = factory.create_with_partitions(commit, {self.partition: 0})
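
As a minimal usage sketch of what the new knob changes (an illustration assuming only the code shown in the diff above, not code taken from the repository): passing max_workers to the strategy factory caps the ThreadPoolExecutor built for parallel mode, while leaving it as None keeps Python's library default of min(32, os.cpu_count() + 4). The max_workers=4 value and the empty partitions mapping are arbitrary placeholders.

    from unittest import mock

    from sentry.monitors.consumers.monitor_consumer import (
        StoreMonitorCheckInStrategyFactory,
    )

    # Mirror the wiring performed once --max-workers is parsed from the CLI.
    factory = StoreMonitorCheckInStrategyFactory(
        mode="parallel",
        max_batch_size=500,  # class default shown in the diff
        max_batch_time=1,    # CLI default shown in the diff
        max_workers=4,       # new option: caps the parallel executor at 4 threads
    )

    # The parallel branch now constructs ThreadPoolExecutor(max_workers=4)
    # instead of ThreadPoolExecutor(); the partitions mapping is not used by
    # the parallel branch shown in the diff, so an empty dict stands in here.
    strategy = factory.create_with_partitions(mock.Mock(), {})

    # Tear down the executor when finished.
    factory.shutdown()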