"""
Module: infrastructure.queue.tasks.analysis_tasks
Description: Celery tasks for data analysis and pattern detection
Author: Anderson H. Silva
Date: 2025-01-25
License: Proprietary - All rights reserved
"""
from typing import Dict, Any, List, Optional
from datetime import datetime, timedelta
import asyncio
import numpy as np
from celery import chord
from celery.utils.log import get_task_logger
from src.infrastructure.queue.celery_app import celery_app, priority_task, TaskPriority
from src.services.data_service import DataService
from src.services.ml.pattern_detector import PatternDetector
from src.core.dependencies import get_db_session
from src.agents import get_agent_pool
# get_task_logger returns a standard library logger, so structured log fields
# are passed via the "extra" mapping rather than as bare keyword arguments.
logger = get_task_logger(__name__)
@celery_app.task(name="tasks.analyze_patterns", queue="normal")
def analyze_patterns(
data_type: str,
time_range: Dict[str, str],
pattern_types: Optional[List[str]] = None,
min_confidence: float = 0.7
) -> Dict[str, Any]:
"""
Analyze patterns in data.
Args:
data_type: Type of data to analyze
time_range: Time range for analysis
pattern_types: Specific patterns to look for
min_confidence: Minimum confidence threshold
Returns:
Pattern analysis results
"""
    logger.info(
        "pattern_analysis_started",
        extra={
            "data_type": data_type,
            "time_range": time_range,
            "pattern_types": pattern_types,
        },
    )
try:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
result = loop.run_until_complete(
_analyze_patterns_async(
data_type,
time_range,
pattern_types,
min_confidence
)
)
            logger.info(
                "pattern_analysis_completed",
                extra={"patterns_found": len(result.get("patterns", []))},
            )
return result
finally:
loop.close()
except Exception as e:
        logger.error(
            "pattern_analysis_failed",
            extra={"error": str(e)},
            exc_info=True,
        )
raise
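# Illustrative usage (a sketch, not part of the task definition): enqueueing the
# pattern analysis task from application code. The payload values below are
# assumptions based on the signature above, not fixtures from the real system.
#
#   analyze_patterns.delay(
#       data_type="contracts",
#       time_range={"start": "2025-01-01", "end": "2025-01-31"},
#       pattern_types=["temporal", "value"],
#       min_confidence=0.8,
#   )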
async def _analyze_patterns_async(
data_type: str,
time_range: Dict[str, str],
pattern_types: Optional[List[str]],
min_confidence: float
) -> Dict[str, Any]:
"""Async pattern analysis implementation."""
async with get_db_session() as db:
data_service = DataService(db)
agent_pool = get_agent_pool()
# Get Anita agent for pattern analysis
anita = agent_pool.get_agent("anita")
if not anita:
raise RuntimeError("Pattern analysis agent not available")
# Get data for analysis
if data_type == "contracts":
data = await data_service.get_contracts_in_range(
start_date=time_range.get("start"),
end_date=time_range.get("end")
)
elif data_type == "suppliers":
data = await data_service.get_supplier_activity(
start_date=time_range.get("start"),
end_date=time_range.get("end")
)
else:
raise ValueError(f"Unknown data type: {data_type}")
# Run pattern analysis
patterns = await anita.analyze_patterns(
data=data,
pattern_types=pattern_types or ["temporal", "value", "supplier"],
min_confidence=min_confidence
)
return {
"data_type": data_type,
"time_range": time_range,
"total_records": len(data),
"patterns": patterns,
"analysis_timestamp": datetime.now().isoformat()
}
@celery_app.task(name="tasks.correlation_analysis", queue="normal")
def correlation_analysis(
datasets: List[Dict[str, Any]],
correlation_type: str = "pearson",
min_correlation: float = 0.7
) -> Dict[str, Any]:
"""
Analyze correlations between datasets.
Args:
datasets: List of datasets to correlate
correlation_type: Type of correlation (pearson, spearman, kendall)
min_correlation: Minimum correlation threshold
Returns:
Correlation analysis results
"""
    logger.info(
        "correlation_analysis_started",
        extra={
            "dataset_count": len(datasets),
            "correlation_type": correlation_type,
        },
    )
try:
# Prepare data for correlation
prepared_data = []
for dataset in datasets:
values = [float(item.get("value", 0)) for item in dataset.get("data", [])]
prepared_data.append(values)
        # Calculate pairwise correlations between datasets of equal length
        if correlation_type not in ("pearson", "spearman", "kendall"):
            raise ValueError(f"Unsupported correlation type: {correlation_type}")
        if correlation_type in ("spearman", "kendall"):
            # scipy is assumed to be available for the rank-based correlations
            from scipy import stats
        correlations = []
        for i in range(len(prepared_data)):
            for j in range(i + 1, len(prepared_data)):
                if len(prepared_data[i]) == len(prepared_data[j]):
                    if correlation_type == "pearson":
                        corr = np.corrcoef(prepared_data[i], prepared_data[j])[0, 1]
                    elif correlation_type == "spearman":
                        corr, _ = stats.spearmanr(prepared_data[i], prepared_data[j])
                    else:  # kendall
                        corr, _ = stats.kendalltau(prepared_data[i], prepared_data[j])
                    # Skip NaN results (e.g. constant series) and weak correlations
                    if not np.isnan(corr) and abs(corr) >= min_correlation:
correlations.append({
"dataset1": datasets[i].get("name", f"Dataset {i}"),
"dataset2": datasets[j].get("name", f"Dataset {j}"),
"correlation": float(corr),
"strength": "strong" if abs(corr) >= 0.8 else "moderate",
"direction": "positive" if corr > 0 else "negative"
})
return {
"correlation_type": correlation_type,
"datasets_analyzed": len(datasets),
"significant_correlations": len(correlations),
"correlations": correlations,
"min_correlation": min_correlation
}
except Exception as e:
        logger.error(
            "correlation_analysis_failed",
            extra={"error": str(e)},
            exc_info=True,
        )
raise
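# Illustrative payload (a sketch): the loop above implies each dataset is a dict
# with a "name" and a "data" list of items carrying numeric "value" fields.
# The names and numbers below are made up for illustration.
#
#   correlation_analysis.delay(
#       datasets=[
#           {"name": "contract_values", "data": [{"value": 10.0}, {"value": 12.5}]},
#           {"name": "supplier_counts", "data": [{"value": 3.0}, {"value": 4.0}]},
#       ],
#       correlation_type="spearman",
#       min_correlation=0.6,
#   )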
@celery_app.task(name="tasks.temporal_analysis", queue="normal")
def temporal_analysis(
data_source: str,
time_window: str = "monthly",
metrics: Optional[List[str]] = None
) -> Dict[str, Any]:
"""
Analyze temporal trends and seasonality.
Args:
data_source: Source of temporal data
time_window: Analysis window (daily, weekly, monthly, yearly)
metrics: Specific metrics to analyze
Returns:
Temporal analysis results
"""
    logger.info(
        "temporal_analysis_started",
        extra={
            "data_source": data_source,
            "time_window": time_window,
        },
    )
try:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
result = loop.run_until_complete(
_temporal_analysis_async(data_source, time_window, metrics)
)
return result
finally:
loop.close()
except Exception as e:
        logger.error(
            "temporal_analysis_failed",
            extra={"error": str(e)},
            exc_info=True,
        )
raise
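# Illustrative usage (a sketch): a monthly trend analysis over contract data.
# The metric names mirror the defaults passed to PatternDetector below; the
# call itself is an assumption for illustration only.
#
#   temporal_analysis.delay(
#       data_source="contracts",
#       time_window="monthly",
#       metrics=["count", "total_value"],
#   )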
async def _temporal_analysis_async(
data_source: str,
time_window: str,
metrics: Optional[List[str]]
) -> Dict[str, Any]:
"""Async temporal analysis implementation."""
async with get_db_session() as db:
data_service = DataService(db)
# Define time windows
window_days = {
"daily": 1,
"weekly": 7,
"monthly": 30,
"yearly": 365
}
days = window_days.get(time_window, 30)
end_date = datetime.now()
start_date = end_date - timedelta(days=days * 12) # 12 periods
# Get temporal data
if data_source == "contracts":
data = await data_service.get_contracts_in_range(
start_date=start_date.isoformat(),
end_date=end_date.isoformat()
)
else:
raise ValueError(f"Unknown data source: {data_source}")
# Analyze trends
pattern_detector = PatternDetector()
trends = await pattern_detector.detect_temporal_patterns(
data=data,
window=time_window,
metrics=metrics or ["count", "total_value", "average_value"]
)
return {
"data_source": data_source,
"time_window": time_window,
"analysis_period": {
"start": start_date.isoformat(),
"end": end_date.isoformat()
},
"trends": trends,
"seasonality_detected": any(t.get("seasonal") for t in trends),
"anomaly_periods": [t for t in trends if t.get("is_anomaly")]
}
@priority_task(priority=TaskPriority.HIGH)
def complex_analysis_pipeline(
investigation_id: str,
analysis_config: Dict[str, Any]
) -> Dict[str, Any]:
"""
Run complex analysis pipeline with multiple steps.
Args:
investigation_id: Investigation ID
analysis_config: Analysis configuration
Returns:
        Chord submission info; the combined analysis result is produced
        asynchronously by the combine_analysis_results callback task
"""
    logger.info(
        "complex_analysis_started",
        extra={
            "investigation_id": investigation_id,
            "steps": list(analysis_config.keys()),
        },
    )
# Create analysis subtasks
tasks = []
if "patterns" in analysis_config:
tasks.append(
analyze_patterns.s(**analysis_config["patterns"])
)
if "correlations" in analysis_config:
tasks.append(
correlation_analysis.s(**analysis_config["correlations"])
)
if "temporal" in analysis_config:
tasks.append(
temporal_analysis.s(**analysis_config["temporal"])
)
    # Run subtasks in parallel with combine_analysis_results as the chord callback.
    # Celery forbids calling result.get() inside a task, so submission metadata is
    # returned here and the callback task produces the combined result.
    callback = combine_analysis_results.s(investigation_id=investigation_id)
    job = chord(tasks)(callback)
    return {"investigation_id": investigation_id, "chord_id": job.id, "status": "submitted"}
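# Illustrative configuration (a sketch): each top-level key of analysis_config
# selects a subtask and its value is forwarded as keyword arguments to that task.
# The identifier and values below are assumptions for illustration only.
#
#   complex_analysis_pipeline.delay(
#       investigation_id="inv-123",
#       analysis_config={
#           "patterns": {
#               "data_type": "contracts",
#               "time_range": {"start": "2025-01-01", "end": "2025-03-31"},
#           },
#           "temporal": {"data_source": "contracts", "time_window": "monthly"},
#       },
#   )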
@celery_app.task(name="tasks.combine_analysis_results", queue="normal")
def combine_analysis_results(
results: List[Dict[str, Any]],
investigation_id: str
) -> Dict[str, Any]:
"""Combine multiple analysis results."""
combined = {
"investigation_id": investigation_id,
"analysis_count": len(results),
"timestamp": datetime.now().isoformat(),
"results": {}
}
# Merge results by type
for result in results:
if "patterns" in result:
combined["results"]["patterns"] = result
elif "correlations" in result:
combined["results"]["correlations"] = result
elif "trends" in result:
combined["results"]["temporal"] = result
# Generate summary insights
combined["summary"] = {
"total_patterns": sum(
len(r.get("patterns", []))
for r in results
if "patterns" in r
),
"significant_correlations": sum(
r.get("significant_correlations", 0)
for r in results
if "correlations" in r
),
"anomaly_periods": sum(
len(r.get("anomaly_periods", []))
for r in results
if "anomaly_periods" in r
)
}
    logger.info(
        "analysis_combined",
        extra={
            "investigation_id": investigation_id,
            "result_count": len(results),
        },
    )
return combined
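# Note on wiring (a sketch of the chord semantics): when the chord dispatched in
# complex_analysis_pipeline completes, Celery prepends the list of subtask return
# values as the first positional argument of the callback, so the effective call is:
#
#   combine_analysis_results(
#       results=[patterns_result, correlations_result, temporal_result],
#       investigation_id="inv-123",  # hypothetical id for illustration
#   )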