|
|
""" |
|
|
Module: infrastructure.queue.celery_app |
|
|
Description: Celery application configuration and task definitions |
|
|
Author: Anderson H. Silva |
|
|
Date: 2025-01-25 |
|
|
License: Proprietary - All rights reserved |
|
|
""" |
|
|
|
|
|
import os |
|
|
from typing import Dict, Any, Optional |
|
|
from datetime import datetime, timedelta |
|
|
from functools import wraps |
|
|
|
|
|
from celery import Celery, Task |
|
|
from celery.utils.log import get_task_logger |
|
|
from kombu import Queue, Exchange |
|
|
|
|
|
from src.core.config import get_settings |
|
|
from src.infrastructure.queue.priority_queue import priority_queue, TaskPriority |
|
|
|
|
|
|
|
|
# Application settings loaded once at import time.
settings = get_settings()

# Celery application: Redis serves as both the message broker and the result
# backend. Modules listed in ``include`` are imported by workers at startup so
# their task definitions get registered.
celery_app = Celery(
    "cidadao_ai",
    broker=settings.REDIS_URL,
    backend=settings.REDIS_URL,
    include=[
        "src.infrastructure.queue.tasks.investigation_tasks",
        "src.infrastructure.queue.tasks.analysis_tasks",
        "src.infrastructure.queue.tasks.report_tasks",
        "src.infrastructure.queue.tasks.export_tasks",
        "src.infrastructure.queue.tasks.monitoring_tasks",
        "src.infrastructure.queue.tasks.maintenance_tasks",
    ]
)
|
|
|
|
|
|
|
|
# Global Celery configuration.
celery_app.conf.update(
    # JSON-only serialization: avoids pickle's arbitrary-code-execution risk.
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    # Display timezone; internal timestamps remain UTC (enable_utc=True).
    timezone="America/Sao_Paulo",
    enable_utc=True,

    # Route tasks to queues by task-name prefix.
    task_routes={
        "tasks.critical.*": {"queue": "critical"},
        "tasks.high.*": {"queue": "high"},
        "tasks.normal.*": {"queue": "default"},
        "tasks.low.*": {"queue": "low"},
        "tasks.background.*": {"queue": "background"},
    },

    # Worker tuning: prefetch 4 messages per process; recycle each worker
    # process after 1000 tasks to bound memory growth.
    # NOTE(review): with task_acks_late=True, a prefetch of 4 can delay
    # redelivery of long-running tasks — confirm this trade-off is intended.
    worker_prefetch_multiplier=4,
    worker_max_tasks_per_child=1000,

    # Results expire after 1 hour; persisted so they survive broker restarts.
    result_expires=3600,
    result_persistent=True,

    # Soft limit (5 min) raises SoftTimeLimitExceeded inside the task so it
    # can clean up; hard limit (10 min) terminates the worker process.
    task_soft_time_limit=300,
    task_time_limit=600,

    # Acknowledge only after completion, and requeue tasks whose worker was
    # lost, so tasks are not silently dropped on worker crashes.
    task_acks_late=True,
    task_reject_on_worker_lost=True,
)
|
|
|
|
|
|
|
|
# Priority queues, highest (critical=10) to lowest (background=1).
# NOTE(review): honoring of the Queue ``priority`` argument depends on the
# kombu transport — verify the Redis broker actually applies these values.
celery_app.conf.task_queues = (
    Queue("critical", Exchange("critical"), routing_key="critical", priority=10),
    Queue("high", Exchange("high"), routing_key="high", priority=7),
    Queue("default", Exchange("default"), routing_key="default", priority=5),
    Queue("low", Exchange("low"), routing_key="low", priority=3),
    Queue("background", Exchange("background"), routing_key="background", priority=1),
)

