# 🧠 Cidadão.AI Machine Learning Pipeline

## 📋 Overview

The **Machine Learning Pipeline** powers the analytical core of Cidadão.AI with **advanced anomaly detection**, **pattern recognition**, and **explainable AI** capabilities, built with **scikit-learn**, **TensorFlow**, and **statistical analysis** tools to provide transparent, interpretable insights into government data.

## 🏗️ Architecture

```
src/ml/
├── models.py                  # Core ML models and algorithms
├── anomaly_detector.py        # Anomaly detection engine
├── pattern_analyzer.py        # Pattern recognition system
├── spectral_analyzer.py       # Frequency domain analysis
├── data_pipeline.py           # Data preprocessing pipeline
├── training_pipeline.py       # Model training orchestration
├── advanced_pipeline.py       # Advanced ML algorithms
├── cidadao_model.py           # Custom Cidadão.AI model
├── hf_cidadao_model.py        # HuggingFace integration
├── model_api.py               # Model serving API
├── hf_integration.py          # HuggingFace deployment
└── transparency_benchmark.py  # Model evaluation benchmarks
```

## 🔬 Core ML Capabilities

### 1. **Anomaly Detection Engine** (anomaly_detector.py)

#### Statistical Anomaly Detection

```python
class AnomalyDetector:
    """
    Multi-algorithm anomaly detection for government transparency data

    Methods:
    - Statistical outliers (Z-score, IQR, Modified Z-score)
    - Isolation Forest for high-dimensional data
    - One-Class SVM for complex patterns
    - Local Outlier Factor for density-based detection
    - Time series anomalies with seasonal decomposition
    """

    # Price anomaly detection
    def detect_price_anomalies(
        self,
        contracts: List[Contract],
        threshold: float = 2.5
    ) -> List[PriceAnomaly]:
        """
        Detect price anomalies using statistical methods

        Algorithm:
        1. Group contracts by category/type
        2. Calculate mean and standard deviation
        3. Flag contracts beyond threshold * std_dev
        4. Apply contextual filters (contract size, organization type)
        """

    # Vendor concentration analysis
    def detect_vendor_concentration(
        self,
        contracts: List[Contract],
        concentration_threshold: float = 0.7
    ) -> List[VendorConcentrationAnomaly]:
        """
        Detect monopolistic vendor patterns

        Algorithm:
        1. Calculate vendor market share by organization
        2. Apply Herfindahl-Hirschman Index (HHI)
        3. Flag organizations with high vendor concentration
        4. Analyze temporal patterns for sudden changes
        """
```

#### Advanced Anomaly Types

```python
# Anomaly classification system
class AnomalyType(Enum):
    PRICE_OUTLIER = "price_outlier"                # Statistical price deviation
    VENDOR_CONCENTRATION = "vendor_concentration"  # Market concentration
    TEMPORAL_SUSPICION = "temporal_suspicion"      # Timing irregularities
    DUPLICATE_CONTRACT = "duplicate_contract"      # Contract similarity
    PAYMENT_IRREGULARITY = "payment_irregularity"  # Payment pattern anomaly
    SEASONAL_DEVIATION = "seasonal_deviation"      # Seasonal pattern break
    NETWORK_ANOMALY = "network_anomaly"            # Graph-based anomalies

# Severity classification
class AnomalySeverity(Enum):
    LOW = "low"            # Minor deviations, may be normal
    MEDIUM = "medium"      # Noticeable patterns requiring attention
    HIGH = "high"          # Strong indicators of irregularities
    CRITICAL = "critical"  # Severe anomalies requiring immediate action
```
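#### Worked Example: Price Outliers and Vendor Concentration

The docstrings above describe the detection logic at a high level. As a minimal, self-contained sketch of the two statistical ideas — per-category outlier scoring (using the modified Z-score variant listed in the class docstring) and vendor concentration via the Herfindahl-Hirschman Index — the snippet below uses illustrative column names (`category`, `vendor`, `value`) and toy data rather than the project's actual `Contract` schema:

```python
import pandas as pd

def modified_z_scores(values: pd.Series) -> pd.Series:
    """Robust per-group outlier score: 0.6745 * (x - median) / MAD."""
    median = values.median()
    mad = (values - median).abs().median()
    if mad == 0:
        return pd.Series(0.0, index=values.index)
    return 0.6745 * (values - median) / mad

def herfindahl_hirschman_index(vendor_totals: pd.Series) -> float:
    """HHI on fractional market shares: ~0 = fragmented market, 1.0 = single vendor."""
    shares = vendor_totals / vendor_totals.sum()
    return float((shares ** 2).sum())

# Toy contracts: one inflated IT contract and one dominant IT vendor.
contracts = pd.DataFrame({
    "category": ["IT"] * 7 + ["cleaning"] * 3,
    "vendor":   ["A", "A", "B", "A", "C", "A", "A", "D", "D", "E"],
    "value":    [100_000, 105_000, 98_000, 102_000, 99_000, 101_000, 300_000,
                 30_000, 32_000, 31_000],
})

# Score each contract against its own category, as in detect_price_anomalies.
contracts["mod_z"] = contracts.groupby("category")["value"].transform(modified_z_scores)
contracts["is_outlier"] = contracts["mod_z"].abs() > 3.5
print(contracts[contracts["is_outlier"]])

# Vendor concentration within the IT category, as in detect_vendor_concentration.
it_totals = contracts[contracts["category"] == "IT"].groupby("vendor")["value"].sum()
print("HHI (IT):", round(herfindahl_hirschman_index(it_totals), 3))
```

A modified Z-score above roughly 3.5 is a common flagging rule; the plain Z-score path with `threshold=2.5` described in `detect_price_anomalies` works the same way, only with the group mean and standard deviation.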
### 2. **Pattern Analysis System** (pattern_analyzer.py)

#### Time Series Analysis

```python
class PatternAnalyzer:
    """
    Advanced pattern recognition for government spending patterns

    Capabilities:
    - Seasonal decomposition (trend, seasonal, residual)
    - Spectral analysis using FFT
    - Cross-correlation analysis between organizations
    - Regime change detection
    - Forecasting with uncertainty quantification
    """

    def analyze_spending_trends(
        self,
        expenses: List[Expense],
        decomposition_model: str = "additive"
    ) -> TrendAnalysis:
        """
        Decompose spending into trend, seasonal, and irregular components

        Algorithm:
        1. Time series preprocessing and gap filling
        2. Seasonal-Trend decomposition using LOESS (STL)
        3. Trend change point detection
        4. Seasonal pattern stability analysis
        5. Residual anomaly identification
        """

    def detect_spending_regime_changes(
        self,
        time_series: np.ndarray,
        method: str = "cusum"
    ) -> List[RegimeChange]:
        """
        Detect structural breaks in spending patterns

        Methods:
        - CUSUM (Cumulative Sum) control charts
        - Bayesian change point detection
        - Structural break tests (Chow test, Quandt-Andrews)
        """
```

#### Cross-Organizational Analysis

```python
def analyze_cross_organizational_patterns(
    self,
    organizations: List[str],
    time_window: str = "monthly"
) -> CrossOrgAnalysis:
    """
    Identify patterns across government organizations

    Features:
    - Spending correlation analysis
    - Synchronized timing detection
    - Resource competition analysis
    - Coordination pattern identification
    """
    # Calculate cross-correlation matrix
    # (_get_spending_series is an illustrative helper returning one
    # spending series per organization)
    correlation_matrix = np.corrcoef([
        self._get_spending_series(org) for org in organizations
    ])

    # Detect synchronized events
    synchronized_events = self._detect_synchronized_spending(
        organizations, threshold=0.8
    )

    return CrossOrgAnalysis(
        correlation_matrix=correlation_matrix,
        synchronized_events=synchronized_events,
        coordination_score=self._calculate_coordination_score(correlation_matrix)
    )
```

### 3. **Spectral Analysis Engine** (spectral_analyzer.py)

#### Frequency Domain Analysis

```python
class SpectralAnalyzer:
    """
    Frequency domain analysis for detecting periodic patterns

    Applications:
    - End-of-year spending rush detection
    - Electoral cycle influence analysis
    - Budget cycle pattern identification
    - Periodic corruption pattern detection
    """

    def analyze_spending_spectrum(
        self,
        spending_series: np.ndarray,
        sampling_rate: str = "monthly"
    ) -> SpectralAnalysis:
        """
        Perform FFT analysis on spending time series

        Algorithm:
        1. Preprocessing: detrending, windowing
        2. Fast Fourier Transform (FFT)
        3. Power spectral density estimation
        4. Peak detection in frequency domain
        5. Periodic pattern significance testing
        """
        # Remove trend and apply windowing
        detrended = signal.detrend(spending_series)
        windowed = detrended * signal.windows.hann(len(detrended))

        # FFT analysis
        frequencies = np.fft.fftfreq(len(windowed))
        fft_result = np.fft.fft(windowed)
        power_spectrum = np.abs(fft_result) ** 2

        # Detect significant peaks
        peaks, properties = signal.find_peaks(
            power_spectrum,
            height=np.mean(power_spectrum) + 2 * np.std(power_spectrum),
            distance=10
        )

        return SpectralAnalysis(
            frequencies=frequencies[peaks],
            power_spectrum=power_spectrum,
            significant_periods=1 / frequencies[peaks],
            seasonality_strength=self._calculate_seasonality_strength(power_spectrum)
        )
```
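#### Worked Example: Detecting a Yearly Spending Cycle

To make the detrend → window → FFT → peak-detection sequence concrete, here is a runnable sketch on synthetic monthly data with an injected end-of-year spike. The synthetic series, window choice, and peak threshold are illustrative assumptions; `SpectralAnalyzer` applies the same steps to real monthly aggregates:

```python
import numpy as np
from scipy import signal

rng = np.random.default_rng(42)
months = np.arange(120)  # ten years of monthly spending

# Baseline with a mild trend, noise, and a December spike every year.
series = 1_000 + 5 * months + 400 * (months % 12 == 11) + rng.normal(0, 50, months.size)

# 1. Preprocessing: detrend and apply a Hann window
detrended = signal.detrend(series)
windowed = detrended * signal.windows.hann(detrended.size)

# 2-3. FFT and power spectrum (frequencies in cycles per month)
freqs = np.fft.rfftfreq(windowed.size, d=1.0)
power = np.abs(np.fft.rfft(windowed)) ** 2

# 4. Peak detection in the frequency domain
peaks, _ = signal.find_peaks(power, height=power.mean() + 2 * power.std())
print("Dominant periods (months):", np.round(1 / freqs[peaks], 1))
```

A genuine yearly rush surfaces as a peak near the 12-month period, typically accompanied by harmonics at 6, 4, and 3 months because the spike is not sinusoidal.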
### 4. **Data Processing Pipeline** (data_pipeline.py)

#### Advanced Data Preprocessing

```python
class DataPipeline:
    """
    Comprehensive data preprocessing for ML algorithms

    Features:
    - Missing value imputation with multiple strategies
    - Outlier detection and treatment
    - Feature engineering for government data
    - Text preprocessing for contract descriptions
    - Temporal feature extraction
    """

    def preprocess_contracts(
        self,
        contracts: List[Contract]
    ) -> ProcessedDataset:
        """
        Transform raw contract data into ML-ready features

        Pipeline:
        1. Data cleaning and validation
        2. Missing value imputation
        3. Categorical encoding
        4. Numerical scaling and normalization
        5. Feature engineering
        6. Dimensionality reduction if needed
        """
        # Extract features
        features = self._extract_contract_features(contracts)

        # Handle missing values
        features_imputed = self._impute_missing_values(features)

        # Scale numerical features
        features_scaled = self._scale_features(features_imputed)

        # Engineer domain-specific features
        features_engineered = self._engineer_transparency_features(features_scaled)

        return ProcessedDataset(
            features=features_engineered,
            feature_names=self._get_feature_names(),
            preprocessing_metadata=self._get_preprocessing_metadata()
        )

    def _extract_contract_features(self, contracts: List[Contract]) -> np.ndarray:
        """Extract numerical features from contract data"""
        features = []

        for contract in contracts:
            contract_features = [
                # Financial features
                float(contract.valor_inicial or 0),
                float(contract.valor_global or 0),

                # Temporal features
                self._extract_temporal_features(contract.data_assinatura),

                # Categorical features (encoded)
                self._encode_modality(contract.modalidade_contratacao),
                self._encode_organization(contract.orgao.codigo if contract.orgao else None),

                # Text features (TF-IDF of contract object)
                *self._extract_text_features(contract.objeto),

                # Derived features
                self._calculate_contract_duration(contract),
                self._calculate_value_per_day(contract),
                self._get_vendor_risk_score(contract.fornecedor),
            ]
            features.append(contract_features)

        return np.array(features)
```
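#### Worked Example: A scikit-learn Preprocessing Pipeline

The steps listed in `preprocess_contracts` (imputation, categorical encoding, scaling, text features) map naturally onto a scikit-learn `ColumnTransformer`. The sketch below is an assumption about how such a pipeline could be assembled, reusing the Portuguese field names from `_extract_contract_features`; it is not the project's actual `DataPipeline` implementation:

```python
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.feature_extraction.text import TfidfVectorizer

raw = pd.DataFrame({
    "valor_inicial": [100_000.0, None, 250_000.0],  # missing value to impute
    "modalidade_contratacao": ["pregao", "dispensa", "pregao"],
    "objeto": [
        "aquisicao de equipamentos de informatica",
        "servico de limpeza predial",
        "aquisicao de licencas de software",
    ],
})

numeric = Pipeline([
    ("impute", SimpleImputer(strategy="median")),  # step 2: imputation
    ("scale", StandardScaler()),                   # step 4: scaling
])

preprocess = ColumnTransformer([
    ("num", numeric, ["valor_inicial"]),
    ("cat", OneHotEncoder(handle_unknown="ignore"), ["modalidade_contratacao"]),  # step 3
    ("text", TfidfVectorizer(max_features=50), "objeto"),  # TF-IDF of the contract object
])

X = preprocess.fit_transform(raw)
print("Feature matrix shape:", X.shape)
```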
### 5. **Custom Cidadão.AI Model** (cidadao_model.py)

#### Specialized Transparency Analysis Model

```python
class CidadaoAIModel:
    """
    Custom model specialized for Brazilian government transparency analysis

    Architecture:
    - Multi-task learning for various anomaly types
    - Attention mechanisms for important features
    - Interpretability through SHAP values
    - Uncertainty quantification
    - Brazilian government domain knowledge integration
    """

    def __init__(self):
        self.anomaly_detector = self._build_anomaly_detector()
        self.pattern_classifier = self._build_pattern_classifier()
        self.risk_scorer = self._build_risk_scorer()
        self.explainer = self._build_explainer()

    def _build_anomaly_detector(self) -> tf.keras.Model:
        """Build neural network for anomaly detection"""
        inputs = tf.keras.Input(shape=(self.n_features,))

        # Encoder
        encoded = tf.keras.layers.Dense(128, activation='relu')(inputs)
        encoded = tf.keras.layers.Dropout(0.2)(encoded)
        encoded = tf.keras.layers.Dense(64, activation='relu')(encoded)
        encoded = tf.keras.layers.Dropout(0.2)(encoded)
        encoded = tf.keras.layers.Dense(32, activation='relu')(encoded)

        # Decoder (autoencoder for anomaly detection)
        decoded = tf.keras.layers.Dense(64, activation='relu')(encoded)
        decoded = tf.keras.layers.Dense(128, activation='relu')(decoded)
        decoded = tf.keras.layers.Dense(self.n_features, activation='linear')(decoded)

        # Anomaly score output
        anomaly_score = tf.keras.layers.Dense(1, activation='sigmoid', name='anomaly_score')(encoded)

        model = tf.keras.Model(inputs=inputs, outputs=[decoded, anomaly_score])
        return model

    def predict_anomalies(
        self,
        data: np.ndarray,
        return_explanations: bool = True
    ) -> AnomalyPrediction:
        """
        Predict anomalies with explanations

        Returns:
        - Anomaly scores (0-1)
        - Anomaly classifications
        - Feature importance (SHAP values)
        - Confidence intervals
        """
        # Get predictions
        reconstructed, anomaly_scores = self.anomaly_detector.predict(data)

        # Calculate reconstruction error
        reconstruction_error = np.mean((data - reconstructed) ** 2, axis=1)

        # Classify anomalies
        anomaly_labels = (anomaly_scores > self.anomaly_threshold).astype(int)

        # Generate explanations if requested
        explanations = None
        if return_explanations:
            explanations = self.explainer.explain_predictions(data, anomaly_scores)

        return AnomalyPrediction(
            anomaly_scores=anomaly_scores,
            anomaly_labels=anomaly_labels,
            reconstruction_error=reconstruction_error,
            explanations=explanations,
            confidence=self._calculate_confidence(anomaly_scores)
        )
```
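#### Worked Example: Calibrating the Reconstruction-Error Threshold

`predict_anomalies` combines a supervised anomaly-score head with an autoencoder reconstruction error. The sketch below isolates the reconstruction-error path on random data and shows one plausible way to calibrate `anomaly_threshold` (a percentile of training errors); the calibration rule is an assumption, not something specified here:

```python
import numpy as np
import tensorflow as tf

n_features = 16
rng = np.random.default_rng(0)
X_train = rng.normal(size=(500, n_features)).astype("float32")

# Plain autoencoder: reconstruction error acts as the anomaly score.
inputs = tf.keras.Input(shape=(n_features,))
hidden = tf.keras.layers.Dense(8, activation="relu")(inputs)
outputs = tf.keras.layers.Dense(n_features, activation="linear")(hidden)
autoencoder = tf.keras.Model(inputs, outputs)
autoencoder.compile(optimizer="adam", loss="mse")
autoencoder.fit(X_train, X_train, epochs=5, batch_size=32, verbose=0)

# Calibrate a threshold on training data, e.g. the 99th percentile of errors.
train_errors = np.mean((X_train - autoencoder.predict(X_train, verbose=0)) ** 2, axis=1)
threshold = np.percentile(train_errors, 99)

# Score new records; an injected outlier should exceed the threshold.
X_new = rng.normal(size=(10, n_features)).astype("float32")
X_new[0] += 5.0
new_errors = np.mean((X_new - autoencoder.predict(X_new, verbose=0)) ** 2, axis=1)
print("Flagged indices:", np.where(new_errors > threshold)[0])
```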
### 6. **Model Interpretability** (explainer.py)

#### SHAP-based Explanations

```python
class TransparencyExplainer:
    """
    Explainable AI for transparency analysis results

    Methods:
    - SHAP (SHapley Additive exPlanations) values
    - LIME (Local Interpretable Model-agnostic Explanations)
    - Feature importance analysis
    - Decision boundary visualization
    """

    def explain_anomaly_prediction(
        self,
        model: Any,
        data: np.ndarray,
        prediction_index: int
    ) -> AnomalyExplanation:
        """
        Generate human-readable explanations for anomaly predictions

        Returns:
        - Feature contributions to the prediction
        - Natural language explanation
        - Visualization data for charts
        - Confidence intervals
        """
        # Calculate SHAP values
        explainer = shap.DeepExplainer(model, data[:100])  # Background data
        shap_values = explainer.shap_values(data[prediction_index:prediction_index+1])

        # Get feature names and values
        feature_names = self.get_feature_names()
        feature_values = data[prediction_index]

        # Sort by importance
        importance_indices = np.argsort(np.abs(shap_values[0]))[::-1]

        # Generate natural language explanation
        explanation_text = self._generate_explanation_text(
            shap_values[0],
            feature_names,
            feature_values,
            importance_indices[:5]  # Top 5 features
        )

        return AnomalyExplanation(
            shap_values=shap_values[0],
            feature_names=feature_names,
            feature_values=feature_values,
            explanation_text=explanation_text,
            top_features=importance_indices[:10]
        )

    def _generate_explanation_text(
        self,
        shap_values: np.ndarray,
        feature_names: List[str],
        feature_values: np.ndarray,
        top_indices: List[int]
    ) -> str:
        """Generate human-readable explanation"""
        explanations = []

        for idx in top_indices:
            feature_name = feature_names[idx]
            feature_value = feature_values[idx]
            shap_value = shap_values[idx]

            if shap_value > 0:
                direction = "increases"
            else:
                direction = "decreases"

            explanation = f"The {feature_name} value of {feature_value:.2f} {direction} the anomaly score by {abs(shap_value):.3f}"
            explanations.append(explanation)

        return ". ".join(explanations) + "."
```
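#### Worked Example: From Attributions to an Explanation Sentence

The output of `_generate_explanation_text` is easiest to judge from a worked example. The attribution values and feature names below are invented for illustration; in the pipeline they would come from the SHAP explainer and `get_feature_names()`:

```python
import numpy as np

feature_names = np.array(["valor_inicial", "contract_duration", "vendor_risk_score",
                          "value_per_day", "n_amendments"])
feature_values = np.array([1_250_000.0, 15.0, 0.82, 83_333.3, 4.0])
shap_values = np.array([0.210, 0.145, 0.090, -0.030, 0.012])  # per-prediction attributions

# Rank features by absolute contribution and verbalize the top three.
top = np.argsort(np.abs(shap_values))[::-1][:3]
sentences = []
for idx in top:
    direction = "increases" if shap_values[idx] > 0 else "decreases"
    sentences.append(
        f"The {feature_names[idx]} value of {feature_values[idx]:.2f} "
        f"{direction} the anomaly score by {abs(shap_values[idx]):.3f}"
    )
print(". ".join(sentences) + ".")
```

This prints a short, contribution-ordered explanation, mirroring the logic of the method above.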
## 📊 Model Training & Evaluation

### Training Pipeline (training_pipeline.py)

#### Automated Model Training

```python
class ModelTrainingPipeline:
    """
    Automated training pipeline for transparency analysis models

    Features:
    - Cross-validation with time series splits
    - Hyperparameter optimization
    - Model selection and ensemble methods
    - Performance monitoring and logging
    - Automated model deployment
    """

    def train_anomaly_detection_model(
        self,
        training_data: ProcessedDataset,
        validation_split: float = 0.2,
        hyperparameter_search: bool = True
    ) -> TrainingResult:
        """
        Train anomaly detection model with optimization

        Pipeline:
        1. Data splitting with temporal considerations
        2. Hyperparameter optimization using Optuna
        3. Model training with early stopping
        4. Cross-validation evaluation
        5. Model interpretation and validation
        """
        # Split data maintaining temporal order
        train_data, val_data = self._temporal_split(training_data, validation_split)

        # Hyperparameter optimization
        if hyperparameter_search:
            best_params = self._optimize_hyperparameters(train_data, val_data)
        else:
            best_params = self.default_params

        # Train final model
        model = self._train_model(train_data, best_params)

        # Evaluate model
        evaluation_results = self._evaluate_model(model, val_data)

        # Generate model interpretation
        interpretation = self._interpret_model(model, val_data)

        return TrainingResult(
            model=model,
            parameters=best_params,
            evaluation=evaluation_results,
            interpretation=interpretation,
            training_metadata=self._get_training_metadata()
        )
```

### Model Evaluation Metrics

```python
class TransparencyMetrics:
    """
    Specialized metrics for transparency analysis evaluation

    Metrics:
    - Precision/Recall for anomaly detection
    - F1-score with class imbalance handling
    - Area Under ROC Curve (AUC-ROC)
    - Area Under Precision-Recall Curve (AUC-PR)
    - False Positive Rate at operational thresholds
    - Coverage: percentage of true anomalies detected
    """

    def calculate_anomaly_detection_metrics(
        self,
        y_true: np.ndarray,
        y_pred_proba: np.ndarray,
        threshold: float = 0.5
    ) -> Dict[str, float]:
        """Calculate comprehensive metrics for anomaly detection"""
        y_pred = (y_pred_proba > threshold).astype(int)

        # Basic classification metrics
        precision = precision_score(y_true, y_pred)
        recall = recall_score(y_true, y_pred)
        f1 = f1_score(y_true, y_pred)

        # ROC metrics
        auc_roc = roc_auc_score(y_true, y_pred_proba)
        auc_pr = average_precision_score(y_true, y_pred_proba)

        # Cost-sensitive metrics
        false_positive_rate = self._calculate_fpr(y_true, y_pred)
        false_negative_rate = self._calculate_fnr(y_true, y_pred)

        # Domain-specific metrics
        coverage = self._calculate_coverage(y_true, y_pred)
        efficiency = self._calculate_efficiency(y_true, y_pred)

        return {
            'precision': precision,
            'recall': recall,
            'f1_score': f1,
            'auc_roc': auc_roc,
            'auc_pr': auc_pr,
            'false_positive_rate': false_positive_rate,
            'false_negative_rate': false_negative_rate,
            'coverage': coverage,
            'efficiency': efficiency
        }
```
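#### Worked Example: Metrics on an Imbalanced Toy Dataset

The class above relies on standard scikit-learn metric functions. The toy example below, with 5 true anomalies among 100 records, shows why both ROC-AUC and PR-AUC are reported: under heavy class imbalance the precision-recall view is usually the more informative of the two. The data is synthetic and purely illustrative:

```python
import numpy as np
from sklearn.metrics import (precision_score, recall_score, f1_score,
                             roc_auc_score, average_precision_score)

rng = np.random.default_rng(7)
y_true = np.zeros(100, dtype=int)
y_true[:5] = 1                                        # 5% anomaly rate

y_scores = np.clip(rng.normal(0.2, 0.1, 100), 0, 1)   # normal records score low
y_scores[:5] = rng.uniform(0.6, 0.95, 5)              # anomalies score high
y_pred = (y_scores > 0.5).astype(int)

print("precision:", precision_score(y_true, y_pred))
print("recall:   ", recall_score(y_true, y_pred))
print("f1_score: ", f1_score(y_true, y_pred))
print("auc_roc:  ", roc_auc_score(y_true, y_scores))
print("auc_pr:   ", average_precision_score(y_true, y_scores))
```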
## 🚀 Model Deployment

### HuggingFace Integration (hf_integration.py)

#### Model Publishing to HuggingFace Hub

```python
class HuggingFaceIntegration:
    """
    Integration with HuggingFace Hub for model sharing and deployment

    Features:
    - Model uploading with metadata
    - Automatic model card generation
    - Version control and model registry
    - Inference API integration
    - Community model sharing
    """

    def upload_model_to_hub(
        self,
        model: tf.keras.Model,
        model_name: str,
        description: str,
        metrics: Dict[str, float]
    ) -> str:
        """
        Upload trained model to HuggingFace Hub

        Process:
        1. Convert model to HuggingFace format
        2. Generate model card with metrics and description
        3. Package preprocessing pipelines
        4. Upload to Hub with version tags
        5. Set up inference API
        """
        # Convert to HuggingFace format
        hf_model = self._convert_to_hf_format(model)

        # Generate model card
        model_card = self._generate_model_card(
            model_name, description, metrics
        )

        # Upload to hub
        repo_url = hf_model.push_to_hub(
            model_name,
            commit_message=f"Upload {model_name} v{self.version}",
            model_card=model_card
        )

        return repo_url
```

### API Serving (model_api.py)

#### FastAPI Model Serving

```python
import time
from typing import Any, Dict, List

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI(title="Cidadão.AI ML API")

# AnomalyResult, get_cached_model, and preprocess_contracts are defined
# elsewhere in the module.

class PredictionRequest(BaseModel):
    contracts: List[Dict[str, Any]]
    include_explanations: bool = True
    anomaly_threshold: float = 0.5

class PredictionResponse(BaseModel):
    anomalies: List[AnomalyResult]
    model_version: str
    processing_time_ms: float
    confidence_score: float

@app.post("/predict/anomalies", response_model=PredictionResponse)
async def predict_anomalies(request: PredictionRequest):
    """
    Predict anomalies in government contracts

    Returns:
    - Anomaly predictions with scores
    - Explanations for each prediction
    - Model metadata and performance metrics
    """
    start_time = time.time()

    # Load model (cached)
    model = await get_cached_model()

    # Preprocess data
    processed_data = preprocess_contracts(request.contracts)

    # Make predictions
    predictions = model.predict_anomalies(
        processed_data,
        threshold=request.anomaly_threshold,
        return_explanations=request.include_explanations
    )

    processing_time = (time.time() - start_time) * 1000

    return PredictionResponse(
        anomalies=predictions.anomalies,
        model_version=model.version,
        processing_time_ms=processing_time,
        confidence_score=predictions.overall_confidence
    )
```
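#### Calling the Endpoint

A client can exercise the endpoint with any HTTP library. The sketch below uses `httpx` against a locally running instance; the base URL, port, and contract fields are assumptions for illustration, with the payload shaped to match `PredictionRequest`:

```python
import asyncio
import httpx

async def main() -> None:
    payload = {
        "contracts": [
            {"valor_inicial": 1_250_000.0,
             "modalidade_contratacao": "pregao",
             "objeto": "aquisicao de equipamentos de informatica"},
        ],
        "include_explanations": True,
        "anomaly_threshold": 0.5,
    }
    async with httpx.AsyncClient(base_url="http://localhost:8000") as client:
        response = await client.post("/predict/anomalies", json=payload)
        response.raise_for_status()
        body = response.json()
        print(body["model_version"], f'{body["processing_time_ms"]:.1f} ms')
        for anomaly in body["anomalies"]:
            print(anomaly)

asyncio.run(main())
```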
## 📊 Performance Benchmarks

### Transparency Benchmark Suite (transparency_benchmark.py)

#### Comprehensive Model Evaluation

```python
class TransparencyBenchmark:
    """
    Benchmark suite for transparency analysis models

    Tests:
    - Synthetic anomaly detection
    - Real-world case study validation
    - Cross-organization generalization
    - Temporal stability assessment
    - Interpretability quality metrics
    """

    def run_comprehensive_benchmark(
        self,
        model: Any,
        test_datasets: List[str]
    ) -> BenchmarkResults:
        """
        Run complete benchmark suite on model

        Benchmarks:
        1. Synthetic data with known anomalies
        2. Historical case studies with verified outcomes
        3. Cross-validation across different organizations
        4. Temporal robustness testing
        5. Adversarial robustness evaluation
        """
        results = {}

        for dataset_name in test_datasets:
            dataset = self._load_benchmark_dataset(dataset_name)

            # Run predictions
            predictions = model.predict(dataset.X)

            # Calculate metrics
            metrics = self._calculate_metrics(dataset.y, predictions)

            # Test interpretability
            interpretability_score = self._test_interpretability(
                model, dataset.X[:10]
            )

            results[dataset_name] = {
                'metrics': metrics,
                'interpretability': interpretability_score,
                'processing_time': self._measure_processing_time(model, dataset.X)
            }

        return BenchmarkResults(results)
```

## 🧪 Usage Examples

### Basic Anomaly Detection

```python
from src.ml.anomaly_detector import AnomalyDetector
from src.ml.data_pipeline import DataPipeline

# Initialize components
detector = AnomalyDetector()
pipeline = DataPipeline()

# Process contract data
contracts = fetch_contracts_from_api()
processed_data = pipeline.preprocess_contracts(contracts)

# Detect anomalies
anomalies = detector.detect_price_anomalies(
    contracts,
    threshold=2.5
)

for anomaly in anomalies:
    print(f"Anomaly: {anomaly.description}")
    print(f"Confidence: {anomaly.confidence:.2f}")
    print(f"Affected contracts: {len(anomaly.affected_records)}")
```

### Advanced Pattern Analysis

```python
from src.ml.pattern_analyzer import PatternAnalyzer
from src.ml.spectral_analyzer import SpectralAnalyzer

# Initialize analyzers
pattern_analyzer = PatternAnalyzer()
spectral_analyzer = SpectralAnalyzer()

# Analyze spending trends
expenses = fetch_expenses_from_api(organization="20000", year=2024)
trend_analysis = pattern_analyzer.analyze_spending_trends(expenses)

print(f"Trend direction: {trend_analysis.trend_direction}")
print(f"Seasonality strength: {trend_analysis.seasonality_strength:.2f}")
print(f"Anomalous periods: {len(trend_analysis.anomalous_periods)}")

# Spectral analysis
spending_series = extract_monthly_spending(expenses)
spectral_analysis = spectral_analyzer.analyze_spending_spectrum(spending_series)

print(f"Dominant periods: {spectral_analysis.significant_periods}")
print(f"End-of-year effect: {spectral_analysis.eoy_strength:.2f}")
```

### Custom Model Training

```python
from src.ml.training_pipeline import ModelTrainingPipeline
from src.ml.cidadao_model import CidadaoAIModel
from src.ml.hf_integration import HuggingFaceIntegration

# Prepare training data
training_data = prepare_training_dataset()

# Initialize training pipeline
trainer = ModelTrainingPipeline()

# Train model with hyperparameter optimization
training_result = await trainer.train_anomaly_detection_model(
    training_data,
    hyperparameter_search=True,
    cross_validation_folds=5
)

print(f"Best F1 score: {training_result.evaluation.f1_score:.3f}")
print(f"Model size: {training_result.model.count_params()} parameters")

# Deploy to HuggingFace
hf_integration = HuggingFaceIntegration()
model_url = hf_integration.upload_model_to_hub(
    training_result.model,
    "cidadao-ai/anomaly-detector-v1",
    "Government contract anomaly detection model",
    training_result.evaluation.metrics
)

print(f"Model deployed: {model_url}")
```

---

This ML pipeline provides **state-of-the-art anomaly detection** and **pattern analysis** capabilities specifically designed for Brazilian government transparency data, with **full interpretability** and **production-ready deployment** options.