|
|
""" |
|
|
Module: infrastructure.queue.tasks.report_tasks |
|
|
Description: Celery tasks for report generation and processing |
|
|
Author: Anderson H. Silva |
|
|
Date: 2025-01-25 |
|
|
License: Proprietary - All rights reserved |
|
|
""" |
|
|
|
|
|
from typing import Dict, Any, List, Optional |
|
|
from datetime import datetime |
|
|
import asyncio |
|
|
|
|
|
from celery import chain, group |
|
|
from celery.utils.log import get_task_logger |
|
|
|
|
|
from src.infrastructure.queue.celery_app import celery_app, priority_task, TaskPriority |
|
|
from src.services.report_service import ReportService |
|
|
from src.services.export_service import ExportService |
|
|
from src.core.dependencies import get_db_session |
|
|
from src.agents import get_agent_pool |
|
|
|
|
|
logger = get_task_logger(__name__) |
|
|
|
|
|
|
|
|
@celery_app.task(name="tasks.generate_report", bind=True, queue="normal") |
|
|
def generate_report( |
|
|
self, |
|
|
report_id: str, |
|
|
report_type: str, |
|
|
investigation_ids: List[str], |
|
|
config: Optional[Dict[str, Any]] = None |
|
|
) -> Dict[str, Any]: |
|
|
""" |
|
|
Generate a comprehensive report. |
|
|
|
|
|
Args: |
|
|
report_id: Unique report ID |
|
|
report_type: Type of report to generate |
|
|
investigation_ids: List of investigation IDs to include |
|
|
config: Report configuration |
|
|
|
|
|
Returns: |
|
|
Generated report data |
|
|
""" |
|
|
try: |
|
|
logger.info( |
|
|
"report_generation_started", |
|
|
report_id=report_id, |
|
|
report_type=report_type, |
|
|
investigations=len(investigation_ids) |
|
|
) |
|
|
|
|
|
|
|
|
self.update_state( |
|
|
state="PROGRESS", |
|
|
meta={ |
|
|
"current": 0, |
|
|
"total": 100, |
|
|
"status": "Initializing report generation..." |
|
|
} |
|
|
) |
|
|
|
|
|
|
|
|
loop = asyncio.new_event_loop() |
|
|
asyncio.set_event_loop(loop) |
|
|
|
|
|
try: |
|
|
result = loop.run_until_complete( |
|
|
_generate_report_async( |
|
|
self, |
|
|
report_id, |
|
|
report_type, |
|
|
investigation_ids, |
|
|
config |
|
|
) |
|
|
) |
|
|
|
|
|
logger.info( |
|
|
"report_generation_completed", |
|
|
report_id=report_id, |
|
|
word_count=result.get("word_count", 0) |
|
|
) |
|
|
|
|
|
return result |
|
|
|
|
|
finally: |
|
|
loop.close() |
|
|
|
|
|
except Exception as e: |
|
|
logger.error( |
|
|
"report_generation_failed", |
|
|
report_id=report_id, |
|
|
error=str(e), |
|
|
exc_info=True |
|
|
) |
|
|
|
|
|
|
|
|
raise self.retry( |
|
|
exc=e, |
|
|
countdown=60 * (2 ** self.request.retries), |
|
|
max_retries=3 |
|
|
) |
|
|
|
|
|
|
|
|
async def _generate_report_async( |
|
|
task, |
|
|
report_id: str, |
|
|
report_type: str, |
|
|
investigation_ids: List[str], |
|
|
config: Optional[Dict[str, Any]] |
|
|
) -> Dict[str, Any]: |
|
|
"""Async report generation implementation.""" |
|
|
async with get_db_session() as db: |
|
|
report_service = ReportService(db) |
|
|
agent_pool = get_agent_pool() |
|
|
|
|
|
|
|
|
tiradentes = agent_pool.get_agent("tiradentes") |
|
|
if not tiradentes: |
|
|
raise RuntimeError("Report generation agent not available") |
|
|
|
|
|
|
|
|
task.update_state( |
|
|
state="PROGRESS", |
|
|
meta={ |
|
|
"current": 20, |
|
|
"total": 100, |
|
|
"status": "Loading investigation data..." |
|
|
} |
|
|
) |
|
|
|
|
|
|
|
|
investigations = await report_service.load_investigations(investigation_ids) |
|
|
|
|
|
|
|
|
task.update_state( |
|
|
state="PROGRESS", |
|
|
meta={ |
|
|
"current": 40, |
|
|
"total": 100, |
|
|
"status": "Analyzing findings..." |
|
|
} |
|
|
) |
|
|
|
|
|
|
|
|
report_content = await tiradentes.generate_report( |
|
|
report_type=report_type, |
|
|
investigations=investigations, |
|
|
config=config or {} |
|
|
) |
|
|
|
|
|
|
|
|
task.update_state( |
|
|
state="PROGRESS", |
|
|
meta={ |
|
|
"current": 80, |
|
|
"total": 100, |
|
|
"status": "Finalizing report..." |
|
|
} |
|
|
) |
|
|
|
|
|
|
|
|
report = await report_service.save_report( |
|
|
report_id=report_id, |
|
|
report_type=report_type, |
|
|
content=report_content, |
|
|
metadata={ |
|
|
"investigation_ids": investigation_ids, |
|
|
"generated_by": "tiradentes", |
|
|
"config": config |
|
|
} |
|
|
) |
|
|
|
|
|
|
|
|
task.update_state( |
|
|
state="PROGRESS", |
|
|
meta={ |
|
|
"current": 100, |
|
|
"total": 100, |
|
|
"status": "Report completed!" |
|
|
} |
|
|
) |
|
|
|
|
|
return { |
|
|
"report_id": report.id, |
|
|
"report_type": report_type, |
|
|
"title": report.title, |
|
|
"word_count": len(report_content.split()), |
|
|
"status": "completed", |
|
|
"created_at": report.created_at.isoformat() |
|
|
} |
|
|
|
|
|
|
|
|
@celery_app.task(name="tasks.generate_executive_summary", queue="high") |
|
|
def generate_executive_summary( |
|
|
investigation_ids: List[str], |
|
|
max_length: int = 500 |
|
|
) -> Dict[str, Any]: |
|
|
""" |
|
|
Generate executive summary from investigations. |
|
|
|
|
|
Args: |
|
|
investigation_ids: Investigation IDs to summarize |
|
|
max_length: Maximum summary length in words |
|
|
|
|
|
Returns: |
|
|
Executive summary |
|
|
""" |
|
|
logger.info( |
|
|
"executive_summary_started", |
|
|
investigations=len(investigation_ids), |
|
|
max_length=max_length |
|
|
) |
|
|
|
|
|
try: |
|
|
loop = asyncio.new_event_loop() |
|
|
asyncio.set_event_loop(loop) |
|
|
|
|
|
try: |
|
|
result = loop.run_until_complete( |
|
|
_generate_executive_summary_async( |
|
|
investigation_ids, |
|
|
max_length |
|
|
) |
|
|
) |
|
|
|
|
|
return result |
|
|
|
|
|
finally: |
|
|
loop.close() |
|
|
|
|
|
except Exception as e: |
|
|
logger.error( |
|
|
"executive_summary_failed", |
|
|
error=str(e), |
|
|
exc_info=True |
|
|
) |
|
|
raise |
|
|
|
|
|
|
|
|
async def _generate_executive_summary_async( |
|
|
investigation_ids: List[str], |
|
|
max_length: int |
|
|
) -> Dict[str, Any]: |
|
|
"""Async executive summary generation.""" |
|
|
async with get_db_session() as db: |
|
|
report_service = ReportService(db) |
|
|
agent_pool = get_agent_pool() |
|
|
|
|
|
|
|
|
tiradentes = agent_pool.get_agent("tiradentes") |
|
|
if not tiradentes: |
|
|
raise RuntimeError("Report agent not available") |
|
|
|
|
|
|
|
|
investigations = await report_service.load_investigations(investigation_ids) |
|
|
|
|
|
|
|
|
summary = await tiradentes.generate_executive_summary( |
|
|
investigations=investigations, |
|
|
max_length=max_length |
|
|
) |
|
|
|
|
|
return { |
|
|
"summary": summary, |
|
|
"word_count": len(summary.split()), |
|
|
"investigation_count": len(investigations), |
|
|
"key_findings": await tiradentes.extract_key_findings(investigations), |
|
|
"generated_at": datetime.now().isoformat() |
|
|
} |
|
|
|
|
|
|
|
|
@celery_app.task(name="tasks.batch_report_generation", queue="normal") |
|
|
def batch_report_generation( |
|
|
report_configs: List[Dict[str, Any]] |
|
|
) -> Dict[str, Any]: |
|
|
""" |
|
|
Generate multiple reports in batch. |
|
|
|
|
|
Args: |
|
|
report_configs: List of report configurations |
|
|
|
|
|
Returns: |
|
|
Batch generation results |
|
|
""" |
|
|
logger.info( |
|
|
"batch_report_generation_started", |
|
|
report_count=len(report_configs) |
|
|
) |
|
|
|
|
|
|
|
|
tasks = [] |
|
|
for config in report_configs: |
|
|
task = generate_report.s( |
|
|
report_id=config["report_id"], |
|
|
report_type=config["report_type"], |
|
|
investigation_ids=config["investigation_ids"], |
|
|
config=config.get("config") |
|
|
) |
|
|
tasks.append(task) |
|
|
|
|
|
|
|
|
job = group(tasks) |
|
|
results = job.apply_async() |
|
|
|
|
|
|
|
|
report_results = results.get(timeout=1800) |
|
|
|
|
|
|
|
|
summary = { |
|
|
"total_reports": len(report_configs), |
|
|
"completed": sum(1 for r in report_results if r.get("status") == "completed"), |
|
|
"failed": sum(1 for r in report_results if r.get("status") == "failed"), |
|
|
"total_words": sum(r.get("word_count", 0) for r in report_results), |
|
|
"results": report_results |
|
|
} |
|
|
|
|
|
logger.info( |
|
|
"batch_report_generation_completed", |
|
|
total=summary["total_reports"], |
|
|
completed=summary["completed"] |
|
|
) |
|
|
|
|
|
return summary |
|
|
|
|
|
|
|
|
@priority_task(priority=TaskPriority.HIGH) |
|
|
def generate_urgent_report( |
|
|
investigation_id: str, |
|
|
reason: str, |
|
|
recipients: List[str] |
|
|
) -> Dict[str, Any]: |
|
|
""" |
|
|
Generate urgent report with notifications. |
|
|
|
|
|
Args: |
|
|
investigation_id: Investigation to report on |
|
|
reason: Reason for urgency |
|
|
recipients: Email recipients for notification |
|
|
|
|
|
Returns: |
|
|
Report generation results |
|
|
""" |
|
|
logger.warning( |
|
|
"urgent_report_requested", |
|
|
investigation_id=investigation_id, |
|
|
reason=reason, |
|
|
recipients=len(recipients) |
|
|
) |
|
|
|
|
|
|
|
|
report_id = f"URGENT-{datetime.now().strftime('%Y%m%d%H%M%S')}" |
|
|
|
|
|
|
|
|
workflow = chain( |
|
|
generate_report.s( |
|
|
report_id=report_id, |
|
|
report_type="urgent", |
|
|
investigation_ids=[investigation_id], |
|
|
config={"reason": reason, "priority": "urgent"} |
|
|
), |
|
|
export_report_to_pdf.s(), |
|
|
send_report_notifications.s(recipients=recipients) |
|
|
) |
|
|
|
|
|
result = workflow.apply_async(priority=9) |
|
|
return result.get() |
|
|
|
|
|
|
|
|
@celery_app.task(name="tasks.export_report_to_pdf", queue="normal") |
|
|
def export_report_to_pdf(report_data: Dict[str, Any]) -> Dict[str, Any]: |
|
|
"""Export report to PDF format.""" |
|
|
try: |
|
|
export_service = ExportService() |
|
|
|
|
|
pdf_content = asyncio.run( |
|
|
export_service.generate_pdf( |
|
|
content=report_data.get("content", ""), |
|
|
title=report_data.get("title", "Report"), |
|
|
metadata=report_data |
|
|
) |
|
|
) |
|
|
|
|
|
return { |
|
|
**report_data, |
|
|
"pdf_size": len(pdf_content), |
|
|
"pdf_generated": True |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
logger.error( |
|
|
"pdf_export_failed", |
|
|
report_id=report_data.get("report_id"), |
|
|
error=str(e) |
|
|
) |
|
|
raise |
|
|
|
|
|
|
|
|
@celery_app.task(name="tasks.send_report_notifications", queue="high") |
|
|
def send_report_notifications( |
|
|
report_data: Dict[str, Any], |
|
|
recipients: List[str] |
|
|
) -> Dict[str, Any]: |
|
|
"""Send report notifications.""" |
|
|
logger.info( |
|
|
"sending_notifications", |
|
|
report_id=report_data.get("report_id"), |
|
|
recipients=len(recipients) |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return { |
|
|
"report_id": report_data.get("report_id"), |
|
|
"notifications_sent": len(recipients), |
|
|
"timestamp": datetime.now().isoformat() |
|
|
} |