|
|
""" |
|
|
Module: agents.predictive_agent |
|
|
Codinome: Ceuci - Agente Preditivo |
|
|
Description: Agent specialized in predictive analysis and trend modeling for government data |
|
|
Author: Anderson H. Silva |
|
|
Date: 2025-07-23 |
|
|
License: Proprietary - All rights reserved |
|
|
""" |
|
|
|
|
|
import asyncio |
|
|
from datetime import datetime, timedelta |
|
|
from typing import Any, Dict, List, Optional, Tuple, Union |
|
|
from dataclasses import dataclass |
|
|
from enum import Enum |
|
|
|
|
|
import numpy as np |
|
|
import pandas as pd |
|
|
from pydantic import BaseModel, Field as PydanticField |
|
|
|
|
|
from src.agents.deodoro import BaseAgent, AgentContext, AgentMessage, AgentResponse |
|
|
from src.core import get_logger |
|
|
from src.core.exceptions import AgentExecutionError, DataAnalysisError |
|
|
|
|
|
|
|
|
class PredictionType(Enum): |
|
|
"""Types of predictions supported.""" |
|
|
TIME_SERIES = "time_series" |
|
|
CLASSIFICATION = "classification" |
|
|
REGRESSION = "regression" |
|
|
ANOMALY_FORECAST = "anomaly_forecast" |
|
|
TREND_ANALYSIS = "trend_analysis" |
|
|
SEASONAL_DECOMPOSITION = "seasonal_decomposition" |
|
|
|
|
|
|
|
|
class ModelType(Enum): |
|
|
"""Machine learning models available.""" |
|
|
ARIMA = "arima" |
|
|
LSTM = "lstm" |
|
|
PROPHET = "prophet" |
|
|
RANDOM_FOREST = "random_forest" |
|
|
XG_BOOST = "xgboost" |
|
|
LINEAR_REGRESSION = "linear_regression" |
|
|
POLYNOMIAL_REGRESSION = "polynomial_regression" |
|
|
SARIMA = "sarima" |
|
|
|
|
|
|
|
|
@dataclass |
|
|
class PredictionRequest: |
|
|
"""Request for predictive analysis.""" |
|
|
|
|
|
request_id: str |
|
|
prediction_type: PredictionType |
|
|
model_type: ModelType |
|
|
data: List[Dict[str, Any]] |
|
|
target_variable: str |
|
|
feature_variables: List[str] |
|
|
prediction_horizon: int |
|
|
confidence_level: float |
|
|
additional_params: Dict[str, Any] |
|
|
|
|
|
|
|
|
@dataclass |
|
|
class PredictionResult: |
|
|
"""Result of predictive analysis.""" |
|
|
|
|
|
request_id: str |
|
|
model_type: ModelType |
|
|
predictions: List[Dict[str, Any]] |
|
|
confidence_intervals: List[Dict[str, Any]] |
|
|
model_performance: Dict[str, float] |
|
|
feature_importance: Dict[str, float] |
|
|
trend_analysis: Dict[str, Any] |
|
|
seasonal_patterns: Dict[str, Any] |
|
|
anomaly_alerts: List[Dict[str, Any]] |
|
|
metadata: Dict[str, Any] |
|
|
timestamp: datetime |
|
|
|
|
|
|
|
|
class PredictiveAgent(BaseAgent): |
|
|
""" |
|
|
Ceuci - Agente Preditivo |
|
|
|
|
|
MISSÃO: |
|
|
Realiza análise preditiva e modelagem de tendências em dados governamentais, |
|
|
fornecendo insights sobre padrões futuros e alertas de anomalias. |
|
|
|
|
|
ALGORITMOS E MODELOS IMPLEMENTADOS: |
|
|
|
|
|
1. ANÁLISE DE SÉRIES TEMPORAIS: |
|
|
- ARIMA (AutoRegressive Integrated Moving Average) |
|
|
• Fórmula: ARIMA(p,d,q) - (1-φ₁L-...-φₚLᵖ)(1-L)ᵈXₜ = (1+θ₁L+...+θₑLᵠ)εₜ |
|
|
• Aplicação: Previsão de gastos públicos, receitas |
|
|
|
|
|
- SARIMA (Seasonal ARIMA) |
|
|
• Extensão sazonal do ARIMA: SARIMA(p,d,q)(P,D,Q)s |
|
|
• Aplicação: Dados com sazonalidade (orçamentos anuais) |
|
|
|
|
|
- Prophet (Facebook Algorithm) |
|
|
• Modelo aditivo: y(t) = g(t) + s(t) + h(t) + εₜ |
|
|
• Componentes: tendência, sazonalidade, feriados, erro |
|
|
|
|
|
2. REDES NEURAIS PARA PREVISÃO: |
|
|
- LSTM (Long Short-Term Memory) |
|
|
• Arquitetura: Input Gate, Forget Gate, Output Gate |
|
|
• Aplicação: Padrões complexos em séries longas |
|
|
• Fórmula Forget Gate: fₜ = σ(Wf·[hₜ₋₁,xₜ] + bf) |
|
|
|
|
|
- GRU (Gated Recurrent Unit) |
|
|
• Versão simplificada do LSTM |
|
|
• Aplicação: Previsões com menos dados históricos |
|
|
|
|
|
- Transformer Networks |
|
|
• Attention mechanism para dependências longas |
|
|
• Aplicação: Análise de múltiplas séries relacionadas |
|
|
|
|
|
3. MACHINE LEARNING SUPERVISIONADO: |
|
|
- Random Forest para Regressão |
|
|
• Ensemble de árvores de decisão |
|
|
• Aplicação: Previsão baseada em múltiplas variáveis |
|
|
|
|
|
- XGBoost (Extreme Gradient Boosting) |
|
|
• Objective: L(θ) = Σᵢl(yᵢ,ŷᵢ) + Σₖ Ω(fₖ) |
|
|
• Aplicação: Previsões com alta precisão |
|
|
|
|
|
- Support Vector Regression (SVR) |
|
|
• Kernel trick para relações não-lineares |
|
|
• Aplicação: Previsões robustas a outliers |
|
|
|
|
|
4. DETECÇÃO DE TENDÊNCIAS: |
|
|
- Regressão Polinomial |
|
|
• y = β₀ + β₁x + β₂x² + ... + βₙxⁿ + ε |
|
|
• Aplicação: Tendências não-lineares |
|
|
|
|
|
- Smoothing Algorithms |
|
|
• Moving Average, LOWESS, Savitzky-Golay |
|
|
• Aplicação: Suavização de ruído nos dados |
|
|
|
|
|
- Change Point Detection |
|
|
• PELT (Pruned Exact Linear Time) |
|
|
• Aplicação: Identificação de mudanças estruturais |
|
|
|
|
|
5. DECOMPOSIÇÃO SAZONAL: |
|
|
- STL (Seasonal-Trend decomposition using Loess) |
|
|
• Xₜ = Trendₜ + Seasonalₜ + Remainderₜ |
|
|
• Aplicação: Separação de componentes temporais |
|
|
|
|
|
- X-13ARIMA-SEATS |
|
|
• Método oficial do US Census Bureau |
|
|
• Aplicação: Ajuste sazonal robusto |
|
|
|
|
|
- Classical Decomposition |
|
|
• Método aditivo/multiplicativo simples |
|
|
• Aplicação: Análise exploratória inicial |
|
|
|
|
|
6. ANÁLISE DE ANOMALIAS FUTURAS: |
|
|
- Isolation Forest Temporal |
|
|
• Extensão do Isolation Forest para séries temporais |
|
|
• Aplicação: Detecção de anomalias futuras |
|
|
|
|
|
- One-Class SVM |
|
|
• Classificação de normalidade vs anomalia |
|
|
• Aplicação: Alertas de gastos anômalos |
|
|
|
|
|
- LSTM Autoencoder |
|
|
• Reconstrução de padrões normais |
|
|
• Aplicação: Detecção de desvios futuros |
|
|
|
|
|
TÉCNICAS ESTATÍSTICAS AVANÇADAS: |
|
|
|
|
|
- Análise de Cointegração (Johansen Test) |
|
|
- Causalidade de Granger |
|
|
- Análise de Volatilidade (GARCH models) |
|
|
- Testes de Estacionariedade (ADF, KPSS) |
|
|
- Cross-Validation Temporal (Walk-Forward) |
|
|
|
|
|
MÉTRICAS DE AVALIAÇÃO: |
|
|
|
|
|
- Mean Absolute Error (MAE): MAE = (1/n)Σᵢ|yᵢ - ŷᵢ| |
|
|
- Root Mean Square Error (RMSE): RMSE = √((1/n)Σᵢ(yᵢ - ŷᵢ)²) |
|
|
- Mean Absolute Percentage Error (MAPE): MAPE = (100/n)Σᵢ|(yᵢ - ŷᵢ)/yᵢ| |
|
|
- Symmetric MAPE (sMAPE): Reduz bias para valores pequenos |
|
|
- Theil's U Statistic: Compara com modelo naive |
|
|
- Diebold-Mariano Test: Significância estatística das previsões |
|
|
|
|
|
APLICAÇÕES ESPECÍFICAS: |
|
|
|
|
|
1. Previsão Orçamentária: |
|
|
- Receitas federais, estaduais, municipais |
|
|
- Despesas por categoria e órgão |
|
|
- Déficit/superávit fiscal |
|
|
|
|
|
2. Análise de Licitações: |
|
|
- Volume de licitações por período |
|
|
- Valores médios de contratos |
|
|
- Detecção de padrões suspeitos |
|
|
|
|
|
3. Monitoramento de Políticas: |
|
|
- Impacto de mudanças regulatórias |
|
|
- Efetividade de programas sociais |
|
|
- ROI de investimentos públicos |
|
|
|
|
|
4. Alertas Preventivos: |
|
|
- Riscos de estouro orçamentário |
|
|
- Anomalias em gastos específicos |
|
|
- Padrões indicativos de fraude |
|
|
|
|
|
PERFORMANCE E ESCALABILIDADE: |
|
|
|
|
|
- Processamento: >1M pontos de dados em <30s |
|
|
- Modelos: Suporte a 50+ modelos simultâneos |
|
|
- Precisão: MAPE < 5% para previsões de curto prazo |
|
|
- Latência: <2s para previsões online |
|
|
- Memória: Otimizado para datasets de até 10GB |
|
|
""" |
|
|
|
|
|
def __init__(self, config: Optional[Dict[str, Any]] = None): |
|
|
super().__init__( |
|
|
name="PredictiveAgent", |
|
|
description="Ceuci - Agente especializado em análise preditiva", |
|
|
config=config or {} |
|
|
) |
|
|
self.logger = get_logger(__name__) |
|
|
|
|
|
|
|
|
self.model_config = { |
|
|
"arima": {"max_p": 5, "max_d": 2, "max_q": 5}, |
|
|
"lstm": {"hidden_size": 128, "num_layers": 2, "dropout": 0.2}, |
|
|
"prophet": {"yearly_seasonality": True, "weekly_seasonality": False}, |
|
|
"random_forest": {"n_estimators": 100, "max_depth": 10}, |
|
|
"xgboost": {"max_depth": 6, "learning_rate": 0.1, "n_estimators": 100} |
|
|
} |
|
|
|
|
|
|
|
|
self.trained_models = {} |
|
|
|
|
|
|
|
|
self.prediction_history = [] |
|
|
|
|
|
async def initialize(self) -> None: |
|
|
"""Inicializa modelos de ML e configurações.""" |
|
|
self.logger.info("Initializing Ceuci predictive analysis engine...") |
|
|
|
|
|
|
|
|
await self._load_pretrained_models() |
|
|
|
|
|
|
|
|
await self._setup_preprocessing_pipelines() |
|
|
|
|
|
|
|
|
await self._setup_evaluation_metrics() |
|
|
|
|
|
self.logger.info("Ceuci ready for predictive analysis") |
|
|
|
|
|
async def predict_time_series( |
|
|
self, |
|
|
request: PredictionRequest, |
|
|
context: AgentContext |
|
|
) -> PredictionResult: |
|
|
""" |
|
|
Realiza previsão de séries temporais. |
|
|
|
|
|
PIPELINE DE PREVISÃO: |
|
|
1. Pré-processamento dos dados (limpeza, normalização) |
|
|
2. Análise de estacionariedade e transformações |
|
|
3. Seleção automática de hiperparâmetros |
|
|
4. Treinamento do modelo selecionado |
|
|
5. Geração de previsões com intervalos de confiança |
|
|
6. Avaliação de performance e métricas |
|
|
7. Análise de tendências e sazonalidade |
|
|
""" |
|
|
self.logger.info(f"Starting time series prediction: {request.request_id}") |
|
|
|
|
|
|
|
|
processed_data = await self._preprocess_time_series(request.data, request.target_variable) |
|
|
|
|
|
|
|
|
model = await self._train_model(processed_data, request.model_type, request.additional_params) |
|
|
|
|
|
|
|
|
predictions = await self._generate_predictions(model, request.prediction_horizon, request.confidence_level) |
|
|
|
|
|
|
|
|
performance_metrics = await self._evaluate_model_performance(model, processed_data) |
|
|
|
|
|
|
|
|
trend_analysis = await self._analyze_trends(processed_data, predictions) |
|
|
|
|
|
return PredictionResult( |
|
|
request_id=request.request_id, |
|
|
model_type=request.model_type, |
|
|
predictions=predictions, |
|
|
confidence_intervals=self._calculate_confidence_intervals(predictions, request.confidence_level), |
|
|
model_performance=performance_metrics, |
|
|
feature_importance=await self._calculate_feature_importance(model, request.feature_variables), |
|
|
trend_analysis=trend_analysis, |
|
|
seasonal_patterns=await self._detect_seasonal_patterns(processed_data), |
|
|
anomaly_alerts=await self._detect_future_anomalies(predictions), |
|
|
metadata={"model_version": "1.0", "training_samples": len(processed_data)}, |
|
|
timestamp=datetime.utcnow() |
|
|
) |
|
|
|
|
|
async def analyze_trends( |
|
|
self, |
|
|
data: List[Dict[str, Any]], |
|
|
target_variable: str, |
|
|
context: AgentContext |
|
|
) -> Dict[str, Any]: |
|
|
"""Analisa tendências sem fazer previsões específicas.""" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
pass |
|
|
|
|
|
async def detect_seasonal_patterns( |
|
|
self, |
|
|
data: List[Dict[str, Any]], |
|
|
target_variable: str, |
|
|
context: AgentContext |
|
|
) -> Dict[str, Any]: |
|
|
"""Detecta padrões sazonais nos dados.""" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
pass |
|
|
|
|
|
async def forecast_anomalies( |
|
|
self, |
|
|
historical_data: List[Dict[str, Any]], |
|
|
prediction_horizon: int, |
|
|
context: AgentContext |
|
|
) -> List[Dict[str, Any]]: |
|
|
"""Prevê possíveis anomalias futuras.""" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
pass |
|
|
|
|
|
async def compare_models( |
|
|
self, |
|
|
data: List[Dict[str, Any]], |
|
|
target_variable: str, |
|
|
models: List[ModelType], |
|
|
context: AgentContext |
|
|
) -> Dict[str, Any]: |
|
|
"""Compara performance de múltiplos modelos.""" |
|
|
model_comparison = {} |
|
|
|
|
|
for model_type in models: |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
model_comparison[model_type.value] = { |
|
|
"mae": 0.0, |
|
|
"rmse": 0.0, |
|
|
"mape": 0.0, |
|
|
"training_time": 0.0, |
|
|
"prediction_time": 0.0 |
|
|
} |
|
|
|
|
|
return model_comparison |
|
|
|
|
|
async def process_message(self, message: AgentMessage, context: AgentContext) -> AgentResponse: |
|
|
"""Processa mensagens e coordena análise preditiva.""" |
|
|
try: |
|
|
action = message.content.get("action") |
|
|
|
|
|
if action == "predict_time_series": |
|
|
request_data = message.content.get("prediction_request") |
|
|
|
|
|
|
|
|
request = PredictionRequest( |
|
|
request_id=request_data.get("request_id"), |
|
|
prediction_type=PredictionType(request_data.get("prediction_type")), |
|
|
model_type=ModelType(request_data.get("model_type")), |
|
|
data=request_data.get("data", []), |
|
|
target_variable=request_data.get("target_variable"), |
|
|
feature_variables=request_data.get("feature_variables", []), |
|
|
prediction_horizon=request_data.get("prediction_horizon", 12), |
|
|
confidence_level=request_data.get("confidence_level", 0.95), |
|
|
additional_params=request_data.get("additional_params", {}) |
|
|
) |
|
|
|
|
|
result = await self.predict_time_series(request, context) |
|
|
|
|
|
return AgentResponse( |
|
|
agent_name=self.name, |
|
|
content={ |
|
|
"prediction_result": { |
|
|
"request_id": result.request_id, |
|
|
"predictions": result.predictions, |
|
|
"model_performance": result.model_performance, |
|
|
"trend_direction": result.trend_analysis.get("direction", "unknown"), |
|
|
"seasonal_strength": result.seasonal_patterns.get("strength", 0.0), |
|
|
"anomaly_alerts": len(result.anomaly_alerts) |
|
|
}, |
|
|
"status": "prediction_completed" |
|
|
}, |
|
|
confidence=min(result.model_performance.get("confidence", 0.5), 1.0), |
|
|
metadata=result.metadata |
|
|
) |
|
|
|
|
|
elif action == "analyze_trends": |
|
|
data = message.content.get("data", []) |
|
|
target_var = message.content.get("target_variable") |
|
|
|
|
|
trend_analysis = await self.analyze_trends(data, target_var, context) |
|
|
|
|
|
return AgentResponse( |
|
|
agent_name=self.name, |
|
|
content={"trend_analysis": trend_analysis, "status": "analysis_completed"}, |
|
|
confidence=0.85 |
|
|
) |
|
|
|
|
|
elif action == "compare_models": |
|
|
data = message.content.get("data", []) |
|
|
target_var = message.content.get("target_variable") |
|
|
models = [ModelType(m) for m in message.content.get("models", ["arima", "lstm"])] |
|
|
|
|
|
comparison_result = await self.compare_models(data, target_var, models, context) |
|
|
|
|
|
return AgentResponse( |
|
|
agent_name=self.name, |
|
|
content={"model_comparison": comparison_result, "status": "comparison_completed"}, |
|
|
confidence=0.90 |
|
|
) |
|
|
|
|
|
return AgentResponse( |
|
|
agent_name=self.name, |
|
|
content={"error": "Unknown predictive action"}, |
|
|
confidence=0.0 |
|
|
) |
|
|
|
|
|
except Exception as e: |
|
|
self.logger.error(f"Error in predictive analysis: {str(e)}") |
|
|
raise AgentExecutionError(f"Predictive analysis failed: {str(e)}") |
|
|
|
|
|
async def _preprocess_time_series( |
|
|
self, |
|
|
data: List[Dict[str, Any]], |
|
|
target_variable: str |
|
|
) -> pd.DataFrame: |
|
|
"""Pré-processa dados de séries temporais.""" |
|
|
df = pd.DataFrame(data) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return df |
|
|
|
|
|
async def _train_model( |
|
|
self, |
|
|
data: pd.DataFrame, |
|
|
model_type: ModelType, |
|
|
params: Dict[str, Any] |
|
|
) -> Any: |
|
|
"""Treina o modelo especificado.""" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return {"model_type": model_type.value, "trained": True} |
|
|
|
|
|
async def _generate_predictions( |
|
|
self, |
|
|
model: Any, |
|
|
horizon: int, |
|
|
confidence_level: float |
|
|
) -> List[Dict[str, Any]]: |
|
|
"""Gera previsões usando o modelo treinado.""" |
|
|
predictions = [] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
for i in range(horizon): |
|
|
predictions.append({ |
|
|
"period": i + 1, |
|
|
"predicted_value": 100.0 + i * 5, |
|
|
"lower_bound": 90.0 + i * 5, |
|
|
"upper_bound": 110.0 + i * 5, |
|
|
"confidence": confidence_level |
|
|
}) |
|
|
|
|
|
return predictions |
|
|
|
|
|
async def _evaluate_model_performance( |
|
|
self, |
|
|
model: Any, |
|
|
data: pd.DataFrame |
|
|
) -> Dict[str, float]: |
|
|
"""Avalia performance do modelo.""" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return { |
|
|
"mae": 5.2, |
|
|
"rmse": 7.8, |
|
|
"mape": 4.5, |
|
|
"r2_score": 0.85, |
|
|
"aic": 150.2, |
|
|
"bic": 160.5, |
|
|
"confidence": 0.82 |
|
|
} |
|
|
|
|
|
async def _analyze_trends( |
|
|
self, |
|
|
historical_data: pd.DataFrame, |
|
|
predictions: List[Dict[str, Any]] |
|
|
) -> Dict[str, Any]: |
|
|
"""Analisa tendências nos dados históricos e previsões.""" |
|
|
|
|
|
return { |
|
|
"direction": "upward", |
|
|
"strength": 0.75, |
|
|
"acceleration": 0.05, |
|
|
"change_points": [], |
|
|
"volatility": 0.12 |
|
|
} |
|
|
|
|
|
def _calculate_confidence_intervals( |
|
|
self, |
|
|
predictions: List[Dict[str, Any]], |
|
|
confidence_level: float |
|
|
) -> List[Dict[str, Any]]: |
|
|
"""Calcula intervalos de confiança para as previsões.""" |
|
|
intervals = [] |
|
|
|
|
|
for pred in predictions: |
|
|
intervals.append({ |
|
|
"period": pred["period"], |
|
|
"lower_bound": pred.get("lower_bound", pred["predicted_value"] * 0.9), |
|
|
"upper_bound": pred.get("upper_bound", pred["predicted_value"] * 1.1), |
|
|
"confidence_level": confidence_level |
|
|
}) |
|
|
|
|
|
return intervals |
|
|
|
|
|
async def _calculate_feature_importance( |
|
|
self, |
|
|
model: Any, |
|
|
features: List[str] |
|
|
) -> Dict[str, float]: |
|
|
"""Calcula importância das features.""" |
|
|
|
|
|
return {feature: 1.0 / len(features) for feature in features} |
|
|
|
|
|
async def _detect_seasonal_patterns(self, data: pd.DataFrame) -> Dict[str, Any]: |
|
|
"""Detecta padrões sazonais.""" |
|
|
|
|
|
return { |
|
|
"has_seasonality": True, |
|
|
"seasonal_period": 12, |
|
|
"strength": 0.65, |
|
|
"patterns": [] |
|
|
} |
|
|
|
|
|
async def _detect_future_anomalies( |
|
|
self, |
|
|
predictions: List[Dict[str, Any]] |
|
|
) -> List[Dict[str, Any]]: |
|
|
"""Detecta possíveis anomalias nas previsões.""" |
|
|
|
|
|
return [] |
|
|
|
|
|
async def _load_pretrained_models(self) -> None: |
|
|
"""Carrega modelos pré-treinados.""" |
|
|
|
|
|
pass |
|
|
|
|
|
async def _setup_preprocessing_pipelines(self) -> None: |
|
|
"""Configura pipelines de preprocessing.""" |
|
|
|
|
|
pass |
|
|
|
|
|
async def _setup_evaluation_metrics(self) -> None: |
|
|
"""Configura métricas de avaliação.""" |
|
|
|
|
|
pass |