Skip to content

Commit

Permalink
feat(crons): Add max-workers option (#70261)
Browse files Browse the repository at this point in the history
Follow up to GH-64825
  • Loading branch information
evanpurkhiser authored May 3, 2024
1 parent f0156b9 commit ec26096
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 4 deletions.
6 changes: 6 additions & 0 deletions src/sentry/consumers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,12 @@ def ingest_monitors_options() -> list[click.Option]:
default=1,
help="Maximum time spent batching check-ins to batch before processing in parallel.",
),
click.Option(
["--max-workers", "max_workers"],
type=int,
default=None,
help="The maximum number of threads to spawn in parallel mode.",
),
]
return options

Expand Down
14 changes: 11 additions & 3 deletions src/sentry/monitors/consumers/monitor_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -929,6 +929,11 @@ class StoreMonitorCheckInStrategyFactory(ProcessingStrategyFactory[KafkaPayload]
Does the consumer process unrelated check-ins in parallel?
"""

max_workers: int | None = None
"""
Number of Executor workers to use when running in parallel
"""

max_batch_size = 500
"""
How many messages will be batched at once when in parallel mode.
Expand All @@ -944,6 +949,7 @@ def __init__(
mode: Literal["parallel", "serial"] | None = None,
max_batch_size: int | None = None,
max_batch_time: int | None = None,
max_workers: int | None = None,
) -> None:
if mode == "parallel":
self.parallel = True
Expand All @@ -952,13 +958,15 @@ def __init__(
self.max_batch_size = max_batch_size
if max_batch_time is not None:
self.max_batch_time = max_batch_time
if max_workers is not None:
self.max_workers = max_workers

def shutdown(self) -> None:
if self.parallel_executor:
self.parallel_executor.shutdown()

def create_paralell_worker(self, commit: Commit) -> ProcessingStrategy[KafkaPayload]:
self.parallel_executor = ThreadPoolExecutor()
def create_parallel_worker(self, commit: Commit) -> ProcessingStrategy[KafkaPayload]:
self.parallel_executor = ThreadPoolExecutor(max_workers=self.max_workers)

batch_processor = RunTask(
function=partial(process_batch, self.parallel_executor),
Expand All @@ -982,6 +990,6 @@ def create_with_partitions(
partitions: Mapping[Partition, int],
) -> ProcessingStrategy[KafkaPayload]:
if self.parallel:
return self.create_paralell_worker(commit)
return self.create_parallel_worker(commit)
else:
return self.create_synchronous_worker(commit)
6 changes: 5 additions & 1 deletion tests/sentry/monitors/consumers/test_monitor_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,11 @@ def test_parallel(self, process_checkin_group) -> None:
Validates that the consumer in parallel mode correctly groups check-ins
into groups by their monitor slug / environment
"""
factory = StoreMonitorCheckInStrategyFactory(mode="parallel", max_batch_size=4)
factory = StoreMonitorCheckInStrategyFactory(
mode="parallel",
max_batch_size=4,
max_workers=1,
)
commit = mock.Mock()
consumer = factory.create_with_partitions(commit, {self.partition: 0})

Expand Down

0 comments on commit ec26096

Please sign in to comment.