# cidadao.ai-backend/src/ml/training_pipeline.py
"""
ML Training Pipeline for Cidadão.AI
This module provides a comprehensive training pipeline for ML models
used in anomaly detection, fraud detection, and pattern recognition.
"""
import json
from datetime import datetime
from typing import Dict, Any, List, Optional, Tuple
from pathlib import Path
import joblib
import numpy as np
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import (
accuracy_score, precision_score, recall_score, f1_score,
roc_auc_score, confusion_matrix, classification_report
)
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.ensemble import IsolationForest, RandomForestClassifier
from sklearn.svm import OneClassSVM
from sklearn.neighbors import LocalOutlierFactor
import mlflow
import mlflow.sklearn
from mlflow.tracking import MlflowClient
from src.core import get_logger, settings
from src.core.exceptions import CidadaoAIError
from src.core.cache import get_redis_client
# from src.models.ml_models import AnomalyDetectorModel # TODO: Create this model
logger = get_logger(__name__)
class MLTrainingPipeline:
"""
Comprehensive ML training pipeline with versioning and tracking.
Features:
- Multiple algorithm support
- Automatic hyperparameter tuning
- Model versioning with MLflow
- Performance tracking
- A/B testing support
"""
def __init__(self, experiment_name: str = "cidadao_ai_models"):
"""Initialize the training pipeline."""
self.experiment_name = experiment_name
self.mlflow_client = None
self.models_dir = Path(getattr(settings, "ML_MODELS_DIR", "./models"))
        self.models_dir.mkdir(parents=True, exist_ok=True)
# Supported algorithms
self.algorithms = {
"isolation_forest": IsolationForest,
"one_class_svm": OneClassSVM,
"random_forest": RandomForestClassifier,
"local_outlier_factor": LocalOutlierFactor
}
# Model registry
self.model_registry = {}
self._initialize_mlflow()
def _initialize_mlflow(self):
"""Initialize MLflow tracking."""
try:
mlflow.set_tracking_uri(getattr(settings, "MLFLOW_TRACKING_URI", "file:./mlruns"))
mlflow.set_experiment(self.experiment_name)
self.mlflow_client = MlflowClient()
logger.info(f"MLflow initialized with experiment: {self.experiment_name}")
except Exception as e:
logger.warning(f"MLflow initialization failed: {e}. Using local tracking.")
async def train_model(
self,
model_type: str,
algorithm: str,
X_train: np.ndarray,
y_train: Optional[np.ndarray] = None,
hyperparameters: Optional[Dict[str, Any]] = None,
metadata: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""
Train a model with the specified algorithm.
Args:
model_type: Type of model (anomaly, fraud, pattern)
algorithm: Algorithm to use
X_train: Training features
y_train: Training labels (optional for unsupervised)
hyperparameters: Model hyperparameters
metadata: Additional metadata
Returns:
Training results with model info
"""
try:
logger.info(f"Starting training for {model_type} with {algorithm}")
# Start MLflow run
with mlflow.start_run(run_name=f"{model_type}_{algorithm}_{datetime.now().isoformat()}"):
# Log parameters
mlflow.log_param("model_type", model_type)
mlflow.log_param("algorithm", algorithm)
mlflow.log_param("n_samples", X_train.shape[0])
mlflow.log_param("n_features", X_train.shape[1])
if hyperparameters:
for key, value in hyperparameters.items():
mlflow.log_param(f"param_{key}", value)
# Create and train model
model = await self._create_model(algorithm, hyperparameters)
# Train based on supervised/unsupervised
if y_train is not None:
# Supervised training
X_tr, X_val, y_tr, y_val = train_test_split(
X_train, y_train, test_size=0.2, random_state=42
)
model.fit(X_tr, y_tr)
# Evaluate
y_pred = model.predict(X_val)
metrics = self._calculate_metrics(y_val, y_pred)
# Cross-validation
cv_scores = cross_val_score(model, X_train, y_train, cv=5)
metrics["cv_score_mean"] = cv_scores.mean()
metrics["cv_score_std"] = cv_scores.std()
else:
# Unsupervised training
model.fit(X_train)
# Evaluate with anomaly scores
if hasattr(model, 'score_samples'):
anomaly_scores = model.score_samples(X_train)
metrics = {
"anomaly_score_mean": float(np.mean(anomaly_scores)),
"anomaly_score_std": float(np.std(anomaly_scores)),
"anomaly_score_min": float(np.min(anomaly_scores)),
"anomaly_score_max": float(np.max(anomaly_scores))
}
else:
metrics = {"training_complete": True}
# Log metrics
for metric_name, metric_value in metrics.items():
mlflow.log_metric(metric_name, metric_value)
# Save model
model_path = await self._save_model(
model, model_type, algorithm, metrics, metadata
)
# Log model to MLflow
mlflow.sklearn.log_model(
model,
f"{model_type}_{algorithm}",
registered_model_name=f"{model_type}_{algorithm}_model"
)
# Create model version
version = await self._create_model_version(
model_type, algorithm, model_path, metrics
)
return {
"success": True,
"model_id": version["model_id"],
"version": version["version"],
"metrics": metrics,
"model_path": model_path,
"run_id": mlflow.active_run().info.run_id
}
except Exception as e:
logger.error(f"Training failed: {str(e)}")
return {
"success": False,
"error": str(e),
"model_id": None
}
async def _create_model(
self,
algorithm: str,
hyperparameters: Optional[Dict[str, Any]] = None
) -> Any:
"""Create a model instance with hyperparameters."""
if algorithm not in self.algorithms:
raise ValueError(f"Unsupported algorithm: {algorithm}")
model_class = self.algorithms[algorithm]
# Default hyperparameters
default_params = {
"isolation_forest": {
"contamination": 0.1,
"random_state": 42,
"n_estimators": 100
},
"one_class_svm": {
"gamma": 0.001,
"nu": 0.05,
"kernel": "rbf"
},
"random_forest": {
"n_estimators": 100,
"random_state": 42,
"max_depth": 10
},
"local_outlier_factor": {
"contamination": 0.1,
"n_neighbors": 20
}
}
# Merge with provided hyperparameters
params = default_params.get(algorithm, {})
if hyperparameters:
params.update(hyperparameters)
return model_class(**params)
def _calculate_metrics(
self,
y_true: np.ndarray,
y_pred: np.ndarray,
y_proba: Optional[np.ndarray] = None
) -> Dict[str, float]:
"""Calculate comprehensive metrics for model evaluation."""
        metrics = {
            "accuracy": float(accuracy_score(y_true, y_pred)),
            "precision": float(precision_score(y_true, y_pred, average='weighted', zero_division=0)),
            "recall": float(recall_score(y_true, y_pred, average='weighted', zero_division=0)),
            "f1_score": float(f1_score(y_true, y_pred, average='weighted', zero_division=0))
        }
# Add ROC-AUC if probabilities available
if y_proba is not None and len(np.unique(y_true)) == 2:
metrics["roc_auc"] = float(roc_auc_score(y_true, y_proba[:, 1]))
return metrics
async def _save_model(
self,
model: Any,
model_type: str,
algorithm: str,
metrics: Dict[str, Any],
metadata: Optional[Dict[str, Any]] = None
) -> str:
"""Save trained model to disk."""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
model_filename = f"{model_type}_{algorithm}_{timestamp}.pkl"
model_path = self.models_dir / model_filename
# Create model package
model_package = {
"model": model,
"model_type": model_type,
"algorithm": algorithm,
"metrics": metrics,
"metadata": metadata or {},
"created_at": datetime.now().isoformat(),
"version": timestamp
}
# Save with joblib for better compression
joblib.dump(model_package, model_path)
logger.info(f"Model saved to: {model_path}")
return str(model_path)
async def _create_model_version(
self,
model_type: str,
algorithm: str,
model_path: str,
metrics: Dict[str, Any]
) -> Dict[str, Any]:
"""Create a versioned model entry in the registry."""
model_id = f"{model_type}_{algorithm}"
# Get or create model entry
if model_id not in self.model_registry:
self.model_registry[model_id] = {
"versions": [],
"current_version": None,
"created_at": datetime.now().isoformat()
}
# Add new version
version = {
"version": len(self.model_registry[model_id]["versions"]) + 1,
"path": model_path,
"metrics": metrics,
"created_at": datetime.now().isoformat(),
"status": "staging" # staging, production, archived
}
self.model_registry[model_id]["versions"].append(version)
# Save registry to Redis
redis_client = await get_redis_client()
await redis_client.set(
f"ml_model_registry:{model_id}",
json.dumps(self.model_registry[model_id]),
ex=86400 * 30 # 30 days
)
return {
"model_id": model_id,
"version": version["version"]
}
async def load_model(
self,
model_id: str,
version: Optional[int] = None
) -> Tuple[Any, Dict[str, Any]]:
"""
Load a model from the registry.
Args:
model_id: Model identifier
version: Specific version (latest if None)
Returns:
Tuple of (model, metadata)
"""
# Try to load from Redis first
redis_client = await get_redis_client()
registry_data = await redis_client.get(f"ml_model_registry:{model_id}")
if registry_data:
registry = json.loads(registry_data)
elif model_id in self.model_registry:
registry = self.model_registry[model_id]
else:
raise ValueError(f"Model {model_id} not found in registry")
# Get version
if version is None:
# Get latest production version or latest version
prod_versions = [
v for v in registry["versions"]
if v.get("status") == "production"
]
if prod_versions:
version_data = max(prod_versions, key=lambda v: v["version"])
else:
version_data = max(registry["versions"], key=lambda v: v["version"])
else:
version_data = next(
(v for v in registry["versions"] if v["version"] == version),
None
)
if not version_data:
raise ValueError(f"Version {version} not found for model {model_id}")
# Load model
model_package = joblib.load(version_data["path"])
return model_package["model"], model_package
async def promote_model(
self,
model_id: str,
version: int,
status: str = "production"
) -> bool:
"""
Promote a model version to production.
Args:
model_id: Model identifier
version: Version to promote
status: New status (production, staging, archived)
"""
try:
# Load registry
redis_client = await get_redis_client()
registry_data = await redis_client.get(f"ml_model_registry:{model_id}")
if registry_data:
registry = json.loads(registry_data)
else:
registry = self.model_registry.get(model_id)
if not registry:
raise ValueError(f"Model {model_id} not found")
            # Update version status
            updated = False
            for v in registry["versions"]:
                if v["version"] == version:
                    # Archive the current production version when promoting a new one
                    if status == "production":
                        for other_v in registry["versions"]:
                            if other_v.get("status") == "production":
                                other_v["status"] = "archived"
                    v["status"] = status
                    v["promoted_at"] = datetime.now().isoformat()
                    updated = True
                    break
            if not updated:
                raise ValueError(f"Version {version} not found for model {model_id}")
# Save updated registry
self.model_registry[model_id] = registry
await redis_client.set(
f"ml_model_registry:{model_id}",
json.dumps(registry),
ex=86400 * 30
)
logger.info(f"Promoted {model_id} v{version} to {status}")
return True
except Exception as e:
logger.error(f"Failed to promote model: {e}")
return False
async def get_model_metrics(
self,
model_id: str,
version: Optional[int] = None
) -> Dict[str, Any]:
"""Get metrics for a specific model version."""
_, metadata = await self.load_model(model_id, version)
return metadata.get("metrics", {})
async def compare_models(
self,
model_ids: List[Tuple[str, Optional[int]]],
test_data: np.ndarray,
test_labels: Optional[np.ndarray] = None
) -> Dict[str, Any]:
"""
Compare multiple models on the same test data.
Args:
model_ids: List of (model_id, version) tuples
test_data: Test features
test_labels: Test labels (if available)
Returns:
Comparison results
"""
results = {}
for model_id, version in model_ids:
try:
model, metadata = await self.load_model(model_id, version)
# Make predictions
predictions = model.predict(test_data)
result = {
"model_id": model_id,
"version": version or "latest",
"algorithm": metadata.get("algorithm"),
"training_metrics": metadata.get("metrics", {})
}
# Calculate test metrics if labels available
if test_labels is not None:
test_metrics = self._calculate_metrics(test_labels, predictions)
result["test_metrics"] = test_metrics
# Add anomaly scores for unsupervised models
if hasattr(model, 'score_samples'):
scores = model.score_samples(test_data)
result["anomaly_scores"] = {
"mean": float(np.mean(scores)),
"std": float(np.std(scores)),
"percentiles": {
"10": float(np.percentile(scores, 10)),
"50": float(np.percentile(scores, 50)),
"90": float(np.percentile(scores, 90))
}
}
results[f"{model_id}_v{version or 'latest'}"] = result
except Exception as e:
logger.error(f"Failed to evaluate {model_id}: {e}")
results[f"{model_id}_v{version or 'latest'}"] = {
"error": str(e)
}
return results
async def cleanup_old_models(self, days: int = 30) -> int:
"""Remove models older than specified days."""
count = 0
cutoff_date = datetime.now().timestamp() - (days * 86400)
for model_file in self.models_dir.glob("*.pkl"):
if model_file.stat().st_mtime < cutoff_date:
model_file.unlink()
count += 1
logger.info(f"Removed old model: {model_file}")
return count
# Global training pipeline instance (lazy initialization)
_training_pipeline: Optional[MLTrainingPipeline] = None
def get_training_pipeline() -> MLTrainingPipeline:
"""Get or create the global training pipeline instance."""
global _training_pipeline
if _training_pipeline is None:
_training_pipeline = MLTrainingPipeline()
return _training_pipeline
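# Example usage (illustrative sketch only). The feature matrix below is a
# hypothetical placeholder; real callers build features from transparency data
# elsewhere in the application:
#
#     import numpy as np
#
#     async def example():
#         pipeline = get_training_pipeline()
#         X = np.random.rand(500, 8)  # placeholder feature matrix
#         result = await pipeline.train_model(
#             model_type="anomaly",
#             algorithm="isolation_forest",
#             X_train=X,
#         )
#         if result["success"]:
#             await pipeline.promote_model(result["model_id"], result["version"])
#             model, package = await pipeline.load_model(result["model_id"])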