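# Scraped file-viewer header (not part of the module; kept as comments so the file parses):
#   anderson-ufrj
#   feat(security): implement API key rotation and advanced rate limiting
#   97c535b
#   raw | history blame | 7.9 kB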
"""
Module: infrastructure.queue.celery_app
Description: Celery application configuration and task definitions
Author: Anderson H. Silva
Date: 2025-01-25
License: Proprietary - All rights reserved
"""
import os
from typing import Dict, Any, Optional
from datetime import datetime, timedelta
from functools import wraps
from celery import Celery, Task
from celery.utils.log import get_task_logger
from kombu import Queue, Exchange
from src.core.config import get_settings
from src.infrastructure.queue.priority_queue import priority_queue, TaskPriority
# Load application settings once at import time (get_settings comes from
# src.core.config).
settings = get_settings()
# Configure Celery: the same settings.REDIS_URL is used as both the message
# broker and the result backend.
celery_app = Celery(
    "cidadao_ai",
    broker=settings.REDIS_URL,
    backend=settings.REDIS_URL,
    # Task modules handed to Celery's `include` option, one per task family.
    include=[
        "src.infrastructure.queue.tasks.investigation_tasks",
        "src.infrastructure.queue.tasks.analysis_tasks",
        "src.infrastructure.queue.tasks.report_tasks",
        "src.infrastructure.queue.tasks.export_tasks",
        "src.infrastructure.queue.tasks.monitoring_tasks",
        "src.infrastructure.queue.tasks.maintenance_tasks",
    ]
)
# Celery configuration
celery_app.conf.update(
    # Task execution: JSON-only payloads and results; schedules are expressed
    # in the America/Sao_Paulo timezone while UTC stays enabled.
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="America/Sao_Paulo",
    enable_utc=True,
    # Task routing: glob patterns on the task name select the queue. The tasks
    # defined in this module ("tasks.send_callback", "tasks.health_check",
    # "tasks.cleanup_old_results") do not match these patterns; they set their
    # queue explicitly in their decorators instead.
    task_routes={
        "tasks.critical.*": {"queue": "critical"},
        "tasks.high.*": {"queue": "high"},
        "tasks.normal.*": {"queue": "default"},
        "tasks.low.*": {"queue": "low"},
        "tasks.background.*": {"queue": "background"},
    },
    # Performance
    worker_prefetch_multiplier=4,
    worker_max_tasks_per_child=1000,
    # Result backend
    # NOTE(review): Celery documents `result_persistent` for the RPC/AMQP
    # backend; with a Redis backend it presumably has no effect - confirm.
    result_expires=3600, # 1 hour
    result_persistent=True,
    # Task execution limits (seconds): soft limit first, hard limit after.
    task_soft_time_limit=300, # 5 minutes
    task_time_limit=600, # 10 minutes
    # Retries: late acknowledgement plus reject-on-worker-lost.
    # NOTE(review): per the Celery docs this combination can re-deliver a
    # task whose worker died mid-run, so tasks should be idempotent - confirm.
    task_acks_late=True,
    task_reject_on_worker_lost=True,
)
# Define queues with priorities: one queue per priority tier, each bound to an
# exchange and routing key of the same name.
# NOTE(review): kombu's documented Queue arguments include `max_priority`
# rather than `priority`; confirm the `priority=` values below actually take
# effect on the Redis transport.
celery_app.conf.task_queues = (
    Queue("critical", Exchange("critical"), routing_key="critical", priority=10),
    Queue("high", Exchange("high"), routing_key="high", priority=7),
    Queue("default", Exchange("default"), routing_key="default", priority=5),
    Queue("low", Exchange("low"), routing_key="low", priority=3),
    Queue("background", Exchange("background"), routing_key="background", priority=1),
)
# Module-level task logger shared by BaseTask and the tasks defined below.
logger = get_task_logger(__name__)
class BaseTask(Task):
    """Base task with error handling and monitoring.

    Logs a line when a task starts, succeeds, fails, or is retried, and
    reports how long the run took when a start time was recorded.

    The log calls use %-style arguments on purpose: ``logger`` is obtained
    from ``get_task_logger`` and standard-library loggers raise ``TypeError``
    when given arbitrary keyword arguments such as ``task_id=...``.

    NOTE(review): the start time is stored on the task instance; if a single
    instance can run several requests concurrently (thread/greenlet pools),
    durations may be attributed to the wrong run - confirm the pool type.
    """

    # Class-level default so the hooks stay safe even when ``before_start``
    # never ran for this instance (e.g. a failure reported before start).
    _task_start_time: Optional[datetime] = None

    def __init__(self):
        """Initialize base task."""
        super().__init__()
        self._task_start_time = None

    def _elapsed_seconds(self) -> Optional[float]:
        """Return seconds since ``before_start`` ran, or None if it never did."""
        started_at = self._task_start_time
        if started_at is None:
            return None
        return (datetime.now() - started_at).total_seconds()

    def before_start(self, task_id, args, kwargs):
        """Record the start time and log the task's id, name and arguments."""
        self._task_start_time = datetime.now()
        logger.info(
            "task_started task_id=%s task_name=%s args=%r kwargs=%r",
            task_id,
            self.name,
            args,
            kwargs,
        )

    def on_success(self, retval, task_id, args, kwargs):
        """Log successful completion with duration and stringified result size."""
        logger.info(
            "task_completed task_id=%s task_name=%s duration=%s result_size=%s",
            task_id,
            self.name,
            self._elapsed_seconds(),
            # A falsy return value is reported as size 0.
            len(str(retval)) if retval else 0,
        )

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Log a task failure with duration, error text and traceback."""
        logger.error(
            "task_failed task_id=%s task_name=%s duration=%s error=%s",
            task_id,
            self.name,
            self._elapsed_seconds(),
            exc,
            # logging accepts an exception instance here; ``einfo`` is neither
            # an exception nor a tuple, so it would fall back to
            # ``sys.exc_info()`` and could lose the traceback.
            exc_info=exc,
        )

    def on_retry(self, exc, task_id, args, kwargs, einfo):
        """Log that the task is being retried, with the current retry count."""
        logger.warning(
            "task_retry task_id=%s task_name=%s error=%s retry_count=%s",
            task_id,
            self.name,
            exc,
            self.request.retries,
        )
# Set default base task: replacing `celery_app.Task` is intended to make tasks
# created through this app derive from BaseTask (and so get its logging hooks).
celery_app.Task = BaseTask
def priority_task(priority: TaskPriority = TaskPriority.NORMAL):
    """Build a decorator that registers a function as a priority-aware task.

    The decorated function is wrapped so that the ``task_id`` and
    ``callback_url`` keyword arguments are stripped before the function runs.
    When both are truthy, ``send_task_callback`` is queued with the result
    and status ``"completed"`` once the function has returned.

    Args:
        priority: Priority tier; selects the queue name (falling back to
            ``"default"`` for an unmapped value) and supplies the ``priority``
            task option via ``priority.value``.

    Returns:
        A decorator that returns the Celery task created from the wrapped
        function, registered with ``max_retries=3`` and a 60-second
        ``default_retry_delay``.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Metadata travels in kwargs but is not part of func's signature.
            tracked_id = kwargs.pop("task_id", None)
            notify_url = kwargs.pop("callback_url", None)

            outcome = func(*args, **kwargs)

            # No callback unless both the id and the URL were supplied.
            if not (notify_url and tracked_id):
                return outcome

            send_task_callback.delay(
                task_id=tracked_id,
                callback_url=notify_url,
                result=outcome,
                status="completed",
            )
            return outcome

        # Translate the priority tier into the queue it is served from.
        queue_by_priority = {
            TaskPriority.CRITICAL: "critical",
            TaskPriority.HIGH: "high",
            TaskPriority.NORMAL: "default",
            TaskPriority.LOW: "low",
            TaskPriority.BACKGROUND: "background",
        }
        target_queue = queue_by_priority.get(priority, "default")

        # Register the wrapper (not the bare function) as the Celery task.
        register = celery_app.task(
            queue=target_queue,
            priority=priority.value,
            max_retries=3,
            default_retry_delay=60,  # 1 minute
        )
        return register(wrapper)

    return decorator
@celery_app.task(name="tasks.send_callback", queue="high")
def send_task_callback(
    task_id: str,
    callback_url: str,
    result: Any,
    status: str
) -> Dict[str, Any]:
    """Send task completion callback.

    POSTs a JSON body with ``task_id``, ``status``, ``result`` and a
    ``completed_at`` timestamp to ``callback_url`` with a 30-second timeout.
    Delivery is best-effort: any exception is logged and reported in the
    return value rather than raised.

    Args:
        task_id: Identifier of the task that finished.
        callback_url: URL that receives the POST request.
        result: Task result, sent as the ``result`` field of the JSON body.
        status: Status string sent as the ``status`` field.

    Returns:
        ``{"success": bool, "status_code": int}`` when a response was
        received (success means status code below 400), otherwise
        ``{"success": False, "error": str}``.
    """
    import httpx

    # NOTE(review): callback_url is posted to as-is; if it can originate from
    # untrusted callers it should be validated before reaching this task.
    payload = {
        "task_id": task_id,
        "status": status,
        "result": result,
        "completed_at": datetime.now().isoformat(),
    }
    try:
        with httpx.Client() as client:
            response = client.post(callback_url, json=payload, timeout=30.0)
    except Exception as e:
        # %-style arguments: a standard-library logger raises TypeError on
        # arbitrary keyword arguments, which would crash this handler.
        logger.error(
            "callback_failed task_id=%s callback_url=%s error=%s",
            task_id,
            callback_url,
            e,
        )
        return {"success": False, "error": str(e)}
    return {
        "success": response.status_code < 400,
        "status_code": response.status_code,
    }
@celery_app.task(name="tasks.cleanup_old_results", queue="background")
def cleanup_old_results(days: int = 7) -> Dict[str, Any]:
    """Clean up old task results.

    Currently a placeholder: it computes the cutoff date and logs it, but
    does not delete anything from the result backend yet.

    Args:
        days: Age threshold in days; the cutoff is ``now - days``.

    Returns:
        ``{"status": "completed", "cutoff_date": <ISO-8601 string>}``.
    """
    cutoff_date = datetime.now() - timedelta(days=days)
    cutoff_iso = cutoff_date.isoformat()
    # This would integrate with your result backend
    # For now, just log the action.
    # %-style arguments: a standard-library logger raises TypeError on
    # arbitrary keyword arguments.
    logger.info("cleanup_started cutoff_date=%s days=%s", cutoff_iso, days)
    return {
        "status": "completed",
        "cutoff_date": cutoff_iso,
    }
# Schedule periodic tasks. Entries refer to tasks by registered name; both
# names used here are registered by decorators in this module.
celery_app.conf.beat_schedule = {
    "cleanup-old-results": {
        "task": "tasks.cleanup_old_results",
        "schedule": timedelta(hours=24), # Daily
        "args": (7,) # Passed as `days`: keep 7 days
    },
    "health-check": {
        "task": "tasks.health_check",
        "schedule": timedelta(minutes=5), # Every 5 minutes
    }
}
@celery_app.task(name="tasks.health_check", queue="high")
def health_check() -> Dict[str, Any]:
    """Report a health snapshot for the Celery deployment.

    Queries worker statistics through the app's control/inspect interface
    and counts the entries in the reply; an empty or missing reply counts as
    zero workers.

    Returns:
        A dict with ``status`` (always ``"healthy"``), ``timestamp`` (ISO-8601
        string of the current local time) and ``workers`` (entry count).
    """
    worker_stats = celery_app.control.inspect().stats()

    snapshot: Dict[str, Any] = {"status": "healthy"}
    snapshot["timestamp"] = datetime.now().isoformat()
    if worker_stats:
        snapshot["workers"] = len(worker_stats)
    else:
        snapshot["workers"] = 0
    return snapshot
def get_celery_app() -> Celery:
    """Return the module-level Celery application.

    Returns:
        The ``celery_app`` object configured in this module.
    """
    app = celery_app
    return app