""" Module: agents.dandara_agent Description: Dandara - Social Justice Agent specialized in monitoring inclusion policies and social equity Author: Anderson H. Silva Date: 2025-01-24 License: Proprietary - All rights reserved """ import asyncio from datetime import datetime, timedelta from typing import Any, Dict, List, Optional, Tuple from dataclasses import dataclass import numpy as np import pandas as pd from pydantic import BaseModel, Field as PydanticField from src.agents.deodoro import BaseAgent, AgentContext, AgentMessage, AgentResponse from src.core import get_logger from src.core.exceptions import AgentExecutionError, DataAnalysisError @dataclass class EquityAnalysisResult: """Result of social equity analysis.""" analysis_type: str gini_coefficient: float # 0.0 to 1.0 equity_score: int # 0-100 population_affected: int violations_detected: List[Dict[str, Any]] gaps_identified: List[Dict[str, Any]] recommendations: List[str] evidence_sources: List[str] analysis_timestamp: datetime confidence_level: float class SocialJusticeRequest(BaseModel): """Request for social justice analysis.""" query: str = PydanticField(description="Social equity analysis query") target_groups: Optional[List[str]] = PydanticField(default=None, description="Specific demographic groups to analyze") policy_areas: Optional[List[str]] = PydanticField(default=None, description="Policy areas (education, health, housing, etc)") geographical_scope: Optional[str] = PydanticField(default=None, description="Geographic scope (municipality, state, federal)") time_period: Optional[Tuple[str, str]] = PydanticField(default=None, description="Analysis period (start, end)") metrics_focus: Optional[List[str]] = PydanticField(default=None, description="Specific metrics to focus on") class DandaraAgent(BaseAgent): """ Dandara - Social Justice Agent Specialized in monitoring inclusion policies, social equity, and distributive justice indicators. Inspired by Dandara dos Palmares, warrior for social justice and equality. """ def __init__(self): super().__init__( name="dandara", description="Social Justice Agent specialized in monitoring inclusion policies and social equity", capabilities=[ "social_equity_analysis", "inclusion_policy_monitoring", "gini_coefficient_calculation", "demographic_disparity_detection", "social_justice_violation_identification", "distributive_justice_assessment", "policy_effectiveness_evaluation", "intersectional_analysis", "vulnerability_mapping", "equity_gap_identification" ] ) self.logger = get_logger("agent.dandara") # Social justice analysis tools self._equity_metrics = { "gini_coefficient": self._calculate_gini, "atkinson_index": self._calculate_atkinson, "theil_index": self._calculate_theil, "palma_ratio": self._calculate_palma, "quintile_ratio": self._calculate_quintile_ratio } # Data sources for social analysis self._data_sources = [ "IBGE", "DataSUS", "INEP", "MDS", "SNIS", "Portal da TransparĂȘncia", "RAIS", "PNAD" ] async def process( self, message: AgentMessage, context: AgentContext, ) -> AgentResponse: """ Process social justice analysis request. Args: message: Analysis request message context: Agent execution context Returns: Social equity analysis results """ try: self.logger.info( "Processing social justice analysis request", investigation_id=context.investigation_id, message_type=message.type, ) # Parse request if isinstance(message.data, dict): request = SocialJusticeRequest(**message.data) else: request = SocialJusticeRequest(query=str(message.data)) # Perform comprehensive social justice analysis analysis_result = await self._analyze_social_equity(request, context) # Generate actionable recommendations recommendations = await self._generate_justice_recommendations( analysis_result, request, context ) # Create audit trail audit_hash = self._generate_audit_hash(analysis_result, request) response_data = { "analysis_id": context.investigation_id, "timestamp": datetime.utcnow().isoformat(), "agent": "dandara", "analysis_type": "social_justice", "results": analysis_result, "recommendations": recommendations, "audit_hash": audit_hash, "data_sources": self._data_sources, "methodology": "gini_theil_palma_analysis", "confidence": analysis_result.confidence_level } self.logger.info( "Social justice analysis completed", investigation_id=context.investigation_id, equity_score=analysis_result.equity_score, violations_count=len(analysis_result.violations_detected), ) return AgentResponse( agent_name=self.name, response_type="social_justice_analysis", data=response_data, success=True, context=context, ) except Exception as e: self.logger.error( "Social justice analysis failed", investigation_id=context.investigation_id, error=str(e), exc_info=True, ) return AgentResponse( agent_name=self.name, response_type="error", data={"error": str(e), "analysis_type": "social_justice"}, success=False, context=context, ) async def _analyze_social_equity( self, request: SocialJusticeRequest, context: AgentContext ) -> EquityAnalysisResult: """Perform comprehensive social equity analysis.""" self.logger.info( "Starting social equity analysis", query=request.query, target_groups=request.target_groups, ) # Simulate comprehensive analysis (replace with real implementation) await asyncio.sleep(2) # Simulate processing time # Calculate equity metrics gini_coeff = await self._calculate_regional_gini(request) equity_score = max(0, min(100, int((1 - gini_coeff) * 100))) # Identify violations and gaps violations = await self._detect_equity_violations(request, context) gaps = await self._identify_inclusion_gaps(request, context) return EquityAnalysisResult( analysis_type="comprehensive_social_equity", gini_coefficient=gini_coeff, equity_score=equity_score, population_affected=self._estimate_affected_population(request), violations_detected=violations, gaps_identified=gaps, recommendations=await self._generate_evidence_based_recommendations(violations, gaps), evidence_sources=self._data_sources, analysis_timestamp=datetime.utcnow(), confidence_level=0.85 ) async def _calculate_regional_gini(self, request: SocialJusticeRequest) -> float: """Calculate Gini coefficient for specified region/groups.""" # Placeholder - implement real Gini calculation return np.random.uniform(0.3, 0.7) # Brazil typically 0.5-0.6 async def _detect_equity_violations( self, request: SocialJusticeRequest, context: AgentContext ) -> List[Dict[str, Any]]: """Detect potential equity violations.""" violations = [] # Simulate violation detection violation_types = [ "discriminatory_resource_allocation", "unequal_service_access", "policy_exclusion_bias", "demographic_underrepresentation" ] for violation_type in violation_types[:2]: # Sample violations violations.append({ "type": violation_type, "severity": np.random.uniform(0.6, 0.9), "legal_reference": "CF/88 Art. 5Âș", "evidence": f"Statistical disparity detected in {violation_type}", "affected_groups": request.target_groups or ["vulnerable_populations"], "remediation_urgency": "high" }) return violations async def _identify_inclusion_gaps( self, request: SocialJusticeRequest, context: AgentContext ) -> List[Dict[str, Any]]: """Identify inclusion gaps in policies.""" gaps = [] gap_areas = ["digital_inclusion", "healthcare_access", "education_equity", "employment_opportunities"] for area in gap_areas[:3]: # Sample gaps gaps.append({ "area": area, "gap_size": np.random.uniform(0.3, 0.8), "target_population": request.target_groups or ["general_population"], "current_coverage": np.random.uniform(0.2, 0.7), "recommended_coverage": 0.95, "implementation_complexity": np.random.choice(["low", "medium", "high"]) }) return gaps def _estimate_affected_population(self, request: SocialJusticeRequest) -> int: """Estimate affected population size.""" # Placeholder - implement real population estimation return np.random.randint(50000, 2000000) async def _generate_evidence_based_recommendations( self, violations: List[Dict[str, Any]], gaps: List[Dict[str, Any]] ) -> List[str]: """Generate evidence-based recommendations.""" recommendations = [ "Implement targeted resource redistribution policies", "Establish monitoring systems for equity metrics", "Create inclusive policy design frameworks", "Develop intersectional analysis capabilities", "Enhance data collection on vulnerable groups" ] # Customize based on findings if violations: recommendations.insert(0, "Address identified legal compliance violations immediately") if gaps: recommendations.append("Close identified inclusion gaps through targeted interventions") return recommendations async def _generate_justice_recommendations( self, analysis: EquityAnalysisResult, request: SocialJusticeRequest, context: AgentContext ) -> List[Dict[str, Any]]: """Generate detailed justice recommendations.""" recommendations = [] for rec_text in analysis.recommendations: recommendations.append({ "recommendation": rec_text, "priority": "high" if analysis.equity_score < 60 else "medium", "implementation_timeframe": "immediate" if analysis.equity_score < 40 else "short_term", "expected_impact": np.random.uniform(0.6, 0.9), "required_resources": np.random.choice(["low", "medium", "high"]), "stakeholders": ["government", "civil_society", "affected_communities"], "success_metrics": [f"Improve equity score by {np.random.randint(10, 25)} points"] }) return recommendations def _generate_audit_hash( self, analysis: EquityAnalysisResult, request: SocialJusticeRequest ) -> str: """Generate SHA-256 hash for audit trail.""" import hashlib audit_data = f"{analysis.analysis_timestamp}{analysis.gini_coefficient}{len(analysis.violations_detected)}{request.query}" return hashlib.sha256(audit_data.encode()).hexdigest() # Equity calculation methods async def _calculate_gini(self, data: List[float]) -> float: """Calculate Gini coefficient.""" if not data: return 0.0 sorted_data = np.sort(data) n = len(sorted_data) cumsum = np.cumsum(sorted_data) return (n + 1 - 2 * np.sum(cumsum) / cumsum[-1]) / n async def _calculate_atkinson(self, data: List[float], epsilon: float = 0.5) -> float: """Calculate Atkinson inequality index.""" if not data: return 0.0 mean_income = np.mean(data) if epsilon == 1: geometric_mean = np.exp(np.mean(np.log(data))) return 1 - geometric_mean / mean_income else: weighted_sum = np.mean(np.power(data, 1 - epsilon)) return 1 - np.power(weighted_sum, 1/(1 - epsilon)) / mean_income async def _calculate_theil(self, data: List[float]) -> float: """Calculate Theil inequality index.""" if not data: return 0.0 mean_income = np.mean(data) return np.mean((data / mean_income) * np.log(data / mean_income)) async def _calculate_palma(self, data: List[float]) -> float: """Calculate Palma ratio (top 10% / bottom 40%).""" if len(data) < 10: return 0.0 sorted_data = np.sort(data) n = len(sorted_data) bottom_40_pct = np.sum(sorted_data[:int(0.4 * n)]) top_10_pct = np.sum(sorted_data[int(0.9 * n):]) return top_10_pct / bottom_40_pct if bottom_40_pct > 0 else float('inf') async def _calculate_quintile_ratio(self, data: List[float]) -> float: """Calculate ratio of top to bottom quintile.""" if len(data) < 5: return 0.0 sorted_data = np.sort(data) n = len(sorted_data) bottom_quintile = np.mean(sorted_data[:int(0.2 * n)]) top_quintile = np.mean(sorted_data[int(0.8 * n):]) return top_quintile / bottom_quintile if bottom_quintile > 0 else float('inf')