"""
ML Training Pipeline for Cidadão.AI.

This module provides a comprehensive training pipeline for ML models
used in anomaly detection, fraud detection, and pattern recognition.
"""

import json
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import joblib
import mlflow
import mlflow.sklearn
import numpy as np
from mlflow.tracking import MlflowClient
from sklearn.ensemble import IsolationForest, RandomForestClassifier
from sklearn.metrics import (
    accuracy_score,
    f1_score,
    precision_score,
    recall_score,
    roc_auc_score,
)
from sklearn.model_selection import cross_val_score, train_test_split
from sklearn.neighbors import LocalOutlierFactor
from sklearn.svm import OneClassSVM

from src.core import get_logger, settings
from src.core.cache import get_redis_client

logger = get_logger(__name__)


class MLTrainingPipeline:
    """
    Comprehensive ML training pipeline with versioning and tracking.

    Features:
    - Multiple algorithm support
    - Per-algorithm default hyperparameters with overrides
    - Model versioning with MLflow
    - Performance tracking
    - Model comparison for A/B testing
    """

    def __init__(self, experiment_name: str = "cidadao_ai_models"):
        """Initialize the training pipeline."""
        self.experiment_name = experiment_name
        self.mlflow_client = None
        self.models_dir = Path(getattr(settings, "ML_MODELS_DIR", "./models"))
        self.models_dir.mkdir(parents=True, exist_ok=True)

        self.algorithms = {
            "isolation_forest": IsolationForest,
            "one_class_svm": OneClassSVM,
            "random_forest": RandomForestClassifier,
            "local_outlier_factor": LocalOutlierFactor,
        }

        self.model_registry = {}
        self._initialize_mlflow()

    def _initialize_mlflow(self):
        """Initialize MLflow tracking."""
        try:
            mlflow.set_tracking_uri(
                getattr(settings, "MLFLOW_TRACKING_URI", "file:./mlruns")
            )
            mlflow.set_experiment(self.experiment_name)
            self.mlflow_client = MlflowClient()
            logger.info(f"MLflow initialized with experiment: {self.experiment_name}")
        except Exception as e:
            logger.warning(f"MLflow initialization failed: {e}. Using local tracking.")

    async def train_model(
        self,
        model_type: str,
        algorithm: str,
        X_train: np.ndarray,
        y_train: Optional[np.ndarray] = None,
        hyperparameters: Optional[Dict[str, Any]] = None,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """
        Train a model with the specified algorithm.

        Args:
            model_type: Type of model (anomaly, fraud, pattern)
            algorithm: Algorithm to use
            X_train: Training features
            y_train: Training labels (optional for unsupervised)
            hyperparameters: Model hyperparameters
            metadata: Additional metadata

        Returns:
            Training results with model info
        """
        try:
            logger.info(f"Starting training for {model_type} with {algorithm}")

            run_name = f"{model_type}_{algorithm}_{datetime.now().isoformat()}"
            with mlflow.start_run(run_name=run_name):
                # Log run parameters for reproducibility.
                mlflow.log_param("model_type", model_type)
                mlflow.log_param("algorithm", algorithm)
                mlflow.log_param("n_samples", X_train.shape[0])
                mlflow.log_param("n_features", X_train.shape[1])

                if hyperparameters:
                    for key, value in hyperparameters.items():
                        mlflow.log_param(f"param_{key}", value)

                model = await self._create_model(algorithm, hyperparameters)

                if y_train is not None:
                    # Supervised training: hold out a validation split.
                    X_tr, X_val, y_tr, y_val = train_test_split(
                        X_train, y_train, test_size=0.2, random_state=42
                    )

                    model.fit(X_tr, y_tr)

                    y_pred = model.predict(X_val)
                    metrics = self._calculate_metrics(y_val, y_pred)

                    # Cross-validated scores on the full training set
                    # (cross_val_score clones the estimator for each fold).
                    cv_scores = cross_val_score(model, X_train, y_train, cv=5)
                    # Cast to plain floats so the metrics stay JSON-serializable.
                    metrics["cv_score_mean"] = float(cv_scores.mean())
                    metrics["cv_score_std"] = float(cv_scores.std())

                else:
                    # Unsupervised training on the full feature matrix.
                    model.fit(X_train)

                    if hasattr(model, "score_samples"):
                        anomaly_scores = model.score_samples(X_train)
                        metrics = {
                            "anomaly_score_mean": float(np.mean(anomaly_scores)),
                            "anomaly_score_std": float(np.std(anomaly_scores)),
                            "anomaly_score_min": float(np.min(anomaly_scores)),
                            "anomaly_score_max": float(np.max(anomaly_scores)),
                        }
                    else:
                        metrics = {"training_complete": True}

                # MLflow metrics must be numeric, so cast before logging.
                for metric_name, metric_value in metrics.items():
                    mlflow.log_metric(metric_name, float(metric_value))

                model_path = await self._save_model(
                    model, model_type, algorithm, metrics, metadata
                )

                mlflow.sklearn.log_model(
                    model,
                    f"{model_type}_{algorithm}",
                    registered_model_name=f"{model_type}_{algorithm}_model",
                )

                version = await self._create_model_version(
                    model_type, algorithm, model_path, metrics
                )

                return {
                    "success": True,
                    "model_id": version["model_id"],
                    "version": version["version"],
                    "metrics": metrics,
                    "model_path": model_path,
                    "run_id": mlflow.active_run().info.run_id,
                }

        except Exception as e:
            logger.error(f"Training failed: {e}")
            return {
                "success": False,
                "error": str(e),
                "model_id": None,
            }

    async def _create_model(
        self,
        algorithm: str,
        hyperparameters: Optional[Dict[str, Any]] = None,
    ) -> Any:
        """Create a model instance with hyperparameters."""
        if algorithm not in self.algorithms:
            raise ValueError(f"Unsupported algorithm: {algorithm}")

        model_class = self.algorithms[algorithm]

        default_params = {
            "isolation_forest": {
                "contamination": 0.1,
                "random_state": 42,
                "n_estimators": 100,
            },
            "one_class_svm": {
                "gamma": 0.001,
                "nu": 0.05,
                "kernel": "rbf",
            },
            "random_forest": {
                "n_estimators": 100,
                "random_state": 42,
                "max_depth": 10,
            },
            "local_outlier_factor": {
                "contamination": 0.1,
                "n_neighbors": 20,
                # novelty=True exposes predict/score_samples on unseen data,
                # which compare_models relies on.
                "novelty": True,
            },
        }

        # Defaults first, then caller-supplied overrides.
        params = default_params.get(algorithm, {})
        if hyperparameters:
            params.update(hyperparameters)

        return model_class(**params)

    def _calculate_metrics(
        self,
        y_true: np.ndarray,
        y_pred: np.ndarray,
        y_proba: Optional[np.ndarray] = None,
    ) -> Dict[str, float]:
        """Calculate comprehensive metrics for model evaluation."""
        metrics = {
            "accuracy": float(accuracy_score(y_true, y_pred)),
            "precision": float(
                precision_score(y_true, y_pred, average="weighted", zero_division=0)
            ),
            "recall": float(
                recall_score(y_true, y_pred, average="weighted", zero_division=0)
            ),
            "f1_score": float(
                f1_score(y_true, y_pred, average="weighted", zero_division=0)
            ),
        }

        # ROC AUC is only computed for binary problems with probabilities.
        if y_proba is not None and len(np.unique(y_true)) == 2:
            metrics["roc_auc"] = float(roc_auc_score(y_true, y_proba[:, 1]))

        return metrics

    async def _save_model(
        self,
        model: Any,
        model_type: str,
        algorithm: str,
        metrics: Dict[str, Any],
        metadata: Optional[Dict[str, Any]] = None,
    ) -> str:
        """Save the trained model to disk."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        model_filename = f"{model_type}_{algorithm}_{timestamp}.pkl"
        model_path = self.models_dir / model_filename

        # Bundle the estimator with its provenance so the file is
        # self-describing.
        model_package = {
            "model": model,
            "model_type": model_type,
            "algorithm": algorithm,
            "metrics": metrics,
            "metadata": metadata or {},
            "created_at": datetime.now().isoformat(),
            "version": timestamp,
        }

        joblib.dump(model_package, model_path)
        logger.info(f"Model saved to: {model_path}")

        return str(model_path)

    async def _create_model_version(
        self,
        model_type: str,
        algorithm: str,
        model_path: str,
        metrics: Dict[str, Any],
    ) -> Dict[str, Any]:
        """Create a versioned model entry in the registry."""
        model_id = f"{model_type}_{algorithm}"

        if model_id not in self.model_registry:
            self.model_registry[model_id] = {
                "versions": [],
                "current_version": None,
                "created_at": datetime.now().isoformat(),
            }

        version = {
            "version": len(self.model_registry[model_id]["versions"]) + 1,
            "path": model_path,
            "metrics": metrics,
            "created_at": datetime.now().isoformat(),
            "status": "staging",
        }

        self.model_registry[model_id]["versions"].append(version)

        # Persist the registry entry in Redis for 30 days.
        redis_client = await get_redis_client()
        await redis_client.set(
            f"ml_model_registry:{model_id}",
            json.dumps(self.model_registry[model_id]),
            ex=86400 * 30,
        )

        return {
            "model_id": model_id,
            "version": version["version"],
        }

    async def load_model(
        self,
        model_id: str,
        version: Optional[int] = None,
    ) -> Tuple[Any, Dict[str, Any]]:
        """
        Load a model from the registry.

        Args:
            model_id: Model identifier
            version: Specific version (latest production version, or
                newest overall, if None)

        Returns:
            Tuple of (model, metadata)
        """
        redis_client = await get_redis_client()
        registry_data = await redis_client.get(f"ml_model_registry:{model_id}")

        if registry_data:
            registry = json.loads(registry_data)
        elif model_id in self.model_registry:
            registry = self.model_registry[model_id]
        else:
            raise ValueError(f"Model {model_id} not found in registry")

        if version is None:
            # Prefer the latest production version; fall back to the newest.
            prod_versions = [
                v for v in registry["versions"]
                if v.get("status") == "production"
            ]

            if prod_versions:
                version_data = max(prod_versions, key=lambda v: v["version"])
            else:
                version_data = max(registry["versions"], key=lambda v: v["version"])
        else:
            version_data = next(
                (v for v in registry["versions"] if v["version"] == version),
                None,
            )

        if not version_data:
            raise ValueError(f"Version {version} not found for model {model_id}")

        model_package = joblib.load(version_data["path"])

        return model_package["model"], model_package

    async def promote_model(
        self,
        model_id: str,
        version: int,
        status: str = "production",
    ) -> bool:
        """
        Promote a model version to a new status.

        Args:
            model_id: Model identifier
            version: Version to promote
            status: New status (production, staging, archived)

        Returns:
            True if the version was promoted, False otherwise.
        """
        try:
            redis_client = await get_redis_client()
            registry_data = await redis_client.get(f"ml_model_registry:{model_id}")

            if registry_data:
                registry = json.loads(registry_data)
            else:
                registry = self.model_registry.get(model_id)

            if not registry:
                raise ValueError(f"Model {model_id} not found")

            promoted = False
            for v in registry["versions"]:
                if v["version"] == version:
                    # Only one version may be in production at a time.
                    if status == "production":
                        for other_v in registry["versions"]:
                            if other_v.get("status") == "production":
                                other_v["status"] = "archived"

                    v["status"] = status
                    v["promoted_at"] = datetime.now().isoformat()
                    promoted = True
                    break

            if not promoted:
                raise ValueError(f"Version {version} not found for model {model_id}")

            self.model_registry[model_id] = registry
            await redis_client.set(
                f"ml_model_registry:{model_id}",
                json.dumps(registry),
                ex=86400 * 30,
            )

            logger.info(f"Promoted {model_id} v{version} to {status}")
            return True

        except Exception as e:
            logger.error(f"Failed to promote model: {e}")
            return False

    async def get_model_metrics(
        self,
        model_id: str,
        version: Optional[int] = None,
    ) -> Dict[str, Any]:
        """Get metrics for a specific model version."""
        _, metadata = await self.load_model(model_id, version)
        return metadata.get("metrics", {})

    async def compare_models(
        self,
        model_ids: List[Tuple[str, Optional[int]]],
        test_data: np.ndarray,
        test_labels: Optional[np.ndarray] = None,
    ) -> Dict[str, Any]:
        """
        Compare multiple models on the same test data.

        Args:
            model_ids: List of (model_id, version) tuples
            test_data: Test features
            test_labels: Test labels (if available)

        Returns:
            Comparison results keyed by "{model_id}_v{version}"
        """
        results = {}

        for model_id, version in model_ids:
            try:
                model, metadata = await self.load_model(model_id, version)

                predictions = model.predict(test_data)

                result = {
                    "model_id": model_id,
                    "version": version or "latest",
                    "algorithm": metadata.get("algorithm"),
                    "training_metrics": metadata.get("metrics", {}),
                }

                if test_labels is not None:
                    result["test_metrics"] = self._calculate_metrics(
                        test_labels, predictions
                    )

                # Summarize anomaly scores for detectors that expose them.
                if hasattr(model, "score_samples"):
                    scores = model.score_samples(test_data)
                    result["anomaly_scores"] = {
                        "mean": float(np.mean(scores)),
                        "std": float(np.std(scores)),
                        "percentiles": {
                            "10": float(np.percentile(scores, 10)),
                            "50": float(np.percentile(scores, 50)),
                            "90": float(np.percentile(scores, 90)),
                        },
                    }

                results[f"{model_id}_v{version or 'latest'}"] = result

            except Exception as e:
                logger.error(f"Failed to evaluate {model_id}: {e}")
                results[f"{model_id}_v{version or 'latest'}"] = {
                    "error": str(e),
                }

        return results

    async def cleanup_old_models(self, days: int = 30) -> int:
        """Remove saved model files older than the specified number of days."""
        count = 0
        cutoff_date = datetime.now().timestamp() - (days * 86400)

        for model_file in self.models_dir.glob("*.pkl"):
            if model_file.stat().st_mtime < cutoff_date:
                model_file.unlink()
                count += 1
                logger.info(f"Removed old model: {model_file}")

        return count


_training_pipeline: Optional[MLTrainingPipeline] = None


def get_training_pipeline() -> MLTrainingPipeline:
    """Get or create the global training pipeline instance."""
    global _training_pipeline
    if _training_pipeline is None:
        _training_pipeline = MLTrainingPipeline()
    return _training_pipeline
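

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the pipeline API): a minimal
# smoke test that trains an unsupervised IsolationForest on synthetic data,
# promotes the resulting version to production, and reloads it. It assumes a
# reachable Redis instance behind get_redis_client() and writable ./models
# and ./mlruns directories; the random feature matrix and the _demo() helper
# are hypothetical stand-ins for real data and real orchestration code.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import asyncio

    async def _demo() -> None:
        rng = np.random.default_rng(42)
        X = rng.normal(size=(500, 8))  # synthetic stand-in for real features

        pipeline = get_training_pipeline()
        result = await pipeline.train_model(
            model_type="anomaly",
            algorithm="isolation_forest",
            X_train=X,
        )
        print("training result:", result)

        if result["success"]:
            await pipeline.promote_model(
                result["model_id"], result["version"], status="production"
            )
            model, package = await pipeline.load_model(result["model_id"])
            print("reloaded:", package["algorithm"], package["metrics"])

    asyncio.run(_demo())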