Skip to content

Commit

Permalink
feat(distro): add new queue per distro
Browse files Browse the repository at this point in the history
  • Loading branch information
saber-solooki committed Jan 6, 2025
1 parent 3bacb07 commit 58c486c
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 0 deletions.
29 changes: 29 additions & 0 deletions arq/connections.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
import functools
import logging
import sys
from dataclasses import dataclass
from datetime import datetime, timedelta
from operator import attrgetter
Expand Down Expand Up @@ -85,6 +86,9 @@ def __repr__(self) -> str:
else:
BaseRedis = Redis

# Running count of jobs enqueued through ArqRedis.enqueue_job; read by
# _get_queue_index to spread jobs across the per-distribution queues.
# NOTE(review): module-level, so it is shared by every ArqRedis instance in
# the process and is not guarded by any lock — presumably fine under a
# single asyncio event loop, but confirm if used from multiple threads.
enqueued_jobs = 0



class ArqRedis(BaseRedis):
"""
Expand Down Expand Up @@ -126,6 +130,7 @@ async def enqueue_job(
_defer_by: Union[None, int, float, timedelta] = None,
_expires: Union[None, int, float, timedelta] = None,
_job_try: Optional[int] = None,
distribution: str = None, # example 5:2
**kwargs: Any,
) -> Optional[Job]:
"""
Expand All @@ -143,8 +148,18 @@ async def enqueue_job(
:param kwargs: any keyword arguments to pass to the function
:return: :class:`arq.jobs.Job` instance or ``None`` if a job with this ID already exists
"""
global enqueued_jobs

if _queue_name is None:
_queue_name = self.default_queue_name

if distribution:
queue_index = self._get_queue_index(distribution)
_queue_name = f'{_queue_name}_{queue_index}'
if enqueued_jobs >= sys.maxsize:
enqueued_jobs = 0
enqueued_jobs += 1

job_id = _job_id or uuid4().hex
job_key = job_key_prefix + job_id
if _defer_until and _defer_by:
Expand Down Expand Up @@ -180,6 +195,20 @@ async def enqueue_job(
return None
return Job(job_id, redis=self, _queue_name=_queue_name, _deserializer=self.job_deserializer)

def _get_queue_index(self, distribution) -> int:
ratios = list(map(lambda x: int(x), distribution.split(':')))
ratios_sum = sum(ratios)
up_to_ratio = ratios[0]
queue_index = 0
for i, _ in enumerate(ratios, 1):
if enqueued_jobs % ratios_sum >= up_to_ratio:
up_to_ratio = up_to_ratio + ratios[i]
queue_index = i
else:
break

return queue_index

async def _get_job_result(self, key: bytes) -> JobResult:
job_id = key[len(result_key_prefix) :].decode()
job = Job(job_id, self, _deserializer=self.job_deserializer)
Expand Down
5 changes: 5 additions & 0 deletions arq/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ def __init__(
self,
functions: Sequence[Union[Function, 'WorkerCoroutine']] = (),
*,
distribution_index: int = None,
queue_name: Optional[str] = default_queue_name,
cron_jobs: Optional[Sequence[CronJob]] = None,
redis_settings: Optional[RedisSettings] = None,
Expand Down Expand Up @@ -224,6 +225,10 @@ def __init__(
queue_name = redis_pool.default_queue_name
else:
raise ValueError('If queue_name is absent, redis_pool must be present.')

if distribution_index is not None:
queue_name = f'{queue_name}_{distribution_index}'

self.queue_name = queue_name
self.cron_jobs: List[CronJob] = []
if cron_jobs is not None:
Expand Down

0 comments on commit 58c486c

Please sign in to comment.