"""
Module: agents.anita
Codename: Anita Garibaldi - Semantic Router
Description: Agent specialized in pattern analysis and correlation detection in government data
Author: Anderson H. Silva
Date: 2025-01-24
License: Proprietary - All rights reserved
"""
import asyncio
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Tuple
from dataclasses import dataclass
from collections import defaultdict, Counter

import numpy as np
import pandas as pd  # used by the spectral-analysis helpers below
from pydantic import BaseModel, Field as PydanticField

from src.agents.deodoro import BaseAgent, AgentContext, AgentMessage, AgentResponse
from src.core import get_logger, AgentStatus
from src.core.exceptions import AgentExecutionError, DataAnalysisError
from src.tools.transparency_api import TransparencyAPIClient, TransparencyAPIFilter
from src.ml.spectral_analyzer import SpectralAnalyzer, SpectralFeatures, PeriodicPattern

@dataclass
class PatternResult:
    """Result of pattern analysis."""

    pattern_type: str
    description: str
    significance: float
    confidence: float
    insights: List[str]
    evidence: Dict[str, Any]
    recommendations: List[str]
    entities_involved: List[Dict[str, Any]]
    trend_direction: Optional[str] = None
    correlation_strength: Optional[float] = None

@dataclass
class CorrelationResult:
    """Result of correlation analysis."""

    correlation_type: str
    variables: List[str]
    correlation_coefficient: float
    p_value: Optional[float]
    significance_level: str
    description: str
    business_interpretation: str
    evidence: Dict[str, Any]
    recommendations: List[str]

class AnalysisRequest(BaseModel):
    """Request for pattern and correlation analysis."""

    query: str = PydanticField(description="Natural language analysis query")
    analysis_types: Optional[List[str]] = PydanticField(default=None, description="Types of analysis to perform")
    time_period: Optional[str] = PydanticField(default="12_months", description="Time period for analysis")
    organization_codes: Optional[List[str]] = PydanticField(default=None, description="Organizations to analyze")
    focus_areas: Optional[List[str]] = PydanticField(default=None, description="Specific areas to focus on")
    comparison_mode: bool = PydanticField(default=False, description="Enable comparison between entities")
    max_records: int = PydanticField(default=200, description="Maximum records for analysis")

class AnalystAgent(BaseAgent):
    """
    Agent specialized in pattern analysis and correlation detection in government data.

    Capabilities:
    - Spending trend analysis over time
    - Organizational spending pattern comparison
    - Vendor market behavior analysis
    - Seasonal pattern detection
    - Contract value distribution analysis
    - Cross-organizational correlation analysis
    - Performance and efficiency metrics
    - Predictive trend modeling
    """

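    # Usage sketch (hypothetical; the exact AgentMessage/AgentContext
    # constructor signatures live in src.agents.deodoro and may differ):
    #
    #     agent = AnalystAgent(min_correlation_threshold=0.4)
    #     await agent.initialize()
    #     message = AgentMessage(
    #         action="analyze",
    #         payload={"query": "Compare 2024 spending across ministries"},
    #     )
    #     response = await agent.process(message, context)
    #     patterns = response.result["patterns"]
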
    def __init__(
        self,
        min_correlation_threshold: float = 0.3,
        significance_threshold: float = 0.05,
        trend_detection_window: int = 6,
    ):
        """
        Initialize the Analyst Agent.

        Args:
            min_correlation_threshold: Minimum correlation coefficient to report
            significance_threshold: P-value threshold for statistical significance
            trend_detection_window: Number of periods for trend analysis
        """
        super().__init__(
            name="Anita",
            description="Anita Garibaldi - Agent specialized in pattern analysis and correlation detection",
            capabilities=[
                "spending_trend_analysis",
                "organizational_comparison",
                "vendor_behavior_analysis",
                "seasonal_pattern_detection",
                "value_distribution_analysis",
                "correlation_analysis",
                "efficiency_metrics",
                "predictive_modeling",
            ],
            max_retries=3,
            timeout=60,
        )
        self.correlation_threshold = min_correlation_threshold
        self.significance_threshold = significance_threshold
        self.trend_window = trend_detection_window

        self.spectral_analyzer = SpectralAnalyzer()

        self.analysis_methods = {
            "spending_trends": self._analyze_spending_trends,
            "organizational_patterns": self._analyze_organizational_patterns,
            "vendor_behavior": self._analyze_vendor_behavior,
            "seasonal_patterns": self._analyze_seasonal_patterns,
            "spectral_patterns": self._analyze_spectral_patterns,
            "cross_spectral_analysis": self._perform_cross_spectral_analysis,
            "value_distribution": self._analyze_value_distribution,
            "correlation_analysis": self._perform_correlation_analysis,
            "efficiency_metrics": self._calculate_efficiency_metrics,
        }

        self.logger.info(
            "analyst_agent_initialized",
            agent_name=self.name,
            correlation_threshold=min_correlation_threshold,
            significance_threshold=significance_threshold,
        )

    async def initialize(self) -> None:
        """Initialize agent resources."""
        self.logger.info(f"{self.name} agent initialized")

    async def shutdown(self) -> None:
        """Cleanup agent resources."""
        self.logger.info(f"{self.name} agent shutting down")

    async def process(
        self,
        message: AgentMessage,
        context: AgentContext
    ) -> AgentResponse:
        """
        Process pattern analysis request and return insights.

        Args:
            message: Analysis request message
            context: Agent execution context

        Returns:
            AgentResponse with patterns and correlations
        """
        try:
            self.logger.info(
                "analysis_started",
                investigation_id=context.investigation_id,
                agent_name=self.name,
                action=message.action,
            )

            if message.action == "analyze":
                request = AnalysisRequest(**message.payload)
            else:
                raise AgentExecutionError(
                    f"Unsupported action: {message.action}",
                    agent_id=self.name
                )

            analysis_data = await self._fetch_analysis_data(request, context)

            if not analysis_data:
                return AgentResponse(
                    agent_name=self.name,
                    status=AgentStatus.COMPLETED,
                    result={
                        "status": "no_data",
                        "message": "No data found for the specified criteria",
                        "patterns": [],
                        "correlations": [],
                        "summary": {"total_records": 0, "patterns_found": 0}
                    },
                    metadata={"investigation_id": context.investigation_id}
                )

            patterns = await self._run_pattern_analysis(analysis_data, request, context)

            correlations = await self._run_correlation_analysis(analysis_data, request, context)

            insights = self._generate_insights(patterns, correlations, analysis_data)

            result = {
                "status": "completed",
                "query": request.query,
                "patterns": [self._pattern_to_dict(p) for p in patterns],
                "correlations": [self._correlation_to_dict(c) for c in correlations],
                "insights": insights,
                "summary": self._generate_analysis_summary(analysis_data, patterns, correlations),
                "metadata": {
                    "investigation_id": context.investigation_id,
                    "timestamp": datetime.utcnow().isoformat(),
                    "agent_name": self.name,
                    "records_analyzed": len(analysis_data),
                    "patterns_found": len(patterns),
                    "correlations_found": len(correlations),
                },
            }

            self.logger.info(
                "analysis_completed",
                investigation_id=context.investigation_id,
                records_analyzed=len(analysis_data),
                patterns_found=len(patterns),
                correlations_found=len(correlations),
            )

            return AgentResponse(
                agent_name=self.name,
                status=AgentStatus.COMPLETED,
                result=result,
                metadata={"investigation_id": context.investigation_id}
            )

        except Exception as e:
            self.logger.error(
                "analysis_failed",
                investigation_id=context.investigation_id,
                error=str(e),
                agent_name=self.name,
            )

            return AgentResponse(
                agent_name=self.name,
                status=AgentStatus.ERROR,
                error=str(e),
                result={
                    "status": "error",
                    "error": str(e),
                    "investigation_id": context.investigation_id,
                },
                metadata={"investigation_id": context.investigation_id}
            )

    async def _fetch_analysis_data(
        self,
        request: AnalysisRequest,
        context: AgentContext
    ) -> List[Dict[str, Any]]:
        """
        Fetch comprehensive data for pattern analysis.

        Args:
            request: Analysis parameters
            context: Agent context

        Returns:
            List of contract records for analysis
        """
        all_contracts = []

        # Default to a representative set of organization codes when none are given
        org_codes = request.organization_codes or [
            "26000",
            "20000",
            "25000",
            "36000",
            "44000",
            "30000",
        ]

        async with TransparencyAPIClient() as client:
            for org_code in org_codes:
                try:
                    # Fetch one page per month to build a year-long series
                    for month in range(1, 13):
                        filters = TransparencyAPIFilter(
                            codigo_orgao=org_code,
                            ano=2024,
                            mes=month,
                            pagina=1,
                            tamanho_pagina=min(20, request.max_records // (len(org_codes) * 12))
                        )

                        response = await client.get_contracts(filters)

                        # Annotate each record with its provenance for later grouping
                        for contract in response.data:
                            contract["_org_code"] = org_code
                            contract["_month"] = month
                            contract["_year"] = 2024
                            contract["_fetch_timestamp"] = datetime.utcnow().isoformat()

                        all_contracts.extend(response.data)

                        # Small delay between requests to avoid hammering the API
                        await asyncio.sleep(0.1)

                    self.logger.info(
                        "organization_data_fetched",
                        org_code=org_code,
                        total_records=len([c for c in all_contracts if c.get("_org_code") == org_code]),
                        investigation_id=context.investigation_id,
                    )

                except Exception as e:
                    self.logger.warning(
                        "organization_data_fetch_failed",
                        org_code=org_code,
                        error=str(e),
                        investigation_id=context.investigation_id,
                    )
                    continue

        return all_contracts[:request.max_records]

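    # Page-size arithmetic for the fetch above: the per-request budget is
    # max_records split evenly across org/month cells. With the defaults
    # (max_records=200, 6 orgs, 12 months) that is 200 // 72 = 2 records per
    # page, and the min(20, ...) caps the page size when the budget is large.
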
    async def _run_pattern_analysis(
        self,
        data: List[Dict[str, Any]],
        request: AnalysisRequest,
        context: AgentContext
    ) -> List[PatternResult]:
        """
        Run pattern analysis algorithms on the data.

        Args:
            data: Contract records to analyze
            request: Analysis parameters
            context: Agent context

        Returns:
            List of detected patterns
        """
        all_patterns = []

        # Correlation-style analyses return CorrelationResult objects and are
        # handled in _run_correlation_analysis, so skip them in the pattern pass.
        types_to_run = request.analysis_types or list(self.analysis_methods.keys())
        types_to_run = [
            t for t in types_to_run
            if t not in ("correlation_analysis", "cross_spectral_analysis")
        ]

        for analysis_type in types_to_run:
            if analysis_type in self.analysis_methods:
                try:
                    method = self.analysis_methods[analysis_type]
                    patterns = await method(data, context)
                    all_patterns.extend(patterns)

                    self.logger.info(
                        "pattern_analysis_completed",
                        type=analysis_type,
                        patterns_found=len(patterns),
                        investigation_id=context.investigation_id,
                    )

                except Exception as e:
                    self.logger.error(
                        "pattern_analysis_failed",
                        type=analysis_type,
                        error=str(e),
                        investigation_id=context.investigation_id,
                    )

        # Most significant patterns first
        all_patterns.sort(key=lambda x: x.significance, reverse=True)

        return all_patterns

    async def _run_correlation_analysis(
        self,
        data: List[Dict[str, Any]],
        request: AnalysisRequest,
        context: AgentContext
    ) -> List[CorrelationResult]:
        """
        Run correlation analysis on the data.

        Args:
            data: Contract records to analyze
            request: Analysis parameters
            context: Agent context

        Returns:
            List of detected correlations
        """
        correlations = []

        if "correlation_analysis" in (request.analysis_types or ["correlation_analysis"]):
            try:
                correlations = await self._perform_correlation_analysis(data, context)

                self.logger.info(
                    "correlation_analysis_completed",
                    correlations_found=len(correlations),
                    investigation_id=context.investigation_id,
                )

            except Exception as e:
                self.logger.error(
                    "correlation_analysis_failed",
                    error=str(e),
                    investigation_id=context.investigation_id,
                )

        # Cross-spectral analysis also yields CorrelationResult objects, so it
        # is wired in here (rather than in the pattern pass) when requested.
        if "cross_spectral_analysis" in (request.analysis_types or []):
            try:
                correlations.extend(
                    await self._perform_cross_spectral_analysis(data, context)
                )
            except Exception as e:
                self.logger.error(
                    "cross_spectral_analysis_failed",
                    error=str(e),
                    investigation_id=context.investigation_id,
                )

        return correlations

    async def _analyze_spending_trends(
        self,
        data: List[Dict[str, Any]],
        context: AgentContext
    ) -> List[PatternResult]:
        """Analyze spending trends over time."""
        patterns = []

        # Aggregate contract values and counts by month
        monthly_spending = defaultdict(float)
        monthly_counts = defaultdict(int)

        for contract in data:
            month = contract.get("_month")
            valor = contract.get("valorInicial") or contract.get("valorGlobal") or 0

            if month and isinstance(valor, (int, float)):
                monthly_spending[month] += float(valor)
                monthly_counts[month] += 1

        if len(monthly_spending) < 3:
            return patterns

        months = sorted(monthly_spending.keys())
        values = [monthly_spending[m] for m in months]

        # Linear trend: Pearson correlation between spending and the time index
        x = np.array(range(len(months)))
        y = np.array(values)

        if len(x) > 1 and np.std(y) > 0:
            correlation = np.corrcoef(x, y)[0, 1]
            slope = np.polyfit(x, y, 1)[0]

            if abs(correlation) > 0.5:
                trend_direction = "increasing" if slope > 0 else "decreasing"
                significance = abs(correlation)

                pattern = PatternResult(
                    pattern_type="spending_trends",
                    description=f"Tendência de gastos {trend_direction} detectada",
                    significance=significance,
                    confidence=abs(correlation),
                    insights=[
                        f"Gastos apresentam tendência {trend_direction} com correlação de {correlation:.2f}",
                        f"Variação média mensal: R$ {slope:,.2f}",
                        f"Período analisado: {len(months)} meses",
                    ],
                    evidence={
                        "monthly_spending": dict(monthly_spending),
                        "trend_correlation": correlation,
                        "monthly_slope": slope,
                        "total_value": sum(values),
                        "average_monthly": np.mean(values),
                    },
                    recommendations=[
                        "Investigar fatores que causam a tendência observada",
                        "Analisar planejamento orçamentário",
                        "Verificar sazonalidade nos gastos",
                        "Monitorar sustentabilidade da tendência",
                    ],
                    entities_involved=[{
                        "type": "monthly_data",
                        "months_analyzed": len(months),
                        "total_contracts": sum(monthly_counts.values()),
                    }],
                    trend_direction=trend_direction,
                    correlation_strength=abs(correlation),
                )

                patterns.append(pattern)

        return patterns

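    # Worked example of the trend test above (toy numbers): for monthly totals
    # y = [100, 120, 140, 160] and x = [0, 1, 2, 3], np.corrcoef(x, y)[0, 1]
    # is 1.0 and np.polyfit(x, y, 1)[0] is 20.0, so a perfectly linear series
    # is reported as an "increasing" trend with significance 1.0.
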
    async def _analyze_organizational_patterns(
        self,
        data: List[Dict[str, Any]],
        context: AgentContext
    ) -> List[PatternResult]:
        """Analyze spending patterns across organizations."""
        patterns = []

        org_stats = defaultdict(lambda: {"total_value": 0, "count": 0, "contracts": []})

        for contract in data:
            org_code = contract.get("_org_code")
            valor = contract.get("valorInicial") or contract.get("valorGlobal") or 0

            if org_code and isinstance(valor, (int, float)):
                org_stats[org_code]["total_value"] += float(valor)
                org_stats[org_code]["count"] += 1
                org_stats[org_code]["contracts"].append(contract)

        if len(org_stats) < 2:
            return patterns

        org_efficiency = {}
        for org_code, stats in org_stats.items():
            if stats["count"] > 0:
                avg_contract_value = stats["total_value"] / stats["count"]
                org_efficiency[org_code] = {
                    "avg_contract_value": avg_contract_value,
                    "total_value": stats["total_value"],
                    "contract_count": stats["count"],
                    "efficiency_ratio": stats["total_value"] / stats["count"],
                }

        avg_values = [eff["avg_contract_value"] for eff in org_efficiency.values()]
        mean_avg = np.mean(avg_values)
        std_avg = np.std(avg_values)

        for org_code, efficiency in org_efficiency.items():
            if std_avg > 0:
                z_score = (efficiency["avg_contract_value"] - mean_avg) / std_avg

                if abs(z_score) > 1.5:
                    pattern_type = "high_value_contracts" if z_score > 0 else "low_value_contracts"
                    significance = min(abs(z_score) / 3.0, 1.0)

                    pattern = PatternResult(
                        pattern_type="organizational_patterns",
                        description=f"Padrão organizacional atípico: {org_code}",
                        significance=significance,
                        confidence=min(abs(z_score) / 2.0, 1.0),
                        insights=[
                            f"Organização {org_code} apresenta padrão atípico de contratação",
                            f"Valor médio por contrato: R$ {efficiency['avg_contract_value']:,.2f}",
                            f"Desvio da média geral: {z_score:.1f} desvios padrão",
                        ],
                        evidence={
                            "organization_code": org_code,
                            "avg_contract_value": efficiency["avg_contract_value"],
                            "total_value": efficiency["total_value"],
                            "contract_count": efficiency["contract_count"],
                            "z_score": z_score,
                            "market_average": mean_avg,
                        },
                        recommendations=[
                            "Investigar critérios de contratação da organização",
                            "Comparar com organizações similares",
                            "Analisar eficiência dos processos",
                            "Verificar adequação dos valores contratados",
                        ],
                        entities_involved=[{
                            "organization": org_code,
                            "total_contracts": efficiency["contract_count"],
                            "total_value": efficiency["total_value"],
                        }],
                    )

                    patterns.append(pattern)

        return patterns

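    # Worked example of the z-score flag above: if average contract values per
    # organization are 10k, 12k, 11k, and 40k, the mean is 18.25k and the
    # (population) std is about 12.6k, so the 40k organization scores
    # z ≈ (40 - 18.25) / 12.6 ≈ 1.7 > 1.5 and is reported as atypical.
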
    async def _analyze_vendor_behavior(
        self,
        data: List[Dict[str, Any]],
        context: AgentContext
    ) -> List[PatternResult]:
        """Analyze vendor behavior patterns."""
        patterns = []

        vendor_stats = defaultdict(lambda: {
            "contracts": [],
            "total_value": 0,
            "organizations": set(),
            "months": set(),
        })

        for contract in data:
            supplier = contract.get("fornecedor", {})
            vendor_name = supplier.get("nome", "Unknown")
            valor = contract.get("valorInicial") or contract.get("valorGlobal") or 0
            org_code = contract.get("_org_code")
            month = contract.get("_month")

            if vendor_name != "Unknown" and isinstance(valor, (int, float)):
                vendor_stats[vendor_name]["contracts"].append(contract)
                vendor_stats[vendor_name]["total_value"] += float(valor)
                if org_code:
                    vendor_stats[vendor_name]["organizations"].add(org_code)
                if month:
                    vendor_stats[vendor_name]["months"].add(month)

        for vendor_name, stats in vendor_stats.items():
            org_count = len(stats["organizations"])
            contract_count = len(stats["contracts"])

            if org_count >= 3 and contract_count >= 5:
                significance = min(org_count / 6.0, 1.0)

                pattern = PatternResult(
                    pattern_type="vendor_behavior",
                    description=f"Fornecedor multi-organizacional: {vendor_name}",
                    significance=significance,
                    confidence=min(contract_count / 10.0, 1.0),
                    insights=[
                        f"Fornecedor atua em {org_count} organizações diferentes",
                        f"Total de {contract_count} contratos",
                        f"Valor total: R$ {stats['total_value']:,.2f}",
                        f"Presença em {len(stats['months'])} meses diferentes",
                    ],
                    evidence={
                        "vendor_name": vendor_name,
                        "organization_count": org_count,
                        "contract_count": contract_count,
                        "total_value": stats["total_value"],
                        "organizations": list(stats["organizations"]),
                        "months_active": len(stats["months"]),
                    },
                    recommendations=[
                        "Verificar especialização do fornecedor",
                        "Analisar competitividade dos processos",
                        "Investigar relacionamento com múltiplas organizações",
                        "Revisar histórico de performance",
                    ],
                    entities_involved=[{
                        "vendor": vendor_name,
                        "organizations": list(stats["organizations"]),
                        "contract_count": contract_count,
                    }],
                )

                patterns.append(pattern)

        return patterns

    async def _analyze_seasonal_patterns(
        self,
        data: List[Dict[str, Any]],
        context: AgentContext
    ) -> List[PatternResult]:
        """Analyze seasonal patterns in contracting."""
        patterns = []

        monthly_activity = defaultdict(lambda: {"count": 0, "value": 0})

        for contract in data:
            month = contract.get("_month")
            valor = contract.get("valorInicial") or contract.get("valorGlobal") or 0

            if month and isinstance(valor, (int, float)):
                monthly_activity[month]["count"] += 1
                monthly_activity[month]["value"] += float(valor)

        if len(monthly_activity) < 6:
            return patterns

        months = sorted(monthly_activity.keys())
        counts = [monthly_activity[m]["count"] for m in months]
        values = [monthly_activity[m]["value"] for m in months]

        if 12 in monthly_activity and len(months) >= 6:
            dec_count = monthly_activity[12]["count"]
            avg_count = np.mean([monthly_activity[m]["count"] for m in months if m != 12])

            if avg_count > 0:
                dec_ratio = dec_count / avg_count

                if dec_ratio > 1.5:
                    significance = min((dec_ratio - 1) / 2, 1.0)

                    pattern = PatternResult(
                        pattern_type="seasonal_patterns",
                        description="Padrão sazonal: concentração em dezembro",
                        significance=significance,
                        confidence=min(dec_ratio / 2.0, 1.0),
                        insights=[
                            f"Dezembro apresenta {dec_ratio:.1f}x mais contratos que a média",
                            f"Contratos em dezembro: {dec_count}",
                            f"Média mensal: {avg_count:.1f}",
                            "Possível correria de fim de ano orçamentário",
                        ],
                        evidence={
                            "december_count": dec_count,
                            "average_monthly_count": avg_count,
                            "december_ratio": dec_ratio,
                            "monthly_distribution": dict(monthly_activity),
                        },
                        recommendations=[
                            "Melhorar planejamento anual de contratações",
                            "Distribuir contratações ao longo do ano",
                            "Investigar qualidade dos processos de fim de ano",
                            "Implementar cronograma de contratações",
                        ],
                        entities_involved=[{
                            "pattern": "end_of_year_rush",
                            "affected_months": [12],
                            "intensity": dec_ratio,
                        }],
                    )

                    patterns.append(pattern)

        return patterns

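    # Worked example of the December check above: with 12 December contracts
    # against a 6-contract average for the other months, dec_ratio = 2.0,
    # which exceeds the 1.5 trigger and yields
    # significance = min((2.0 - 1) / 2, 1.0) = 0.5.
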
    async def _analyze_value_distribution(
        self,
        data: List[Dict[str, Any]],
        context: AgentContext
    ) -> List[PatternResult]:
        """Analyze contract value distribution patterns."""
        patterns = []

        values = []
        for contract in data:
            valor = contract.get("valorInicial") or contract.get("valorGlobal") or 0
            if isinstance(valor, (int, float)) and valor > 0:
                values.append(float(valor))

        if len(values) < 10:
            return patterns

        values_array = np.array(values)

        percentiles = np.percentile(values_array, [25, 50, 75, 90, 95, 99])

        value_ranges = {
            "micro": (0, 8000),
            "small": (8000, 176000),
            "medium": (176000, 1500000),
            "large": (1500000, float('inf'))
        }

        range_counts = {}
        range_values = {}

        for range_name, (min_val, max_val) in value_ranges.items():
            count = sum(1 for v in values if min_val <= v < max_val)
            total_val = sum(v for v in values if min_val <= v < max_val)
            range_counts[range_name] = count
            range_values[range_name] = total_val

        total_contracts = len(values)
        total_value = sum(values)

        for range_name, count in range_counts.items():
            percentage = count / total_contracts if total_contracts > 0 else 0
            value_percentage = range_values[range_name] / total_value if total_value > 0 else 0

            if percentage > 0.7:
                significance = percentage

                pattern = PatternResult(
                    pattern_type="value_distribution",
                    description=f"Concentração em contratos de valor {range_name}",
                    significance=significance,
                    confidence=percentage,
                    insights=[
                        f"{percentage:.1%} dos contratos estão na faixa {range_name}",
                        f"Representam {value_percentage:.1%} do valor total",
                        f"Total de {count} contratos nesta faixa",
                        f"Faixa de valores: R$ {value_ranges[range_name][0]:,.2f} - R$ {value_ranges[range_name][1]:,.2f}",
                    ],
                    evidence={
                        "range_name": range_name,
                        "concentration_percentage": percentage * 100,
                        "value_percentage": value_percentage * 100,
                        "contract_count": count,
                        "range_limits": value_ranges[range_name],
                        "distribution": range_counts,
                    },
                    recommendations=[
                        "Analisar adequação dos valores contratados",
                        "Verificar se há fracionamento inadequado",
                        "Revisar modalidades licitatórias utilizadas",
                        "Comparar com benchmarks do setor",
                    ],
                    entities_involved=[{
                        "value_range": range_name,
                        "contract_count": count,
                        "percentage": percentage * 100,
                    }],
                )

                patterns.append(pattern)

        return patterns

    async def _perform_correlation_analysis(
        self,
        data: List[Dict[str, Any]],
        context: AgentContext
    ) -> List[CorrelationResult]:
        """Perform correlation analysis between variables."""
        correlations = []

        org_month_data = defaultdict(lambda: defaultdict(lambda: {"count": 0, "value": 0}))

        for contract in data:
            org_code = contract.get("_org_code")
            month = contract.get("_month")
            valor = contract.get("valorInicial") or contract.get("valorGlobal") or 0

            if org_code and month and isinstance(valor, (int, float)):
                org_month_data[org_code][month]["count"] += 1
                org_month_data[org_code][month]["value"] += float(valor)

        if len(org_month_data) >= 3:
            monthly_counts = []
            monthly_avg_values = []

            for org_code, month_data in org_month_data.items():
                for month, stats in month_data.items():
                    if stats["count"] > 0:
                        monthly_counts.append(stats["count"])
                        monthly_avg_values.append(stats["value"] / stats["count"])

            if len(monthly_counts) >= 10 and len(monthly_avg_values) >= 10:
                correlation_coef = np.corrcoef(monthly_counts, monthly_avg_values)[0, 1]

                if abs(correlation_coef) > self.correlation_threshold:
                    significance_level = "high" if abs(correlation_coef) > 0.7 else "medium"

                    interpretation = (
                        "Correlação negativa indica que meses com mais contratos tendem a ter valores médios menores"
                        if correlation_coef < 0 else
                        "Correlação positiva indica que meses com mais contratos tendem a ter valores médios maiores"
                    )

                    correlation = CorrelationResult(
                        correlation_type="count_vs_value",
                        variables=["monthly_contract_count", "monthly_average_value"],
                        correlation_coefficient=correlation_coef,
                        p_value=None,
                        significance_level=significance_level,
                        description="Correlação entre quantidade e valor médio de contratos",
                        business_interpretation=interpretation,
                        evidence={
                            "correlation_coefficient": correlation_coef,
                            "sample_size": len(monthly_counts),
                            "count_range": [min(monthly_counts), max(monthly_counts)],
                            "value_range": [min(monthly_avg_values), max(monthly_avg_values)],
                        },
                        recommendations=[
                            "Investigar fatores que influenciam essa correlação",
                            "Analisar estratégias de contratação",
                            "Verificar planejamento orçamentário",
                            "Monitorar tendências futuras",
                        ],
                    )

                    correlations.append(correlation)

        return correlations

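    # The p_value above is left as None because np.corrcoef only returns the
    # Pearson r. A possible refinement (a sketch, assuming scipy is an
    # acceptable dependency) would be:
    #
    #     from scipy import stats
    #     r, p = stats.pearsonr(monthly_counts, monthly_avg_values)
    #
    # and then reporting the correlation only when p < self.significance_threshold.
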
    async def _calculate_efficiency_metrics(
        self,
        data: List[Dict[str, Any]],
        context: AgentContext
    ) -> List[PatternResult]:
        """Calculate efficiency metrics for organizations."""
        patterns = []

        org_metrics = defaultdict(lambda: {
            "total_value": 0,
            "contract_count": 0,
            "unique_vendors": set(),
            "months_active": set(),
        })

        for contract in data:
            org_code = contract.get("_org_code")
            valor = contract.get("valorInicial") or contract.get("valorGlobal") or 0
            supplier = contract.get("fornecedor", {}).get("nome")
            month = contract.get("_month")

            if org_code and isinstance(valor, (int, float)):
                org_metrics[org_code]["total_value"] += float(valor)
                org_metrics[org_code]["contract_count"] += 1
                if supplier:
                    org_metrics[org_code]["unique_vendors"].add(supplier)
                if month:
                    org_metrics[org_code]["months_active"].add(month)

        efficiency_scores = {}
        for org_code, metrics in org_metrics.items():
            if metrics["contract_count"] > 0:
                vendor_diversity = len(metrics["unique_vendors"]) / metrics["contract_count"]
                avg_contract_value = metrics["total_value"] / metrics["contract_count"]
                activity_consistency = len(metrics["months_active"]) / 12

                # Composite score: vendor diversity weighted 40%, activity consistency 60%
                efficiency_score = (vendor_diversity * 0.4 + activity_consistency * 0.6)

                efficiency_scores[org_code] = {
                    "score": efficiency_score,
                    "vendor_diversity": vendor_diversity,
                    "avg_contract_value": avg_contract_value,
                    "activity_consistency": activity_consistency,
                    "metrics": metrics,
                }

        if efficiency_scores:
            scores = [eff["score"] for eff in efficiency_scores.values()]
            mean_score = np.mean(scores)
            std_score = np.std(scores)

            for org_code, efficiency in efficiency_scores.items():
                if std_score > 0:
                    z_score = (efficiency["score"] - mean_score) / std_score

                    if abs(z_score) > 1.0:
                        performance_type = "high_efficiency" if z_score > 0 else "low_efficiency"
                        significance = min(abs(z_score) / 2.0, 1.0)

                        pattern = PatternResult(
                            pattern_type="efficiency_metrics",
                            description=f"Performance organizacional {performance_type}: {org_code}",
                            significance=significance,
                            confidence=min(abs(z_score) / 1.5, 1.0),
                            insights=[
                                f"Score de eficiência: {efficiency['score']:.2f}",
                                f"Diversidade de fornecedores: {efficiency['vendor_diversity']:.2f}",
                                f"Consistência de atividade: {efficiency['activity_consistency']:.2f}",
                                f"Valor médio por contrato: R$ {efficiency['avg_contract_value']:,.2f}",
                            ],
                            evidence={
                                "organization": org_code,
                                "efficiency_score": efficiency["score"],
                                "vendor_diversity": efficiency["vendor_diversity"],
                                "activity_consistency": efficiency["activity_consistency"],
                                "z_score": z_score,
                                "benchmark_average": mean_score,
                            },
                            recommendations=[
                                "Analisar fatores que contribuem para a performance",
                                "Compartilhar boas práticas com outras organizações",
                                "Investigar oportunidades de melhoria" if z_score < 0 else "Manter padrão de excelência",
                                "Monitorar tendências de performance",
                            ],
                            entities_involved=[{
                                "organization": org_code,
                                "efficiency_score": efficiency["score"],
                                "performance_type": performance_type,
                            }],
                        )

                        patterns.append(pattern)

        return patterns

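    # Worked example of the composite score above: vendor_diversity = 0.5 and
    # activity_consistency = 0.75 give
    # efficiency_score = 0.5 * 0.4 + 0.75 * 0.6 = 0.65; organizations are then
    # compared by z-scoring these composites across the cohort.
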
    async def _analyze_spectral_patterns(
        self,
        data: List[Dict[str, Any]],
        context: AgentContext
    ) -> List[PatternResult]:
        """
        Analyze spectral patterns using Fourier transforms.

        Args:
            data: Contract data for analysis
            context: Agent context

        Returns:
            List of spectral pattern results
        """
        patterns = []

        try:
            # Group contracts by organization; spectral analysis needs a
            # reasonably long series per entity
            org_groups = defaultdict(list)
            for contract in data:
                org_code = contract.get("_org_code", "unknown")
                org_groups[org_code].append(contract)

            for org_code, org_contracts in org_groups.items():
                if len(org_contracts) < 30:
                    continue

                time_series_data = self._prepare_time_series_for_org(org_contracts)
                if len(time_series_data) < 20:
                    continue

                spending_data = pd.Series([item['value'] for item in time_series_data])
                timestamps = pd.DatetimeIndex([item['date'] for item in time_series_data])

                spectral_features = self.spectral_analyzer.analyze_time_series(
                    spending_data, timestamps
                )

                periodic_patterns = self.spectral_analyzer.find_periodic_patterns(
                    spending_data, timestamps, entity_name=f"Org_{org_code}"
                )

                # Report at most the five strongest periodic components
                for period_pattern in periodic_patterns[:5]:
                    if period_pattern.amplitude > 0.1:
                        pattern = PatternResult(
                            pattern_type="spectral_periodic",
                            description=f"Padrão periódico detectado: {period_pattern.period_days:.1f} dias",
                            significance=period_pattern.amplitude,
                            confidence=period_pattern.confidence,
                            insights=[
                                f"Período dominante: {period_pattern.period_days:.1f} dias",
                                f"Força do padrão: {period_pattern.amplitude:.1%}",
                                f"Tipo: {period_pattern.pattern_type}",
                                period_pattern.business_interpretation
                            ],
                            evidence={
                                "period_days": period_pattern.period_days,
                                "frequency_hz": period_pattern.frequency_hz,
                                "amplitude": period_pattern.amplitude,
                                "pattern_type": period_pattern.pattern_type,
                                "confidence": period_pattern.confidence,
                                "spectral_entropy": spectral_features.spectral_entropy,
                                "dominant_frequencies": spectral_features.dominant_frequencies,
                                "seasonal_components": spectral_features.seasonal_components
                            },
                            recommendations=[
                                f"Investigar causa do padrão de {period_pattern.period_days:.1f} dias",
                                "Verificar se corresponde a processos de negócio conhecidos",
                                "Analisar se há justificativa administrativa",
                                "Considerar otimização do cronograma de contratações"
                            ],
                            entities_involved=[{
                                "organization_code": org_code,
                                "contracts_analyzed": len(org_contracts),
                                "period_days": period_pattern.period_days,
                                "pattern_strength": period_pattern.amplitude
                            }],
                            trend_direction=self._classify_trend_from_spectral(spectral_features),
                            correlation_strength=period_pattern.amplitude
                        )
                        patterns.append(pattern)

                # Very low spectral entropy means spending is suspiciously regular
                if spectral_features.spectral_entropy < 0.3:
                    pattern = PatternResult(
                        pattern_type="spectral_regularity",
                        description=f"Padrão de gastos muito regular detectado (entropia: {spectral_features.spectral_entropy:.2f})",
                        significance=1 - spectral_features.spectral_entropy,
                        confidence=0.8,
                        insights=[
                            f"Entropia espectral baixa: {spectral_features.spectral_entropy:.2f}",
                            "Gastos seguem padrão muito regular",
                            "Pode indicar processos automatizados ou planejamento rígido",
                            f"Anomalia score: {spectral_features.anomaly_score:.2f}"
                        ],
                        evidence={
                            "spectral_entropy": spectral_features.spectral_entropy,
                            "anomaly_score": spectral_features.anomaly_score,
                            "dominant_frequencies": spectral_features.dominant_frequencies[:5],
                            "seasonal_components": spectral_features.seasonal_components
                        },
                        recommendations=[
                            "Verificar se a regularidade é justificada",
                            "Investigar processos de planejamento orçamentário",
                            "Analisar flexibilidade nos cronogramas",
                            "Considerar diversificação temporal"
                        ],
                        entities_involved=[{
                            "organization_code": org_code,
                            "spectral_entropy": spectral_features.spectral_entropy,
                            "regularity_score": 1 - spectral_features.spectral_entropy
                        }]
                    )
                    patterns.append(pattern)

            self.logger.info(
                "spectral_analysis_completed",
                patterns_found=len(patterns),
                organizations_analyzed=len(org_groups)
            )

        except Exception as e:
            self.logger.error(f"Error in spectral pattern analysis: {str(e)}")

        return patterns

    async def _perform_cross_spectral_analysis(
        self,
        data: List[Dict[str, Any]],
        context: AgentContext
    ) -> List[CorrelationResult]:
        """
        Perform cross-spectral analysis between organizations.

        Args:
            data: Contract data for analysis
            context: Agent context

        Returns:
            List of cross-spectral correlation results
        """
        correlations = []

        try:
            org_groups = defaultdict(list)
            for contract in data:
                org_code = contract.get("_org_code", "unknown")
                org_groups[org_code].append(contract)

            # Only organizations with enough contracts support a stable spectrum
            valid_orgs = {
                org: contracts for org, contracts in org_groups.items()
                if len(contracts) >= 30
            }

            if len(valid_orgs) < 2:
                return correlations

            org_list = list(valid_orgs.keys())

            # Compare every pair of organizations
            for i, org1 in enumerate(org_list):
                for org2 in org_list[i + 1:]:
                    try:
                        ts1 = self._prepare_time_series_for_org(valid_orgs[org1])
                        ts2 = self._prepare_time_series_for_org(valid_orgs[org2])

                        if len(ts1) < 20 or len(ts2) < 20:
                            continue

                        # Align both series on the union of their dates
                        all_dates = sorted(set(item['date'] for item in ts1 + ts2))
                        if len(all_dates) < 20:
                            continue

                        data1 = pd.Series(0.0, index=all_dates)
                        data2 = pd.Series(0.0, index=all_dates)

                        for item in ts1:
                            data1[item['date']] += item['value']
                        for item in ts2:
                            data2[item['date']] += item['value']

                        timestamps = pd.DatetimeIndex(all_dates)

                        cross_spectral_result = self.spectral_analyzer.cross_spectral_analysis(
                            data1, data2, f"Org_{org1}", f"Org_{org2}", timestamps
                        )

                        if cross_spectral_result and cross_spectral_result.get('max_coherence', 0) > 0.5:
                            correlation = CorrelationResult(
                                correlation_type="cross_spectral",
                                variables=[f"Org_{org1}", f"Org_{org2}"],
                                correlation_coefficient=cross_spectral_result['correlation_coefficient'],
                                p_value=None,
                                significance_level=self._assess_spectral_significance(
                                    cross_spectral_result['max_coherence']
                                ),
                                description=f"Correlação espectral entre organizações {org1} e {org2}",
                                business_interpretation=cross_spectral_result['business_interpretation'],
                                evidence={
                                    "max_coherence": cross_spectral_result['max_coherence'],
                                    "mean_coherence": cross_spectral_result['mean_coherence'],
                                    "correlated_periods_days": cross_spectral_result['correlated_periods_days'],
                                    "synchronization_score": cross_spectral_result['synchronization_score'],
                                    "correlated_frequencies": cross_spectral_result['correlated_frequencies']
                                },
                                recommendations=[
                                    "Investigar possível coordenação entre organizações",
                                    "Verificar se há fornecedores em comum",
                                    "Analisar sincronização de processos",
                                    "Revisar independência das contratações"
                                ]
                            )
                            correlations.append(correlation)

                    except Exception as e:
                        self.logger.warning(f"Cross-spectral analysis failed for {org1}-{org2}: {str(e)}")
                        continue

            self.logger.info(
                "cross_spectral_analysis_completed",
                correlations_found=len(correlations),
                organizations_compared=len(org_list)
            )

        except Exception as e:
            self.logger.error(f"Error in cross-spectral analysis: {str(e)}")

        return correlations

    def _prepare_time_series_for_org(self, contracts: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Prepare time series data for a specific organization."""
        time_series = []

        for contract in contracts:
            # Prefer the signature date, falling back to publication/start dates
            date_str = (
                contract.get("dataAssinatura") or
                contract.get("dataPublicacao") or
                contract.get("dataInicio")
            )

            if not date_str:
                continue

            try:
                # Dates arrive in dd/mm/yyyy format
                date_parts = date_str.split("/")
                if len(date_parts) == 3:
                    day, month, year = int(date_parts[0]), int(date_parts[1]), int(date_parts[2])
                    date_obj = datetime(year, month, day)

                    valor = contract.get("valorInicial") or contract.get("valorGlobal") or 0
                    if isinstance(valor, (int, float)) and valor > 0:
                        time_series.append({
                            'date': date_obj,
                            'value': float(valor),
                            'contract_id': contract.get('id')
                        })

            except (ValueError, IndexError):
                continue

        time_series.sort(key=lambda x: x['date'])

        # Aggregate multiple contracts signed on the same day
        daily_aggregates = defaultdict(float)
        for item in time_series:
            daily_aggregates[item['date']] += item['value']

        return [{'date': date, 'value': value} for date, value in daily_aggregates.items()]

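    # The manual split("/") parse above is equivalent to (and could be
    # replaced by) datetime.strptime(date_str, "%d/%m/%Y"), which also raises
    # ValueError on malformed input and therefore fits the same except clause.
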
    def _classify_trend_from_spectral(self, features: SpectralFeatures) -> Optional[str]:
        """Classify trend direction from spectral features."""
        # Compare the mean of the first and last thirds of the trend component
        if hasattr(features, 'trend_component') and len(features.trend_component) > 10:
            third = len(features.trend_component) // 3
            trend_start = np.mean(features.trend_component[:third])
            trend_end = np.mean(features.trend_component[-third:])

            if trend_end > trend_start * 1.1:
                return "increasing"
            elif trend_end < trend_start * 0.9:
                return "decreasing"
            else:
                return "stable"

        return None

    def _assess_spectral_significance(self, coherence: float) -> str:
        """Assess significance level of spectral coherence."""
        if coherence > 0.8:
            return "high"
        elif coherence > 0.6:
            return "medium"
        else:
            return "low"

    def _generate_insights(
        self,
        patterns: List[PatternResult],
        correlations: List[CorrelationResult],
        data: List[Dict[str, Any]]
    ) -> List[str]:
        """Generate high-level insights from analysis results."""
        insights = []

        total_contracts = len(data)
        total_value = sum(
            float(c.get("valorInicial") or c.get("valorGlobal") or 0)
            for c in data
            if isinstance(c.get("valorInicial") or c.get("valorGlobal"), (int, float))
        )

        insights.append(f"Analisados {total_contracts} contratos totalizando R$ {total_value:,.2f}")

        if patterns:
            high_significance = [p for p in patterns if p.significance > 0.7]
            insights.append(f"Identificados {len(patterns)} padrões, sendo {len(high_significance)} de alta significância")

            if high_significance:
                top_pattern = max(high_significance, key=lambda p: p.significance)
                insights.append(f"Padrão mais significativo: {top_pattern.description}")

        if correlations:
            strong_correlations = [c for c in correlations if abs(c.correlation_coefficient) > 0.7]
            insights.append(f"Encontradas {len(correlations)} correlações, sendo {len(strong_correlations)} fortes")

        risk_patterns = [p for p in patterns if p.pattern_type in ["spending_trends", "vendor_behavior"]]
        if risk_patterns:
            insights.append(f"Identificados {len(risk_patterns)} padrões que requerem atenção especial")

        return insights

    def _generate_analysis_summary(
        self,
        data: List[Dict[str, Any]],
        patterns: List[PatternResult],
        correlations: List[CorrelationResult]
    ) -> Dict[str, Any]:
        """Generate summary statistics for the analysis."""
        total_value = sum(
            float(c.get("valorInicial") or c.get("valorGlobal") or 0)
            for c in data
            if isinstance(c.get("valorInicial") or c.get("valorGlobal"), (int, float))
        )

        organizations = len(set(c.get("_org_code") for c in data if c.get("_org_code")))
        months_covered = len(set(c.get("_month") for c in data if c.get("_month")))

        pattern_types = Counter(p.pattern_type for p in patterns)
        high_significance_patterns = len([p for p in patterns if p.significance > 0.7])

        analysis_score = min(
            (len(patterns) + len(correlations)) / max(len(data) / 10, 1) * 5,
            10
        )

        return {
            "total_records": len(data),
            "total_value": total_value,
            "organizations_analyzed": organizations,
            "months_covered": months_covered,
            "patterns_found": len(patterns),
            "correlations_found": len(correlations),
            "pattern_types": dict(pattern_types),
            "high_significance_patterns": high_significance_patterns,
            "analysis_score": analysis_score,
            "avg_contract_value": total_value / len(data) if data else 0,
        }

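    # Worked example of the analysis_score formula above: 100 records with
    # 8 patterns and 2 correlations give (8 + 2) / max(100 / 10, 1) * 5
    # = 10 / 10 * 5 = 5.0 on the 0-10 scale.
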
    def _pattern_to_dict(self, pattern: PatternResult) -> Dict[str, Any]:
        """Convert PatternResult to dictionary for serialization."""
        return {
            "type": pattern.pattern_type,
            "description": pattern.description,
            "significance": pattern.significance,
            "confidence": pattern.confidence,
            "insights": pattern.insights,
            "evidence": pattern.evidence,
            "recommendations": pattern.recommendations,
            "entities_involved": pattern.entities_involved,
            "trend_direction": pattern.trend_direction,
            "correlation_strength": pattern.correlation_strength,
        }

    def _correlation_to_dict(self, correlation: CorrelationResult) -> Dict[str, Any]:
        """Convert CorrelationResult to dictionary for serialization."""
        return {
            "type": correlation.correlation_type,
            "variables": correlation.variables,
            "correlation_coefficient": correlation.correlation_coefficient,
            "p_value": correlation.p_value,
            "significance_level": correlation.significance_level,
            "description": correlation.description,
            "business_interpretation": correlation.business_interpretation,
            "evidence": correlation.evidence,
            "recommendations": correlation.recommendations,
        }