"""
Professional ML pipeline with MLOps
Complete system for model training, versioning, and deployment
"""
import asyncio
import logging
import os
import pickle
from src.core import json_utils
import hashlib
from typing import Dict, List, Optional, Any, Union, Tuple, Type
from datetime import datetime, timedelta
from pathlib import Path
import numpy as np
import pandas as pd
from dataclasses import dataclass, field
from enum import Enum
import tempfile
import shutil
# ML Libraries
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset, random_split
import torch.nn.functional as F
from transformers import AutoTokenizer, AutoModel, AutoConfig
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, roc_auc_score
from sklearn.preprocessing import StandardScaler, LabelEncoder
import joblib
# MLOps Tools
try:
import mlflow
import mlflow.pytorch
MLFLOW_AVAILABLE = True
except ImportError:
MLFLOW_AVAILABLE = False
try:
import wandb
WANDB_AVAILABLE = True
except ImportError:
WANDB_AVAILABLE = False
from pydantic import BaseModel, Field
import structlog
logger = structlog.get_logger(__name__)
class ModelType(Enum):
"""Tipos de modelo"""
ANOMALY_DETECTOR = "anomaly_detector"
FINANCIAL_ANALYZER = "financial_analyzer"
LEGAL_COMPLIANCE = "legal_compliance"
ENSEMBLE = "ensemble"
class TrainingStatus(Enum):
"""Status do treinamento"""
PENDING = "pending"
PREPROCESSING = "preprocessing"
TRAINING = "training"
VALIDATING = "validating"
COMPLETED = "completed"
FAILED = "failed"
@dataclass
class ModelMetrics:
"""Métricas do modelo"""
accuracy: float = 0.0
precision: float = 0.0
recall: float = 0.0
f1_score: float = 0.0
auc_roc: float = 0.0
loss: float = 0.0
val_accuracy: float = 0.0
val_loss: float = 0.0
inference_time_ms: float = 0.0
model_size_mb: float = 0.0
timestamp: datetime = field(default_factory=datetime.utcnow)
@dataclass
class TrainingRun:
"""Execução de treinamento"""
id: str
model_type: ModelType
status: TrainingStatus
config: Dict[str, Any]
metrics: Optional[ModelMetrics] = None
artifacts_path: Optional[str] = None
error_message: Optional[str] = None
created_at: datetime = field(default_factory=datetime.utcnow)
started_at: Optional[datetime] = None
completed_at: Optional[datetime] = None
experiment_id: Optional[str] = None
class MLPipelineConfig(BaseModel):
"""Configuração do pipeline ML"""
# Model settings
model_name: str = "cidadao-transparency-model"
model_version: str = "1.0.0"
base_model: str = "neuralmind/bert-base-portuguese-cased"
# Training parameters
learning_rate: float = 2e-5
batch_size: int = 16
num_epochs: int = 10
warmup_steps: int = 500
weight_decay: float = 0.01
max_length: int = 512
# Data parameters
train_split: float = 0.7
val_split: float = 0.15
test_split: float = 0.15
min_samples_per_class: int = 100
data_augmentation: bool = True
# Infrastructure
device: str = "cuda" if torch.cuda.is_available() else "cpu"
num_workers: int = 4
pin_memory: bool = True
mixed_precision: bool = True
# MLOps
experiment_tracking: bool = True
model_registry: bool = True
auto_deployment: bool = False
artifacts_dir: str = "./models/artifacts"
models_dir: str = "./models/trained"
# Performance
early_stopping_patience: int = 3
gradient_accumulation_steps: int = 1
max_grad_norm: float = 1.0
# Evaluation
eval_steps: int = 500
save_steps: int = 1000
logging_steps: int = 100
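# Note: min_samples_per_class, data_augmentation, warmup_steps, gradient_accumulation_steps,
# auto_deployment, eval_steps and save_steps are declared here but not yet consumed by the
# training loop below.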
class TransparencyDataset(Dataset):
"""Dataset para dados de transparência"""
def __init__(self, texts: List[str], labels: List[int], tokenizer, max_length: int = 512):
self.texts = texts
self.labels = labels
self.tokenizer = tokenizer
self.max_length = max_length
def __len__(self):
return len(self.texts)
def __getitem__(self, idx):
text = self.texts[idx]
label = self.labels[idx]
encoding = self.tokenizer(
text,
truncation=True,
padding='max_length',
max_length=self.max_length,
return_tensors='pt'
)
return {
'input_ids': encoding['input_ids'].flatten(),
'attention_mask': encoding['attention_mask'].flatten(),
'label': torch.tensor(label, dtype=torch.long)
}
class TransparencyClassifier(nn.Module):
"""Classificador especializado para transparência"""
def __init__(self, model_name: str, num_labels: int = 3, dropout: float = 0.3):
super().__init__()
self.bert = AutoModel.from_pretrained(model_name)
self.dropout = nn.Dropout(dropout)
# Multi-head classifier
hidden_size = self.bert.config.hidden_size
# Anomaly detection head
self.anomaly_classifier = nn.Sequential(
nn.Linear(hidden_size, hidden_size // 2),
nn.ReLU(),
nn.Dropout(dropout),
nn.Linear(hidden_size // 2, num_labels)
)
# Financial risk head
self.financial_classifier = nn.Sequential(
nn.Linear(hidden_size, hidden_size // 2),
nn.ReLU(),
nn.Dropout(dropout),
nn.Linear(hidden_size // 2, 5) # Risk levels
)
# Legal compliance head
self.legal_classifier = nn.Sequential(
nn.Linear(hidden_size, hidden_size // 4),
nn.ReLU(),
nn.Dropout(dropout),
nn.Linear(hidden_size // 4, 2) # Compliant/Non-compliant
)
# Confidence estimation
self.confidence_head = nn.Sequential(
nn.Linear(hidden_size, hidden_size // 4),
nn.ReLU(),
nn.Linear(hidden_size // 4, 1),
nn.Sigmoid()
)
def forward(self, input_ids, attention_mask, labels=None, task="anomaly"):
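"""Run all task heads on the pooled BERT representation.

Returns a dict with 'anomaly_logits', 'financial_logits', 'legal_logits' and
'confidence'. When labels are provided, a 'loss' entry is added for the task
selected by `task` (or summed across tasks when `labels` is a dict).
"""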
outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask)
pooled_output = outputs.pooler_output
pooled_output = self.dropout(pooled_output)
# Get predictions for all tasks
anomaly_logits = self.anomaly_classifier(pooled_output)
financial_logits = self.financial_classifier(pooled_output)
legal_logits = self.legal_classifier(pooled_output)
confidence = self.confidence_head(pooled_output)
outputs = {
'anomaly_logits': anomaly_logits,
'financial_logits': financial_logits,
'legal_logits': legal_logits,
'confidence': confidence
}
# Calculate loss if labels provided
if labels is not None:
if task == "anomaly":
loss = F.cross_entropy(anomaly_logits, labels)
elif task == "financial":
loss = F.cross_entropy(financial_logits, labels)
elif task == "legal":
loss = F.cross_entropy(legal_logits, labels)
else:
# Multi-task loss (assuming labels is a dict)
loss = 0
if 'anomaly' in labels:
loss += F.cross_entropy(anomaly_logits, labels['anomaly'])
if 'financial' in labels:
loss += F.cross_entropy(financial_logits, labels['financial'])
if 'legal' in labels:
loss += F.cross_entropy(legal_logits, labels['legal'])
outputs['loss'] = loss
return outputs
class MLPipelineManager:
"""Gerenciador avançado de pipeline ML"""
def __init__(self, config: MLPipelineConfig):
self.config = config
self.device = torch.device(config.device)
# Create directories
Path(config.artifacts_dir).mkdir(parents=True, exist_ok=True)
Path(config.models_dir).mkdir(parents=True, exist_ok=True)
# Initialize tracking
self.training_runs: Dict[str, TrainingRun] = {}
self.models: Dict[str, Any] = {}
# MLOps setup
self._setup_experiment_tracking()
def _setup_experiment_tracking(self):
"""Configurar experiment tracking"""
if not self.config.experiment_tracking:
return
if MLFLOW_AVAILABLE:
try:
mlflow.set_experiment(f"cidadao-ai-{self.config.model_name}")
logger.info("✅ MLflow experiment tracking configurado")
except Exception as e:
logger.warning(f"⚠️ MLflow setup falhou: {e}")
if WANDB_AVAILABLE:
try:
# wandb.init would be called in training function
logger.info("✅ W&B tracking disponível")
except Exception as e:
logger.warning(f"⚠️ W&B setup falhou: {e}")
async def prepare_data(self,
contracts_data: List[Dict[str, Any]],
model_type: ModelType = ModelType.ANOMALY_DETECTOR) -> Tuple[DataLoader, DataLoader, DataLoader]:
"""Preparar dados para treinamento"""
logger.info(f"🔄 Preparando dados para {model_type.value}...")
# Extract text and generate labels
texts = []
labels = []
for contract in contracts_data:
# Create descriptive text
text = self._create_contract_text(contract)
texts.append(text)
# Generate label based on model type
if model_type == ModelType.ANOMALY_DETECTOR:
label = self._generate_anomaly_label(contract)
elif model_type == ModelType.FINANCIAL_ANALYZER:
label = self._generate_financial_label(contract)
elif model_type == ModelType.LEGAL_COMPLIANCE:
label = self._generate_legal_label(contract)
else:
label = 0
labels.append(label)
# Split data
train_texts, temp_texts, train_labels, temp_labels = train_test_split(
texts, labels,
test_size=(1 - self.config.train_split),
random_state=42,
stratify=labels
)
val_size = self.config.val_split / (self.config.val_split + self.config.test_split)
val_texts, test_texts, val_labels, test_labels = train_test_split(
temp_texts, temp_labels,
test_size=(1 - val_size),
random_state=42,
stratify=temp_labels
)
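# Two-stage split: with the default config, 30% is held out first and then split
# 50/50 into validation and test (0.15 / (0.15 + 0.15)).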
# Create tokenizer
tokenizer = AutoTokenizer.from_pretrained(self.config.base_model)
# Create datasets
train_dataset = TransparencyDataset(train_texts, train_labels, tokenizer, self.config.max_length)
val_dataset = TransparencyDataset(val_texts, val_labels, tokenizer, self.config.max_length)
test_dataset = TransparencyDataset(test_texts, test_labels, tokenizer, self.config.max_length)
# Create data loaders
train_loader = DataLoader(
train_dataset,
batch_size=self.config.batch_size,
shuffle=True,
num_workers=self.config.num_workers,
pin_memory=self.config.pin_memory
)
val_loader = DataLoader(
val_dataset,
batch_size=self.config.batch_size,
shuffle=False,
num_workers=self.config.num_workers,
pin_memory=self.config.pin_memory
)
test_loader = DataLoader(
test_dataset,
batch_size=self.config.batch_size,
shuffle=False,
num_workers=self.config.num_workers,
pin_memory=self.config.pin_memory
)
logger.info(f"✅ Dados preparados: {len(train_dataset)} treino, {len(val_dataset)} validação, {len(test_dataset)} teste")
return train_loader, val_loader, test_loader
def _create_contract_text(self, contract: Dict[str, Any]) -> str:
"""Criar texto descritivo do contrato"""
parts = []
if 'objeto' in contract:
parts.append(f"Objeto: {contract['objeto']}")
if 'valor' in contract or 'valorInicial' in contract:
valor = contract.get('valor', contract.get('valorInicial', 0))
parts.append(f"Valor: R$ {valor:,.2f}")
if 'nomeRazaoSocialFornecedor' in contract:
parts.append(f"Fornecedor: {contract['nomeRazaoSocialFornecedor']}")
if 'modalidadeLicitacao' in contract:
parts.append(f"Modalidade: {contract['modalidadeLicitacao']}")
if 'situacao' in contract:
parts.append(f"Situação: {contract['situacao']}")
return ". ".join(parts)
def _generate_anomaly_label(self, contract: Dict[str, Any]) -> int:
"""Gerar label de anomalia (0=Normal, 1=Suspeito, 2=Anômalo)"""
valor = contract.get('valor', contract.get('valorInicial', 0))
modalidade = contract.get('modalidadeLicitacao', '').lower()
# Simple rule-based labeling for training data
score = 0
# High value contracts
if valor > 50_000_000:
score += 1
# Emergency or direct awards
if any(word in modalidade for word in ['emergencial', 'dispensa', 'inexigibilidade']):
score += 1
# Missing information
if not contract.get('objeto') or len(contract.get('objeto', '')) < 10:
score += 1
return min(score, 2) # Cap at 2 (anomalous)
def _generate_financial_label(self, contract: Dict[str, Any]) -> int:
"""Gerar label de risco financeiro (0=Muito Baixo, 1=Baixo, 2=Médio, 3=Alto, 4=Muito Alto)"""
valor = contract.get('valor', contract.get('valorInicial', 0))
if valor < 100_000:
return 0 # Muito Baixo
elif valor < 1_000_000:
return 1 # Baixo
elif valor < 10_000_000:
return 2 # Médio
elif valor < 50_000_000:
return 3 # Alto
else:
return 4 # Muito Alto
def _generate_legal_label(self, contract: Dict[str, Any]) -> int:
"""Gerar label de conformidade legal (0=Não Conforme, 1=Conforme)"""
modalidade = contract.get('modalidadeLicitacao', '').lower()
# Simple compliance check
if 'pregao' in modalidade or 'concorrencia' in modalidade:
return 1 # Conforme
else:
return 0 # Potentially non-compliant
async def train_model(self,
train_loader: DataLoader,
val_loader: DataLoader,
model_type: ModelType = ModelType.ANOMALY_DETECTOR) -> str:
"""Treinar modelo"""
run_id = f"{model_type.value}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
training_run = TrainingRun(
id=run_id,
model_type=model_type,
status=TrainingStatus.TRAINING,
config=self.config.dict()
)
self.training_runs[run_id] = training_run
try:
logger.info(f"🚀 Iniciando treinamento {run_id}...")
# Initialize tracking
if WANDB_AVAILABLE and self.config.experiment_tracking:
wandb.init(
project="cidadao-ai",
name=run_id,
config=self.config.dict()
)
if MLFLOW_AVAILABLE and self.config.experiment_tracking:
mlflow.start_run(run_name=run_id)
# Create model
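# num_labels sizes the primary (anomaly) head: 3 for anomaly detection, 5 for financial
# risk, 2 for legal compliance; the financial and legal heads keep their fixed sizes.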
num_labels = 3 if model_type == ModelType.ANOMALY_DETECTOR else (5 if model_type == ModelType.FINANCIAL_ANALYZER else 2)
model = TransparencyClassifier(self.config.base_model, num_labels)
model.to(self.device)
# Setup optimizer
optimizer = optim.AdamW(
model.parameters(),
lr=self.config.learning_rate,
weight_decay=self.config.weight_decay
)
# Setup scheduler
total_steps = len(train_loader) * self.config.num_epochs
scheduler = optim.lr_scheduler.LinearLR(
optimizer,
start_factor=1.0,
end_factor=0.1,
total_iters=total_steps
)
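# Note: config.warmup_steps is not applied here; LinearLR decays from the start.
# A warmup-aware schedule (e.g. transformers.get_linear_schedule_with_warmup) could honor it.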
# Mixed precision training
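# torch.cuda.amp.GradScaler/autocast are the legacy AMP entry points; newer PyTorch
# versions prefer torch.amp.GradScaler("cuda") and torch.amp.autocast("cuda").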
scaler = torch.cuda.amp.GradScaler() if self.config.mixed_precision else None
# Training variables
best_val_acc = 0.0
patience_counter = 0
global_step = 0
training_run.started_at = datetime.utcnow()
# Training loop
for epoch in range(self.config.num_epochs):
logger.info(f"📚 Época {epoch + 1}/{self.config.num_epochs}")
# Training phase
model.train()
train_loss = 0.0
train_correct = 0
train_total = 0
for batch_idx, batch in enumerate(train_loader):
input_ids = batch['input_ids'].to(self.device)
attention_mask = batch['attention_mask'].to(self.device)
labels = batch['label'].to(self.device)
optimizer.zero_grad()
# Forward pass
if self.config.mixed_precision and scaler:
with torch.cuda.amp.autocast():
outputs = model(input_ids, attention_mask, labels, task=model_type.value.split('_')[0])
loss = outputs['loss']
else:
outputs = model(input_ids, attention_mask, labels, task=model_type.value.split('_')[0])
loss = outputs['loss']
# Backward pass
if self.config.mixed_precision and scaler:
scaler.scale(loss).backward()
scaler.unscale_(optimizer)
torch.nn.utils.clip_grad_norm_(model.parameters(), self.config.max_grad_norm)
scaler.step(optimizer)
scaler.update()
else:
loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), self.config.max_grad_norm)
optimizer.step()
scheduler.step()
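# The scheduler advances once per optimizer step, so total_iters = len(train_loader) * num_epochs covers the full run.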
# Statistics
train_loss += loss.item()
# Get predictions for accuracy
task_key = f"{model_type.value.split('_')[0]}_logits"
if task_key in outputs:
_, predicted = torch.max(outputs[task_key], 1)
train_total += labels.size(0)
train_correct += (predicted == labels).sum().item()
global_step += 1
# Logging
if global_step % self.config.logging_steps == 0:
current_lr = scheduler.get_last_lr()[0]
logger.info(f"Step {global_step}, Loss: {loss.item():.4f}, LR: {current_lr:.2e}")
if WANDB_AVAILABLE and self.config.experiment_tracking:
wandb.log({
"train_loss": loss.item(),
"learning_rate": current_lr,
"step": global_step
})
# Validation phase
if epoch % 1 == 0: # always true: validate every epoch (increase the modulus to validate less often)
val_metrics = await self._validate_model(model, val_loader, model_type)
logger.info(f"📊 Validação - Acc: {val_metrics.val_accuracy:.4f}, Loss: {val_metrics.val_loss:.4f}")
# Early stopping
if val_metrics.val_accuracy > best_val_acc:
best_val_acc = val_metrics.val_accuracy
patience_counter = 0
# Save best model
model_path = Path(self.config.models_dir) / f"{run_id}_best.pt"
torch.save({
'model_state_dict': model.state_dict(),
'optimizer_state_dict': optimizer.state_dict(),
'config': self.config.dict(),
'metrics': val_metrics.__dict__,
'num_labels': num_labels,  # needed to rebuild the classifier heads on load
'epoch': epoch
}, model_path)
else:
patience_counter += 1
if patience_counter >= self.config.early_stopping_patience:
logger.info(f"⏹️ Early stopping após {epoch + 1} épocas")
break
# Log to tracking systems
if WANDB_AVAILABLE and self.config.experiment_tracking:
wandb.log({
"val_accuracy": val_metrics.val_accuracy,
"val_loss": val_metrics.val_loss,
"val_f1": val_metrics.f1_score,
"epoch": epoch
})
if MLFLOW_AVAILABLE and self.config.experiment_tracking:
mlflow.log_metrics({
"val_accuracy": val_metrics.val_accuracy,
"val_loss": val_metrics.val_loss,
"val_f1": val_metrics.f1_score
}, step=epoch)
# Final validation
final_metrics = await self._validate_model(model, val_loader, model_type)
training_run.metrics = final_metrics
training_run.status = TrainingStatus.COMPLETED
training_run.completed_at = datetime.utcnow()
# Save final model
final_model_path = Path(self.config.models_dir) / f"{run_id}_final.pt"
torch.save({
'model_state_dict': model.state_dict(),
'config': self.config.dict(),
'metrics': final_metrics.__dict__,
'num_labels': num_labels,  # needed to rebuild the classifier heads on load
'run_id': run_id
}, final_model_path)
training_run.artifacts_path = str(final_model_path)
# Register model
if self.config.model_registry:
await self._register_model(run_id, model, final_metrics)
logger.info(f"✅ Treinamento {run_id} concluído com sucesso!")
return run_id
except Exception as e:
training_run.status = TrainingStatus.FAILED
training_run.error_message = str(e)
training_run.completed_at = datetime.utcnow()
logger.error(f"❌ Treinamento {run_id} falhou: {e}")
raise
finally:
# Cleanup tracking
if WANDB_AVAILABLE and self.config.experiment_tracking:
wandb.finish()
if MLFLOW_AVAILABLE and self.config.experiment_tracking:
mlflow.end_run()
async def _validate_model(self, model, val_loader: DataLoader, model_type: ModelType) -> ModelMetrics:
"""Validar modelo"""
model.eval()
val_loss = 0.0
all_predictions = []
all_labels = []
all_confidences = []
with torch.no_grad():
for batch in val_loader:
input_ids = batch['input_ids'].to(self.device)
attention_mask = batch['attention_mask'].to(self.device)
labels = batch['label'].to(self.device)
outputs = model(input_ids, attention_mask, labels, task=model_type.value.split('_')[0])
val_loss += outputs['loss'].item()
# Get predictions
task_key = f"{model_type.value.split('_')[0]}_logits"
if task_key in outputs:
_, predicted = torch.max(outputs[task_key], 1)
all_predictions.extend(predicted.cpu().numpy())
all_labels.extend(labels.cpu().numpy())
all_confidences.extend(outputs['confidence'].cpu().numpy())
# Calculate metrics
val_loss /= len(val_loader)
accuracy = accuracy_score(all_labels, all_predictions)
precision, recall, f1, _ = precision_recall_fscore_support(all_labels, all_predictions, average='weighted')
# AUC for binary classification
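# The confidence-head output is used as the ranking score; it is a rough proxy,
# not a calibrated positive-class probability.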
auc = 0.0
if len(set(all_labels)) == 2:
try:
auc = roc_auc_score(all_labels, all_confidences)
except Exception:
pass
return ModelMetrics(
accuracy=accuracy,
precision=precision,
recall=recall,
f1_score=f1,
auc_roc=auc,
val_accuracy=accuracy,
val_loss=val_loss,
inference_time_ms=0.0 # TODO: measure inference time
)
async def _register_model(self, run_id: str, model: nn.Module, metrics: ModelMetrics):
"""Register the model in the model registry"""
try:
if MLFLOW_AVAILABLE:
# Log the trained model to MLflow (expects the nn.Module, not a checkpoint path)
mlflow.pytorch.log_model(
pytorch_model=model,
artifact_path="model",
registered_model_name=f"{self.config.model_name}-{run_id}"
)
logger.info(f"✅ Model {run_id} registered in MLflow")
except Exception as e:
logger.error(f"❌ Erro ao registrar modelo: {e}")
async def load_model(self, run_id: str) -> Optional[TransparencyClassifier]:
"""Carregar modelo treinado"""
model_path = Path(self.config.models_dir) / f"{run_id}_best.pt"
if not model_path.exists():
model_path = Path(self.config.models_dir) / f"{run_id}_final.pt"
if not model_path.exists():
logger.error(f"❌ Modelo {run_id} não encontrado")
return None
try:
checkpoint = torch.load(model_path, map_location=self.device)
# Recreate the model with the same head size it was trained with
num_labels = checkpoint.get('num_labels', 3)
model = TransparencyClassifier(self.config.base_model, num_labels)
model.load_state_dict(checkpoint['model_state_dict'])
model.to(self.device)
model.eval()
self.models[run_id] = model
logger.info(f"✅ Modelo {run_id} carregado")
return model
except Exception as e:
logger.error(f"❌ Erro ao carregar modelo {run_id}: {e}")
return None
async def predict(self, model: TransparencyClassifier, text: str, model_type: ModelType) -> Dict[str, Any]:
"""Fazer predição"""
tokenizer = AutoTokenizer.from_pretrained(self.config.base_model)
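# Note: the tokenizer is re-instantiated on every call; callers predicting in a loop may want to cache it.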
# Tokenize
encoding = tokenizer(
text,
truncation=True,
padding='max_length',
max_length=self.config.max_length,
return_tensors='pt'
)
input_ids = encoding['input_ids'].to(self.device)
attention_mask = encoding['attention_mask'].to(self.device)
# Predict
with torch.no_grad():
outputs = model(input_ids, attention_mask)
# Process outputs
results = {}
# Anomaly detection
if 'anomaly_logits' in outputs:
anomaly_probs = F.softmax(outputs['anomaly_logits'], dim=-1)
anomaly_pred = torch.argmax(anomaly_probs, dim=-1)
labels = ["Normal", "Suspeito", "Anômalo"]
results["anomaly"] = {
"label": labels[anomaly_pred.item()],
"confidence": anomaly_probs.max().item(),
"probabilities": anomaly_probs.squeeze().tolist()
}
# Financial risk
if 'financial_logits' in outputs:
financial_probs = F.softmax(outputs['financial_logits'], dim=-1)
financial_pred = torch.argmax(financial_probs, dim=-1)
labels = ["Muito Baixo", "Baixo", "Médio", "Alto", "Muito Alto"]
results["financial"] = {
"label": labels[financial_pred.item()],
"confidence": financial_probs.max().item(),
"probabilities": financial_probs.squeeze().tolist()
}
# Legal compliance
if 'legal_logits' in outputs:
legal_probs = F.softmax(outputs['legal_logits'], dim=-1)
legal_pred = torch.argmax(legal_probs, dim=-1)
labels = ["Não Conforme", "Conforme"]
results["legal"] = {
"label": labels[legal_pred.item()],
"confidence": legal_probs.max().item(),
"probabilities": legal_probs.squeeze().tolist()
}
# Overall confidence
if 'confidence' in outputs:
results["overall_confidence"] = outputs['confidence'].item()
return results
def get_training_status(self, run_id: str) -> Optional[TrainingRun]:
"""Obter status do treinamento"""
return self.training_runs.get(run_id)
def list_models(self) -> List[Dict[str, Any]]:
"""Listar modelos disponíveis"""
models = []
models_dir = Path(self.config.models_dir)
for model_file in models_dir.glob("*.pt"):
try:
checkpoint = torch.load(model_file, map_location='cpu')
models.append({
"filename": model_file.name,
"run_id": checkpoint.get('run_id', 'unknown'),
"metrics": checkpoint.get('metrics', {}),
"created": datetime.fromtimestamp(model_file.stat().st_mtime)
})
except Exception:
continue
return models
# Singleton instance
_ml_pipeline_manager: Optional[MLPipelineManager] = None
async def get_ml_pipeline_manager() -> MLPipelineManager:
"""Obter instância singleton do ML pipeline manager"""
global _ml_pipeline_manager
if _ml_pipeline_manager is None:
config = MLPipelineConfig()
_ml_pipeline_manager = MLPipelineManager(config)
return _ml_pipeline_manager
if __name__ == "__main__":
# Pipeline smoke test
import asyncio
async def test_ml_pipeline():
"""Teste do pipeline ML"""
print("🧪 Testando pipeline ML...")
# Get pipeline manager
pipeline = await get_ml_pipeline_manager()
# Mock data for testing
mock_contracts = [
{
"objeto": "Aquisição de equipamentos médicos",
"valor": 5000000,
"nomeRazaoSocialFornecedor": "Empresa XYZ",
"modalidadeLicitacao": "Pregão Eletrônico"
},
{
"objeto": "Obra de construção hospitalar",
"valor": 100000000,
"nomeRazaoSocialFornecedor": "Construtora ABC",
"modalidadeLicitacao": "Dispensa de Licitação"
}
] * 50 # Duplicate for testing
try:
# Prepare data
train_loader, val_loader, test_loader = await pipeline.prepare_data(
mock_contracts,
ModelType.ANOMALY_DETECTOR
)
print(f"✅ Dados preparados: {len(train_loader)} batches de treino")
# Train model (quick test with 1 epoch)
pipeline.config.num_epochs = 1
run_id = await pipeline.train_model(
train_loader,
val_loader,
ModelType.ANOMALY_DETECTOR
)
print(f"✅ Modelo treinado: {run_id}")
# Load and test model
model = await pipeline.load_model(run_id)
if model:
result = await pipeline.predict(
model,
"Contrato emergencial de R$ 50 milhões sem licitação",
ModelType.ANOMALY_DETECTOR
)
print(f"✅ Predição: {result}")
# List models
models = pipeline.list_models()
print(f"✅ Modelos disponíveis: {len(models)}")
except Exception as e:
print(f"❌ Erro no teste: {e}")
print("✅ Teste concluído!")
asyncio.run(test_ml_pipeline())