anderson-ufrj
feat(cli): implement complete CLI commands and batch processing system
138f7cb
"""
Module: infrastructure.queue.tasks.report_tasks
Description: Celery tasks for report generation and processing
Author: Anderson H. Silva
Date: 2025-01-25
License: Proprietary - All rights reserved
"""
from typing import Dict, Any, List, Optional
from datetime import datetime
import asyncio
from celery import chain, group
from celery.utils.log import get_task_logger
from src.infrastructure.queue.celery_app import celery_app, priority_task, TaskPriority
from src.services.report_service import ReportService
from src.services.export_service import ExportService
from src.core.dependencies import get_db_session
from src.agents import get_agent_pool
logger = get_task_logger(__name__)
@celery_app.task(name="tasks.generate_report", bind=True, queue="normal")
def generate_report(
self,
report_id: str,
report_type: str,
investigation_ids: List[str],
config: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""
Generate a comprehensive report.
Args:
report_id: Unique report ID
report_type: Type of report to generate
investigation_ids: List of investigation IDs to include
config: Report configuration
Returns:
Generated report data
"""
try:
logger.info(
"report_generation_started",
report_id=report_id,
report_type=report_type,
investigations=len(investigation_ids)
)
# Update task state
self.update_state(
state="PROGRESS",
meta={
"current": 0,
"total": 100,
"status": "Initializing report generation..."
}
)
# Run async report generation
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
result = loop.run_until_complete(
_generate_report_async(
self,
report_id,
report_type,
investigation_ids,
config
)
)
logger.info(
"report_generation_completed",
report_id=report_id,
word_count=result.get("word_count", 0)
)
return result
finally:
loop.close()
except Exception as e:
logger.error(
"report_generation_failed",
report_id=report_id,
error=str(e),
exc_info=True
)
# Retry with exponential backoff
raise self.retry(
exc=e,
countdown=60 * (2 ** self.request.retries),
max_retries=3
)
async def _generate_report_async(
task,
report_id: str,
report_type: str,
investigation_ids: List[str],
config: Optional[Dict[str, Any]]
) -> Dict[str, Any]:
"""Async report generation implementation."""
async with get_db_session() as db:
report_service = ReportService(db)
agent_pool = get_agent_pool()
# Get Tiradentes agent for report generation
tiradentes = agent_pool.get_agent("tiradentes")
if not tiradentes:
raise RuntimeError("Report generation agent not available")
# Update progress
task.update_state(
state="PROGRESS",
meta={
"current": 20,
"total": 100,
"status": "Loading investigation data..."
}
)
# Load investigations
investigations = await report_service.load_investigations(investigation_ids)
# Update progress
task.update_state(
state="PROGRESS",
meta={
"current": 40,
"total": 100,
"status": "Analyzing findings..."
}
)
# Generate report content
report_content = await tiradentes.generate_report(
report_type=report_type,
investigations=investigations,
config=config or {}
)
# Update progress
task.update_state(
state="PROGRESS",
meta={
"current": 80,
"total": 100,
"status": "Finalizing report..."
}
)
# Save report
report = await report_service.save_report(
report_id=report_id,
report_type=report_type,
content=report_content,
metadata={
"investigation_ids": investigation_ids,
"generated_by": "tiradentes",
"config": config
}
)
# Update progress
task.update_state(
state="PROGRESS",
meta={
"current": 100,
"total": 100,
"status": "Report completed!"
}
)
return {
"report_id": report.id,
"report_type": report_type,
"title": report.title,
"word_count": len(report_content.split()),
"status": "completed",
"created_at": report.created_at.isoformat()
}
@celery_app.task(name="tasks.generate_executive_summary", queue="high")
def generate_executive_summary(
investigation_ids: List[str],
max_length: int = 500
) -> Dict[str, Any]:
"""
Generate executive summary from investigations.
Args:
investigation_ids: Investigation IDs to summarize
max_length: Maximum summary length in words
Returns:
Executive summary
"""
logger.info(
"executive_summary_started",
investigations=len(investigation_ids),
max_length=max_length
)
try:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
result = loop.run_until_complete(
_generate_executive_summary_async(
investigation_ids,
max_length
)
)
return result
finally:
loop.close()
except Exception as e:
logger.error(
"executive_summary_failed",
error=str(e),
exc_info=True
)
raise
async def _generate_executive_summary_async(
investigation_ids: List[str],
max_length: int
) -> Dict[str, Any]:
"""Async executive summary generation."""
async with get_db_session() as db:
report_service = ReportService(db)
agent_pool = get_agent_pool()
# Get Tiradentes agent
tiradentes = agent_pool.get_agent("tiradentes")
if not tiradentes:
raise RuntimeError("Report agent not available")
# Load investigations
investigations = await report_service.load_investigations(investigation_ids)
# Generate summary
summary = await tiradentes.generate_executive_summary(
investigations=investigations,
max_length=max_length
)
return {
"summary": summary,
"word_count": len(summary.split()),
"investigation_count": len(investigations),
"key_findings": await tiradentes.extract_key_findings(investigations),
"generated_at": datetime.now().isoformat()
}
@celery_app.task(name="tasks.batch_report_generation", queue="normal")
def batch_report_generation(
report_configs: List[Dict[str, Any]]
) -> Dict[str, Any]:
"""
Generate multiple reports in batch.
Args:
report_configs: List of report configurations
Returns:
Batch generation results
"""
logger.info(
"batch_report_generation_started",
report_count=len(report_configs)
)
# Create subtasks for each report
tasks = []
for config in report_configs:
task = generate_report.s(
report_id=config["report_id"],
report_type=config["report_type"],
investigation_ids=config["investigation_ids"],
config=config.get("config")
)
tasks.append(task)
# Execute in parallel
job = group(tasks)
results = job.apply_async()
# Wait for results
report_results = results.get(timeout=1800) # 30 minutes timeout
# Aggregate results
summary = {
"total_reports": len(report_configs),
"completed": sum(1 for r in report_results if r.get("status") == "completed"),
"failed": sum(1 for r in report_results if r.get("status") == "failed"),
"total_words": sum(r.get("word_count", 0) for r in report_results),
"results": report_results
}
logger.info(
"batch_report_generation_completed",
total=summary["total_reports"],
completed=summary["completed"]
)
return summary
@priority_task(priority=TaskPriority.HIGH)
def generate_urgent_report(
investigation_id: str,
reason: str,
recipients: List[str]
) -> Dict[str, Any]:
"""
Generate urgent report with notifications.
Args:
investigation_id: Investigation to report on
reason: Reason for urgency
recipients: Email recipients for notification
Returns:
Report generation results
"""
logger.warning(
"urgent_report_requested",
investigation_id=investigation_id,
reason=reason,
recipients=len(recipients)
)
# Generate report with high priority
report_id = f"URGENT-{datetime.now().strftime('%Y%m%d%H%M%S')}"
# Chain tasks: generate report → export to PDF → send notifications
workflow = chain(
generate_report.s(
report_id=report_id,
report_type="urgent",
investigation_ids=[investigation_id],
config={"reason": reason, "priority": "urgent"}
),
export_report_to_pdf.s(),
send_report_notifications.s(recipients=recipients)
)
result = workflow.apply_async(priority=9)
return result.get()
@celery_app.task(name="tasks.export_report_to_pdf", queue="normal")
def export_report_to_pdf(report_data: Dict[str, Any]) -> Dict[str, Any]:
"""Export report to PDF format."""
try:
export_service = ExportService()
pdf_content = asyncio.run(
export_service.generate_pdf(
content=report_data.get("content", ""),
title=report_data.get("title", "Report"),
metadata=report_data
)
)
return {
**report_data,
"pdf_size": len(pdf_content),
"pdf_generated": True
}
except Exception as e:
logger.error(
"pdf_export_failed",
report_id=report_data.get("report_id"),
error=str(e)
)
raise
@celery_app.task(name="tasks.send_report_notifications", queue="high")
def send_report_notifications(
report_data: Dict[str, Any],
recipients: List[str]
) -> Dict[str, Any]:
"""Send report notifications."""
logger.info(
"sending_notifications",
report_id=report_data.get("report_id"),
recipients=len(recipients)
)
# In production, this would send actual emails
# For now, just log the action
return {
"report_id": report_data.get("report_id"),
"notifications_sent": len(recipients),
"timestamp": datetime.now().isoformat()
}