"""
Module: services.batch_service
Description: Batch processing service integrating Celery and priority queue
Author: Anderson H. Silva
Date: 2025-01-25
License: Proprietary - All rights reserved
"""

from typing import Any, Dict, List, Optional
from datetime import datetime, timedelta
from enum import Enum
from uuid import uuid4

from pydantic import BaseModel, Field
from celery import group, chain, chord
from celery.result import AsyncResult

from src.core import get_logger
from src.infrastructure.queue.celery_app import celery_app, get_celery_app
from src.infrastructure.queue.priority_queue import (
    priority_queue,
    TaskPriority,
    TaskStatus,
    QueueStats,
)
from src.infrastructure.queue.tasks import (
    run_investigation,
    analyze_contracts_batch,
    detect_anomalies_batch,
    analyze_patterns,
    generate_report,
    export_to_pdf,
    monitor_anomalies,
)

logger = get_logger(__name__)

class BatchType(str, Enum):
    """Batch processing types."""

    INVESTIGATION = "investigation"
    ANALYSIS = "analysis"
    REPORT = "report"
    EXPORT = "export"
    MONITORING = "monitoring"

class BatchJobRequest(BaseModel):
    """Batch job request model."""

    batch_type: BatchType
    items: List[Dict[str, Any]]
    priority: TaskPriority = TaskPriority.NORMAL
    # ``parallel`` only affects INVESTIGATION batches (group vs. chain);
    # ``max_workers`` is advisory and not yet enforced by this service.
    parallel: bool = True
    max_workers: int = 5
    callback_url: Optional[str] = None
    metadata: Dict[str, Any] = Field(default_factory=dict)

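# The expected shape of ``items`` depends on ``batch_type``. For example, an
# ANALYSIS batch (see _process_analysis_batch below) accepts entries such as:
#
#     {"type": "contracts", "contract_ids": [...], "analysis_type": "anomaly"}
#     {"type": "patterns", "data_type": "...", "time_range": {...}}
#
# Entries with an unrecognized "type" are skipped.
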
class BatchJobStatus(BaseModel):
    """Batch job status model."""

    job_id: str
    batch_type: BatchType
    total_items: int
    completed: int
    failed: int
    pending: int
    status: str
    started_at: datetime
    completed_at: Optional[datetime] = None
    duration_seconds: Optional[float] = None
    results: List[Dict[str, Any]] = Field(default_factory=list)

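# Invariant: completed + failed + pending == total_items. get_job_status()
# recomputes these counters from the underlying Celery AsyncResult objects.
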
class BatchProcessingService:
    """Service for batch processing operations."""

    def __init__(self):
        """Initialize batch processing service."""
        self.celery_app = get_celery_app()
        self._active_jobs: Dict[str, BatchJobStatus] = {}
        self._job_results: Dict[str, List[AsyncResult]] = {}

        logger.info("batch_service_initialized")

    async def start(self):
        """Start batch processing service."""
        await priority_queue.start()

        self._register_handlers()

        logger.info("batch_service_started")

    async def stop(self):
        """Stop batch processing service."""
        await priority_queue.stop()

        # Revoke any Celery tasks that are still in flight.
        for results in self._job_results.values():
            for result in results:
                if not result.ready():
                    result.revoke(terminate=True)

        logger.info("batch_service_stopped")

    def _register_handlers(self):
        """Register task handlers with the priority queue."""

        async def investigation_handler(payload: Dict[str, Any], metadata: Dict[str, Any]):
            result = run_investigation.delay(
                investigation_id=payload["investigation_id"],
                query=payload["query"],
                config=payload.get("config"),
            )
            return result.id

        priority_queue.register_handler("investigation", investigation_handler)

        async def analysis_handler(payload: Dict[str, Any], metadata: Dict[str, Any]):
            result = analyze_patterns.delay(
                data_type=payload["data_type"],
                time_range=payload["time_range"],
                pattern_types=payload.get("pattern_types"),
                min_confidence=payload.get("min_confidence", 0.7),
            )
            return result.id

        priority_queue.register_handler("analysis", analysis_handler)

    async def submit_batch_job(self, request: BatchJobRequest) -> BatchJobStatus:
        """
        Submit a batch job for processing.

        Args:
            request: Batch job request

        Returns:
            Batch job status
        """
        # Timestamp plus a short random suffix; the timestamp alone would
        # collide for jobs submitted within the same second.
        job_id = f"BATCH-{datetime.now().strftime('%Y%m%d%H%M%S')}-{uuid4().hex[:6]}"

        job_status = BatchJobStatus(
            job_id=job_id,
            batch_type=request.batch_type,
            total_items=len(request.items),
            completed=0,
            failed=0,
            pending=len(request.items),
            status="submitted",
            started_at=datetime.now(),
        )

        self._active_jobs[job_id] = job_status

        logger.info(
            "batch_job_submitted",
            job_id=job_id,
            batch_type=request.batch_type.value,
            items=len(request.items),
            priority=request.priority.name,
        )

        # Dispatch to the type-specific processor.
        if request.batch_type == BatchType.INVESTIGATION:
            await self._process_investigation_batch(job_id, request)
        elif request.batch_type == BatchType.ANALYSIS:
            await self._process_analysis_batch(job_id, request)
        elif request.batch_type == BatchType.REPORT:
            await self._process_report_batch(job_id, request)
        elif request.batch_type == BatchType.EXPORT:
            await self._process_export_batch(job_id, request)
        elif request.batch_type == BatchType.MONITORING:
            await self._process_monitoring_batch(job_id, request)

        job_status.status = "processing"

        return job_status

    async def _process_investigation_batch(
        self,
        job_id: str,
        request: BatchJobRequest
    ):
        """Process investigation batch."""
        tasks = []

        for item in request.items:
            task = run_investigation.s(
                investigation_id=item.get("id", f"{job_id}-{len(tasks)}"),
                query=item["query"],
                config=item.get("config", {}),
            )
            tasks.append(task)

        # group -> run concurrently; chain -> run sequentially.
        if request.parallel:
            job = group(tasks)
        else:
            job = chain(tasks)

        result = job.apply_async(
            priority=request.priority.value,
            link=self._create_callback_task(job_id, request.callback_url)
        )

        # Track per-item results where possible: a GroupResult exposes its
        # children via ``.results``; for a chain only the final AsyncResult is
        # available here, so sequential jobs are tracked at whole-chain
        # granularity by get_job_status().
        self._job_results[job_id] = list(getattr(result, "results", None) or [result])
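
    # A note on sequential mode: in a Celery ``chain`` each task's return
    # value is passed as the first positional argument of the next task. If
    # the investigations are independent, immutable signatures avoid that,
    # e.g. ``run_investigation.si(...)`` instead of ``.s(...)`` (a sketch of
    # an alternative, not wired in above).
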
    async def _process_analysis_batch(
        self,
        job_id: str,
        request: BatchJobRequest
    ):
        """Process analysis batch."""
        tasks = []

        for item in request.items:
            if item.get("type") == "contracts":
                task = analyze_contracts_batch.s(
                    contract_ids=item["contract_ids"],
                    analysis_type=item.get("analysis_type", "anomaly"),
                    threshold=item.get("threshold", 0.7),
                )
            elif item.get("type") == "patterns":
                task = analyze_patterns.s(
                    data_type=item["data_type"],
                    time_range=item["time_range"],
                    pattern_types=item.get("pattern_types"),
                    min_confidence=item.get("min_confidence", 0.7),
                )
            else:
                # Unknown item types are skipped (note: they still count
                # toward total_items in the job status).
                continue

            tasks.append(task)

        job = group(tasks)
        result = job.apply_async(
            priority=request.priority.value,
            link=self._create_callback_task(job_id, request.callback_url)
        )

        self._job_results[job_id] = list(getattr(result, "results", None) or [result])

    async def _process_report_batch(
        self,
        job_id: str,
        request: BatchJobRequest
    ):
        """Process report batch."""
        tasks = []

        for item in request.items:
            task = generate_report.s(
                report_id=item.get("id", f"{job_id}-{len(tasks)}"),
                report_type=item["report_type"],
                investigation_ids=item["investigation_ids"],
                config=item.get("config", {}),
            )
            tasks.append(task)

        job = group(tasks)
        result = job.apply_async(
            priority=request.priority.value,
            link=self._create_callback_task(job_id, request.callback_url)
        )

        self._job_results[job_id] = list(getattr(result, "results", None) or [result])

    async def _process_export_batch(
        self,
        job_id: str,
        request: BatchJobRequest
    ):
        """Process export batch."""
        tasks = []

        for item in request.items:
            task = export_to_pdf.s(
                content_type=item["content_type"],
                content_id=item["content_id"],
                options=item.get("options", {}),
            )
            tasks.append(task)

        job = group(tasks)
        result = job.apply_async(
            priority=request.priority.value,
            link=self._create_callback_task(job_id, request.callback_url),
            # Exports are explicitly routed to the "normal" queue regardless
            # of priority, unlike the other batch types.
            queue="normal"
        )

        self._job_results[job_id] = list(getattr(result, "results", None) or [result])

    async def _process_monitoring_batch(
        self,
        job_id: str,
        request: BatchJobRequest
    ):
        """Process monitoring batch."""
        tasks = []

        for item in request.items:
            task = monitor_anomalies.s(
                monitoring_config=item["config"],
                alert_threshold=item.get("threshold", 0.8),
            )
            tasks.append(task)

        job = group(tasks)
        result = job.apply_async(
            priority=request.priority.value,
            link=self._create_callback_task(job_id, request.callback_url)
        )

        self._job_results[job_id] = list(getattr(result, "results", None) or [result])

    def _create_callback_task(self, job_id: str, callback_url: Optional[str]):
        """Create callback task for job completion."""
        if not callback_url:
            return None

        # Caveat: a task defined at call time is only registered in this
        # process. With remote workers the callback would need to live at
        # module scope (or the app must run with task_always_eager); likewise,
        # mutating self._active_jobs only takes effect when the task executes
        # in the submitting process.
        @celery_app.task
        def batch_completion_callback(results):
            # Mark the job as completed and record its results.
            job_status = self._active_jobs.get(job_id)
            if job_status:
                job_status.completed_at = datetime.now()
                job_status.duration_seconds = (
                    job_status.completed_at - job_status.started_at
                ).total_seconds()
                job_status.status = "completed"
                job_status.results = results

            # Notify the caller's webhook.
            import httpx
            with httpx.Client() as client:
                client.post(
                    callback_url,
                    json={
                        "job_id": job_id,
                        "status": "completed",
                        "results": results,
                        "completed_at": datetime.now().isoformat(),
                    },
                    timeout=30.0,
                )

        return batch_completion_callback.s()
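
    # Sketch (an assumption, not wired in above): ``link`` on a group attaches
    # the callback to every subtask individually. For a single callback that
    # fires once after the whole group finishes, Celery's canonical pattern is
    # a chord, whose body receives the ordered list of group results:
    #
    #     result = chord(tasks)(batch_completion_callback.s())
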
    async def get_job_status(self, job_id: str) -> Optional[BatchJobStatus]:
        """
        Get batch job status.

        Args:
            job_id: Job ID

        Returns:
            Job status or None
        """
        job_status = self._active_jobs.get(job_id)
        if not job_status:
            return None

        # Refresh the counters from the underlying Celery results.
        if job_id in self._job_results:
            results = self._job_results[job_id]
            completed = 0
            failed = 0

            for result in results:
                if result.ready():
                    if result.successful():
                        completed += 1
                    else:
                        failed += 1

            job_status.completed = completed
            job_status.failed = failed
            job_status.pending = job_status.total_items - completed - failed

            if job_status.pending == 0:
                job_status.status = "completed" if failed == 0 else "completed_with_errors"
                if not job_status.completed_at:
                    job_status.completed_at = datetime.now()
                    job_status.duration_seconds = (
                        job_status.completed_at - job_status.started_at
                    ).total_seconds()

        return job_status
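
    # Polling sketch (an assumption about the caller, which drives this from
    # an asyncio event loop using the module-level ``batch_service``):
    #
    #     import asyncio
    #
    #     while True:
    #         status = await batch_service.get_job_status(job_id)
    #         if status is None or status.pending == 0:
    #             break
    #         await asyncio.sleep(2)
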
    async def cancel_job(self, job_id: str) -> bool:
        """
        Cancel a batch job.

        Args:
            job_id: Job ID

        Returns:
            True if cancelled
        """
        if job_id not in self._job_results:
            return False

        for result in self._job_results[job_id]:
            if not result.ready():
                result.revoke(terminate=True)

        job_status = self._active_jobs.get(job_id)
        if job_status:
            job_status.status = "cancelled"
            job_status.completed_at = datetime.now()
            job_status.duration_seconds = (
                job_status.completed_at - job_status.started_at
            ).total_seconds()

        logger.info("batch_job_cancelled", job_id=job_id)

        return True

    async def get_queue_stats(self) -> QueueStats:
        """Get queue statistics."""
        return await priority_queue.get_stats()

    async def cleanup_old_jobs(self, days: int = 7):
        """Clean up old completed jobs."""
        cutoff_time = datetime.now() - timedelta(days=days)

        jobs_to_remove = []
        for job_id, job_status in self._active_jobs.items():
            if (job_status.completed_at and
                    job_status.completed_at < cutoff_time):
                jobs_to_remove.append(job_id)

        for job_id in jobs_to_remove:
            del self._active_jobs[job_id]
            if job_id in self._job_results:
                del self._job_results[job_id]

        logger.info(
            "old_jobs_cleaned",
            removed=len(jobs_to_remove),
            remaining=len(self._active_jobs)
        )

batch_service = BatchProcessingService()
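
# Note: the singleton's _active_jobs/_job_results dictionaries live in this
# process only. A deployment with multiple API processes that needs shared
# job state would have to back them with Redis or a database (not done here).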