diff --git a/configs/celery.json b/configs/celery.json
index d4e9131..29f2441 100644
--- a/configs/celery.json
+++ b/configs/celery.json
@@ -3,5 +3,8 @@
   "broker_url": "pyamqp://guest@localhost:5672//",
   "result_backend": "file:///app/celery_results",
   "worker_prefetch_multiplier": 1,
-  "task_acks_late": true
+  "task_acks_late": true,
+  "task_time_limit": 7200,
+  "task_soft_time_limit": 3600,
+  "metrics_server_port": 8081
 }
\ No newline at end of file
diff --git a/configs/supervisord.conf b/configs/supervisord.conf
index 55b0742..b46f8a8 100644
--- a/configs/supervisord.conf
+++ b/configs/supervisord.conf
@@ -12,7 +12,7 @@ stdout_logfile_maxbytes=0
 stderr_logfile_maxbytes=0
 
 [program:celery]
-command=celery -A src.task_manager.celery_tasks.celery_app worker --concurrency 1 -Q video_preview
+command=celery -A src.task_manager.celery_tasks.celery_app worker --concurrency 3 -Q video_preview
 directory=/app
 autostart=true
 autorestart=true
diff --git a/src/task_manager/celery_tasks/__init__.py b/src/task_manager/celery_tasks/__init__.py
index 1ec309f..5093cd6 100644
--- a/src/task_manager/celery_tasks/__init__.py
+++ b/src/task_manager/celery_tasks/__init__.py
@@ -1,8 +1,10 @@
 import os
 import json
 import logging
 
 from celery import Celery
+from celery.signals import worker_init
+from prometheus_client import REGISTRY, CollectorRegistry, multiprocess, start_http_server
 
 logging.basicConfig(level=logging.WARNING)
 logger = logging.getLogger(__name__)
@@ -20,6 +22,9 @@ def load_celery_config():
     config['task_acks_late'] = os.getenv('CELERY_TASK_ACKS_LATE',
                                          str(config['task_acks_late']).lower()) in ['true', '1', 't', 'y',
                                                                                     'yes']
+    config['task_time_limit'] = int(os.getenv('CELERY_TASK_TIME_LIMIT', config['task_time_limit']))
+    config['task_soft_time_limit'] = int(os.getenv('CELERY_TASK_SOFT_TIME_LIMIT', config['task_soft_time_limit']))
+    config['metrics_server_port'] = int(os.getenv('CELERY_METRICS_SERVER_PORT', config['metrics_server_port']))
     return config
 
 
@@ -35,3 +40,24 @@ def load_tasks_route():
 celery_app = Celery(__name__, include=["src.task_manager.celery_tasks.tasks"])
-celery_app.conf.update(load_celery_config())
+celery_config = load_celery_config()
+celery_app.conf.update(celery_config)
 celery_app.conf.task_routes = load_tasks_route()
+
+
+@worker_init.connect
+def start_metrics_server(**_signal_kwargs):
+    """Serve Prometheus metrics over HTTP from the Celery worker's main process.
+
+    Hooked to ``worker_init`` so only a real worker binds the port, not every
+    process that imports this package just to enqueue tasks.
+    ``start_http_server`` already serves from its own daemon thread, so no
+    extra thread is created here.
+    """
+    metrics_port = celery_config['metrics_server_port']
+    registry = REGISTRY
+    if os.getenv('PROMETHEUS_MULTIPROC_DIR'):
+        # With the default prefork pool, tasks run in child processes; their
+        # samples only reach this server through the multiprocess collector.
+        registry = CollectorRegistry()
+        multiprocess.MultiProcessCollector(registry)
+    start_http_server(port=metrics_port, registry=registry)
+    logger.warning("Celery Metrics Server running on port: %s", metrics_port)
diff --git a/src/task_manager/celery_tasks/tasks.py b/src/task_manager/celery_tasks/tasks.py
index 762cd45..fccfbfc 100644
--- a/src/task_manager/celery_tasks/tasks.py
+++ b/src/task_manager/celery_tasks/tasks.py
@@ -1,9 +1,47 @@
+import time
+
+from celery.exceptions import SoftTimeLimitExceeded
+from prometheus_client import Counter, Histogram
+
 from src.service.video_synthesis.video_preview import zhVideoPreview
 from src.task_manager.celery_tasks import celery_app
+
+# Define Prometheus metrics
+VIDEO_PREVIEW_TASK_INVOKED = Counter(
+    'video_preview_task_invoked_total', 'Total number of times video preview task is invoked')
+VIDEO_PREVIEW_TASK_FAILED = Counter(
+    'video_preview_task_failed_total', 'Total number of times video preview task failed')
+VIDEO_PREVIEW_TASK_SOFT_TIMEOUT = Counter(
+    'video_preview_task_soft_timeout_total', 'Total number of times video preview task failed due to soft timeout')
+# The default histogram buckets top out at 10 s; the task limits are hour-scale.
+VIDEO_PREVIEW_TASK_DURATION = Histogram(
+    'video_preview_task_duration_seconds', 'Duration of video preview task in seconds',
+    buckets=(5, 15, 30, 60, 120, 300, 600, 1200, 1800, 3600, 7200))
 
 
 @celery_app.task(bind=True)
 def video_preview_task(self, video_path, voice_path, audio_bg_path, video_out_path):
+    """Render a video preview and record Prometheus metrics for the attempt.
+
+    Failures are counted and then re-raised so Celery still marks the task as
+    failed instead of reporting success for a preview that was never produced.
+    """
     print(f"Invoke video preview task {self.request.id}.")
-    _ = zhVideoPreview(None, video_path, voice_path, audio_bg_path,
-                       "暂时没有处理字幕文件,所以随便写", video_out_path)
+    VIDEO_PREVIEW_TASK_INVOKED.inc()
+    start_time = time.monotonic()
+
+    try:
+        zhVideoPreview(None, video_path, voice_path, audio_bg_path,
+                       "暂时没有处理字幕文件,所以随便写", video_out_path)
+    except SoftTimeLimitExceeded as soft_exception:
+        VIDEO_PREVIEW_TASK_SOFT_TIMEOUT.inc()
+        print(f"Invoke video preview task {self.request.id} failed with soft timeout: {soft_exception}")
+        raise
+    except Exception as exception:
+        VIDEO_PREVIEW_TASK_FAILED.inc()
+        print(f"Invoke video preview task {self.request.id} failed with exception: {exception}")
+        raise
+    finally:
+        duration = time.monotonic() - start_time
+        VIDEO_PREVIEW_TASK_DURATION.observe(duration)
+        print(f"Invoke video preview task {self.request.id} took {duration:.2f} seconds.")