Skip to content

Commit

Permalink
Add metrics exports for celery task manager; Add soft timeout for cel…
Browse files Browse the repository at this point in the history
…ery tasks. change worker concurrency to 3. (#42)
  • Loading branch information
HanFa authored Sep 10, 2024
1 parent b35c175 commit ef42772
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 4 deletions.
5 changes: 4 additions & 1 deletion configs/celery.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,8 @@
"broker_url": "pyamqp://guest@localhost:5672//",
"result_backend": "file:///app/celery_results",
"worker_prefetch_multiplier": 1,
"task_acks_late": true
"task_acks_late": true,
"task_time_limit": 7200,
"task_soft_time_limit": 3600,
"metrics_server_port": 8081
}
2 changes: 1 addition & 1 deletion configs/supervisord.conf
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ stdout_logfile_maxbytes=0
stderr_logfile_maxbytes=0

[program:celery]
command=celery -A src.task_manager.celery_tasks.celery_app worker --concurrency 1 -Q video_preview
command=celery -A src.task_manager.celery_tasks.celery_app worker --concurrency 3 -Q video_preview
directory=/app
autostart=true
autorestart=true
Expand Down
15 changes: 15 additions & 0 deletions src/task_manager/celery_tasks/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import os
import json
import logging
import threading

from celery import Celery
from prometheus_client import start_http_server

logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger(__name__)
Expand All @@ -20,6 +22,9 @@ def load_celery_config():
config['task_acks_late'] = os.getenv('CELERY_TASK_ACKS_LATE', str(config['task_acks_late']).lower()) in ['true',
'1', 't',
'y', 'yes']
config['task_time_limit'] = os.getenv('CELERY_TASK_TIME_LIMIT', config['task_time_limit'])
config['task_soft_time_limit'] = os.getenv('CELERY_TASK_SOFT_TIME_LIMIT', config['task_soft_time_limit'])
config['metrics_server_port'] = os.getenv('CELERY_METRICS_SERVER_PORT', config['metrics_server_port'])
return config


Expand All @@ -35,3 +40,13 @@ def load_tasks_route():
celery_app = Celery(__name__, include=["src.task_manager.celery_tasks.tasks"])
celery_app.conf.update(load_celery_config())
celery_app.conf.task_routes = load_tasks_route()


# Start the metrics server in a separate thread
def start_metrics_server():
metrics_port = celery_config['metrics_server_port']
logger.warning("Celery Metrics Server running on port: %s", metrics_port)
start_http_server(port=int(metrics_port))


threading.Thread(target=start_metrics_server).start()
33 changes: 31 additions & 2 deletions src/task_manager/celery_tasks/tasks.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,38 @@
import time

from src.service.video_synthesis.video_preview import zhVideoPreview
from src.task_manager.celery_tasks import celery_app

from prometheus_client import Counter, Histogram
from celery.exceptions import SoftTimeLimitExceeded

# Define Prometheus metrics
VIDEO_PREVIEW_TASK_INVOKED = Counter(
'video_preview_task_invoked_total', 'Total number of times video preview task is invoked')
VIDEO_PREVIEW_TASK_FAILED = Counter(
'video_preview_task_failed_total', 'Total number of times video preview task failed')
VIDEO_PREVIEW_TASK_SOFT_TIMEOUT = Counter(
'video_preview_task_soft_timeout_total', 'Total number of times video preview task failed due to soft timeout')
VIDEO_PREVIEW_TASK_DURATION = Histogram(
'video_preview_task_duration_seconds', 'Duration of video preview task in seconds')


@celery_app.task(bind=True)
def video_preview_task(self, video_path, voice_path, audio_bg_path, video_out_path):
print(f"Invoke video preview task {self.request.id}.")
_ = zhVideoPreview(None, video_path, voice_path, audio_bg_path,
"暂时没有处理字幕文件,所以随便写", video_out_path)
VIDEO_PREVIEW_TASK_INVOKED.inc()
start_time = time.time()

try:
_ = zhVideoPreview(None, video_path, voice_path, audio_bg_path,
"暂时没有处理字幕文件,所以随便写", video_out_path)
except SoftTimeLimitExceeded as soft_exception:
VIDEO_PREVIEW_TASK_SOFT_TIMEOUT.inc()
print(f"Invoke video preview task {self.request.id} failed with soft timeout: {soft_exception}")
except Exception as exception:
VIDEO_PREVIEW_TASK_FAILED.inc()
print(f"Invoke video preview task {self.request.id} failed with exception: {exception}")
finally:
duration = time.time() - start_time
VIDEO_PREVIEW_TASK_DURATION.observe(duration)
print(f"Invoke video preview task {self.request.id} took {duration:.2f} seconds.")

0 comments on commit ef42772

Please sign in to comment.