# Celery task logger for this module (standard logging.Logger instance).
logger = get_task_logger(__name__)
|
|
|
|
|
|
|
|
class BaseTask(Task):
    """Base Celery task adding structured lifecycle logging.

    Hooks Celery's lifecycle callbacks (``before_start``, ``on_success``,
    ``on_failure``, ``on_retry``) to emit log records carrying task metadata
    and wall-clock duration.

    BUG FIX: ``get_task_logger`` returns a stdlib ``logging.Logger``, which
    rejects arbitrary keyword arguments — the previous
    ``logger.info("task_started", task_id=...)`` calls raised ``TypeError``.
    Structured fields now go through ``extra`` (using keys that do not
    collide with reserved ``LogRecord`` attributes such as ``name``/``args``).
    """

    def __init__(self):
        """Initialize base task."""
        super().__init__()
        # Set by before_start(); used to compute task duration.
        self._task_start_time: Optional[datetime] = None

    def _elapsed_seconds(self) -> Optional[float]:
        """Return seconds since before_start(), or None if it never ran.

        on_success()/on_failure() can fire without before_start() having run
        (e.g. worker lost with task_acks_late), so guard against None instead
        of raising TypeError on the subtraction.
        """
        if self._task_start_time is None:
            return None
        return (datetime.now() - self._task_start_time).total_seconds()

    def before_start(self, task_id, args, kwargs):
        """Record the start time and log task start."""
        self._task_start_time = datetime.now()
        logger.info(
            "task_started",
            extra={
                "task_id": task_id,
                "task_name": self.name,
                "task_args": args,
                "task_kwargs": kwargs,
            },
        )

    def on_success(self, retval, task_id, args, kwargs):
        """Log successful completion with duration and result size."""
        logger.info(
            "task_completed",
            extra={
                "task_id": task_id,
                "task_name": self.name,
                "duration": self._elapsed_seconds(),
                "result_size": len(str(retval)) if retval else 0,
            },
        )

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Log task failure with duration and error details."""
        logger.error(
            "task_failed",
            extra={
                "task_id": task_id,
                "task_name": self.name,
                "duration": self._elapsed_seconds(),
                "error": str(exc),
                # einfo is a billiard ExceptionInfo, not a valid exc_info
                # tuple for logging; stringify it instead of exc_info=einfo.
                "traceback": str(einfo),
            },
        )

    def on_retry(self, exc, task_id, args, kwargs, einfo):
        """Log a retry attempt with the current retry count."""
        logger.warning(
            "task_retry",
            extra={
                "task_id": task_id,
                "task_name": self.name,
                "error": str(exc),
                "retry_count": self.request.retries,
            },
        )
|
|
|
|
|
|
|
|
|
|
|
# Make BaseTask the default base class for every task created via this app,
# so all tasks inherit the lifecycle logging hooks.
celery_app.Task = BaseTask
|
|
|
|
|
|
|
|
def priority_task(priority: TaskPriority = TaskPriority.NORMAL):
    """Decorator factory creating a priority-routed Celery task.

    Args:
        priority: TaskPriority selecting the destination queue and the
            message priority value.

    Returns:
        A decorator that registers the wrapped function as a Celery task on
        the queue matching ``priority`` (3 retries, 60s retry delay). If the
        caller supplies ``task_id`` and ``callback_url`` keyword arguments,
        a completion — or, new here, failure — callback is dispatched via
        ``send_task_callback``.
    """
    # Priority -> queue mapping is invariant per decorator instance, so it is
    # computed once here rather than on every function decoration.
    queue_name = {
        TaskPriority.CRITICAL: "critical",
        TaskPriority.HIGH: "high",
        TaskPriority.NORMAL: "default",
        TaskPriority.LOW: "low",
        TaskPriority.BACKGROUND: "background",
    }.get(priority, "default")

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Callback plumbing: both values are popped so the wrapped
            # function never sees them.
            task_id = kwargs.pop("task_id", None)
            callback_url = kwargs.pop("callback_url", None)

            try:
                result = func(*args, **kwargs)
            except Exception as exc:
                # FIX: previously a failing task meant the callback was
                # silently never sent. Notify the caller of the failure,
                # then re-raise so Celery's retry/failure handling proceeds.
                if callback_url and task_id:
                    send_task_callback.delay(
                        task_id=task_id,
                        callback_url=callback_url,
                        result={"error": str(exc)},
                        status="failed",
                    )
                raise

            if callback_url and task_id:
                send_task_callback.delay(
                    task_id=task_id,
                    callback_url=callback_url,
                    result=result,
                    status="completed",
                )

            return result

        task_options = {
            "queue": queue_name,
            "priority": priority.value,
            "max_retries": 3,
            "default_retry_delay": 60,
        }
        return celery_app.task(**task_options)(wrapper)

    return decorator
|
|
|
|
|
|
|
|
@celery_app.task(name="tasks.send_callback", queue="high")
def send_task_callback(
    task_id: str,
    callback_url: str,
    result: Any,
    status: str
) -> Dict[str, Any]:
    """POST a task completion/failure notification to ``callback_url``.

    Args:
        task_id: Identifier of the finished task.
        callback_url: URL to notify.
        result: JSON-serializable payload included under ``result``.
        status: Final task status string (e.g. ``"completed"``).

    Returns:
        Dict with a ``success`` flag plus ``status_code`` on delivery, or
        ``error`` when the request itself failed (best-effort: never raises).
    """
    # Imported lazily so module import does not require httpx.
    import httpx

    try:
        with httpx.Client() as client:
            response = client.post(
                callback_url,
                json={
                    "task_id": task_id,
                    "status": status,
                    "result": result,
                    "completed_at": datetime.now().isoformat()
                },
                timeout=30.0
            )

        return {
            "success": response.status_code < 400,
            "status_code": response.status_code
        }

    except Exception as e:
        # BUG FIX: stdlib loggers reject arbitrary kwargs — the previous
        # logger.error("callback_failed", task_id=..., ...) raised TypeError
        # inside this handler, masking the original delivery failure.
        # Structured fields now go through ``extra``.
        logger.error(
            "callback_failed",
            extra={
                "task_id": task_id,
                "callback_url": callback_url,
                "error": str(e),
            },
        )
        return {"success": False, "error": str(e)}
|
|
|
|
|
|
|
|
@celery_app.task(name="tasks.cleanup_old_results", queue="background")
def cleanup_old_results(days: int = 7) -> Dict[str, Any]:
    """Clean up task results older than ``days`` days.

    Args:
        days: Retention window; results older than this are targeted.

    Returns:
        Dict with the run status and the computed cutoff date (ISO-8601).

    NOTE(review): this task currently only computes and logs the cutoff — no
    deletion is visible here; presumably handled elsewhere or still TODO.
    """
    cutoff_date = datetime.now() - timedelta(days=days)

    # BUG FIX: stdlib loggers reject arbitrary kwargs (would raise
    # TypeError); structured fields must be passed via ``extra``.
    logger.info(
        "cleanup_started",
        extra={
            "cutoff_date": cutoff_date.isoformat(),
            "days": days,
        },
    )

    return {
        "status": "completed",
        "cutoff_date": cutoff_date.isoformat()
    }
|
|
|
|
|
|
|
|
|
|
|
# Celery Beat periodic schedule: daily result cleanup (7-day retention) and
# a worker health check every 5 minutes.
celery_app.conf.beat_schedule = {
    "cleanup-old-results": {
        "task": "tasks.cleanup_old_results",
        "schedule": timedelta(hours=24),
        "args": (7,)
    },
    "health-check": {
        "task": "tasks.health_check",
        "schedule": timedelta(minutes=5),
    }
}
|
|
|
|
|
|
|
|
@celery_app.task(name="tasks.health_check", queue="high")
def health_check() -> Dict[str, Any]:
    """Periodic worker health probe.

    Queries the cluster through the control inspector and reports how many
    workers responded, together with the probe timestamp.
    """
    worker_stats = celery_app.control.inspect().stats()
    # stats() yields a mapping of worker -> stats, or None when no worker
    # replied within the inspector's deadline.
    worker_count = 0 if not worker_stats else len(worker_stats)

    return {
        "status": "healthy",
        "timestamp": datetime.now().isoformat(),
        "workers": worker_count,
    }
|
|
|
|
|
|
|
|
def get_celery_app() -> Celery:
    """Return the module-level Celery application instance.

    Accessor kept so callers can depend on a function rather than importing
    the ``celery_app`` global directly.
    """
    return celery_app