""" |
|
|
Module: agents.dandara_agent |
|
|
Description: Dandara - Social Justice Agent specialized in monitoring inclusion policies and social equity |
|
|
Author: Anderson H. Silva |
|
|
Date: 2025-01-24 |
|
|
License: Proprietary - All rights reserved |
|
|
""" |
|
|
|
|
|
import asyncio |
|
|
from datetime import datetime, timedelta |
|
|
from typing import Any, Dict, List, Optional, Tuple |
|
|
from dataclasses import dataclass |
|
|
|
|
|
import numpy as np |
|
|
import pandas as pd |
|
|
from pydantic import BaseModel, Field as PydanticField |
|
|
|
|
|
from src.agents.deodoro import BaseAgent, AgentContext, AgentMessage, AgentResponse |
|
|
from src.core import get_logger |
|
|
from src.core.exceptions import AgentExecutionError, DataAnalysisError |
|
|
|
|
|
|
|
|
@dataclass
class EquityAnalysisResult:
    """Result of social equity analysis."""

    analysis_type: str
    gini_coefficient: float
    equity_score: int
    population_affected: int
    violations_detected: List[Dict[str, Any]]
    gaps_identified: List[Dict[str, Any]]
    recommendations: List[str]
    evidence_sources: List[str]
    analysis_timestamp: datetime
    confidence_level: float


class SocialJusticeRequest(BaseModel):
    """Request for social justice analysis."""

    query: str = PydanticField(description="Social equity analysis query")
    target_groups: Optional[List[str]] = PydanticField(default=None, description="Specific demographic groups to analyze")
    policy_areas: Optional[List[str]] = PydanticField(default=None, description="Policy areas (education, health, housing, etc.)")
    geographical_scope: Optional[str] = PydanticField(default=None, description="Geographic scope (municipality, state, federal)")
    time_period: Optional[Tuple[str, str]] = PydanticField(default=None, description="Analysis period (start, end)")
    metrics_focus: Optional[List[str]] = PydanticField(default=None, description="Specific metrics to focus on")


class DandaraAgent(BaseAgent):
    """
    Dandara - Social Justice Agent

    Specialized in monitoring inclusion policies, social equity, and distributive justice indicators.
    Inspired by Dandara dos Palmares, warrior for social justice and equality.
    """

    def __init__(self):
        super().__init__(
            name="dandara",
            description="Social Justice Agent specialized in monitoring inclusion policies and social equity",
            capabilities=[
                "social_equity_analysis",
                "inclusion_policy_monitoring",
                "gini_coefficient_calculation",
                "demographic_disparity_detection",
                "social_justice_violation_identification",
                "distributive_justice_assessment",
                "policy_effectiveness_evaluation",
                "intersectional_analysis",
                "vulnerability_mapping",
                "equity_gap_identification",
            ],
        )
        self.logger = get_logger("agent.dandara")

        # Inequality metrics available for equity analysis
        self._equity_metrics = {
            "gini_coefficient": self._calculate_gini,
            "atkinson_index": self._calculate_atkinson,
            "theil_index": self._calculate_theil,
            "palma_ratio": self._calculate_palma,
            "quintile_ratio": self._calculate_quintile_ratio,
        }

        # Data sources cited as evidence in analysis results
        self._data_sources = [
            "IBGE", "DataSUS", "INEP", "MDS", "SNIS",
            "Portal da Transparência", "RAIS", "PNAD",
        ]

    async def process(
        self,
        message: AgentMessage,
        context: AgentContext,
    ) -> AgentResponse:
        """
        Process social justice analysis request.

        Args:
            message: Analysis request message
            context: Agent execution context

        Returns:
            Social equity analysis results
        """
        try:
            self.logger.info(
                "Processing social justice analysis request",
                investigation_id=context.investigation_id,
                message_type=message.type,
            )

            # Parse the incoming payload into a structured request
            if isinstance(message.data, dict):
                request = SocialJusticeRequest(**message.data)
            else:
                request = SocialJusticeRequest(query=str(message.data))

            # Run the core equity analysis
            analysis_result = await self._analyze_social_equity(request, context)

            # Expand the findings into detailed, prioritized recommendations
            recommendations = await self._generate_justice_recommendations(
                analysis_result, request, context
            )

            # Hash key result fields for the audit trail
            audit_hash = self._generate_audit_hash(analysis_result, request)

            response_data = {
                "analysis_id": context.investigation_id,
                "timestamp": datetime.utcnow().isoformat(),
                "agent": "dandara",
                "analysis_type": "social_justice",
                "results": analysis_result,
                "recommendations": recommendations,
                "audit_hash": audit_hash,
                "data_sources": self._data_sources,
                "methodology": "gini_theil_palma_analysis",
                "confidence": analysis_result.confidence_level,
            }

            self.logger.info(
                "Social justice analysis completed",
                investigation_id=context.investigation_id,
                equity_score=analysis_result.equity_score,
                violations_count=len(analysis_result.violations_detected),
            )

            return AgentResponse(
                agent_name=self.name,
                response_type="social_justice_analysis",
                data=response_data,
                success=True,
                context=context,
            )

        except Exception as e:
            self.logger.error(
                "Social justice analysis failed",
                investigation_id=context.investigation_id,
                error=str(e),
                exc_info=True,
            )

            return AgentResponse(
                agent_name=self.name,
                response_type="error",
                data={"error": str(e), "analysis_type": "social_justice"},
                success=False,
                context=context,
            )

    async def _analyze_social_equity(
        self,
        request: SocialJusticeRequest,
        context: AgentContext,
    ) -> EquityAnalysisResult:
        """Perform comprehensive social equity analysis."""
        self.logger.info(
            "Starting social equity analysis",
            query=request.query,
            target_groups=request.target_groups,
        )

        # Simulate data collection and processing latency (placeholder for real data integration)
        await asyncio.sleep(2)

        # Convert the Gini coefficient into a 0-100 equity score (higher = more equitable)
        gini_coeff = await self._calculate_regional_gini(request)
        equity_score = max(0, min(100, int((1 - gini_coeff) * 100)))

        violations = await self._detect_equity_violations(request, context)
        gaps = await self._identify_inclusion_gaps(request, context)

        return EquityAnalysisResult(
            analysis_type="comprehensive_social_equity",
            gini_coefficient=gini_coeff,
            equity_score=equity_score,
            population_affected=self._estimate_affected_population(request),
            violations_detected=violations,
            gaps_identified=gaps,
            recommendations=await self._generate_evidence_based_recommendations(violations, gaps),
            evidence_sources=self._data_sources,
            analysis_timestamp=datetime.utcnow(),
            confidence_level=0.85,
        )

    async def _calculate_regional_gini(self, request: SocialJusticeRequest) -> float:
        """Calculate Gini coefficient for specified region/groups."""
        # Placeholder: returns a simulated value until real income distribution data is wired in
        return np.random.uniform(0.3, 0.7)

    async def _detect_equity_violations(
        self,
        request: SocialJusticeRequest,
        context: AgentContext,
    ) -> List[Dict[str, Any]]:
        """Detect potential equity violations."""
        violations = []

        # Violation patterns checked against the available indicators
        violation_types = [
            "discriminatory_resource_allocation",
            "unequal_service_access",
            "policy_exclusion_bias",
            "demographic_underrepresentation",
        ]

        # Placeholder detection: flags the first two patterns with simulated severities
        for violation_type in violation_types[:2]:
            violations.append({
                "type": violation_type,
                "severity": np.random.uniform(0.6, 0.9),
                "legal_reference": "CF/88 Art. 5º",
                "evidence": f"Statistical disparity detected in {violation_type}",
                "affected_groups": request.target_groups or ["vulnerable_populations"],
                "remediation_urgency": "high",
            })

        return violations

    async def _identify_inclusion_gaps(
        self,
        request: SocialJusticeRequest,
        context: AgentContext,
    ) -> List[Dict[str, Any]]:
        """Identify inclusion gaps in policies."""
        gaps = []

        gap_areas = ["digital_inclusion", "healthcare_access", "education_equity", "employment_opportunities"]

        # Placeholder assessment: reports simulated coverage figures for the first three areas
        for area in gap_areas[:3]:
            gaps.append({
                "area": area,
                "gap_size": np.random.uniform(0.3, 0.8),
                "target_population": request.target_groups or ["general_population"],
                "current_coverage": np.random.uniform(0.2, 0.7),
                "recommended_coverage": 0.95,
                "implementation_complexity": np.random.choice(["low", "medium", "high"]),
            })

        return gaps

    def _estimate_affected_population(self, request: SocialJusticeRequest) -> int:
        """Estimate affected population size."""
        # Placeholder: returns a simulated estimate pending integration with real demographic data
        return np.random.randint(50000, 2000000)

    async def _generate_evidence_based_recommendations(
        self,
        violations: List[Dict[str, Any]],
        gaps: List[Dict[str, Any]],
    ) -> List[str]:
        """Generate evidence-based recommendations."""
        recommendations = [
            "Implement targeted resource redistribution policies",
            "Establish monitoring systems for equity metrics",
            "Create inclusive policy design frameworks",
            "Develop intersectional analysis capabilities",
            "Enhance data collection on vulnerable groups",
        ]

        if violations:
            recommendations.insert(0, "Address identified legal compliance violations immediately")

        if gaps:
            recommendations.append("Close identified inclusion gaps through targeted interventions")

        return recommendations

    async def _generate_justice_recommendations(
        self,
        analysis: EquityAnalysisResult,
        request: SocialJusticeRequest,
        context: AgentContext,
    ) -> List[Dict[str, Any]]:
        """Generate detailed justice recommendations."""
        recommendations = []

        for rec_text in analysis.recommendations:
            recommendations.append({
                "recommendation": rec_text,
                "priority": "high" if analysis.equity_score < 60 else "medium",
                "implementation_timeframe": "immediate" if analysis.equity_score < 40 else "short_term",
                "expected_impact": np.random.uniform(0.6, 0.9),
                "required_resources": np.random.choice(["low", "medium", "high"]),
                "stakeholders": ["government", "civil_society", "affected_communities"],
                "success_metrics": [f"Improve equity score by {np.random.randint(10, 25)} points"],
            })

        return recommendations

    def _generate_audit_hash(
        self,
        analysis: EquityAnalysisResult,
        request: SocialJusticeRequest,
    ) -> str:
        """Generate SHA-256 hash for audit trail."""
        import hashlib

        audit_data = f"{analysis.analysis_timestamp}{analysis.gini_coefficient}{len(analysis.violations_detected)}{request.query}"
        return hashlib.sha256(audit_data.encode()).hexdigest()

    async def _calculate_gini(self, data: List[float]) -> float:
        """Calculate Gini coefficient."""
        if not data:
            return 0.0

        sorted_data = np.sort(data)
        n = len(sorted_data)
        cumsum = np.cumsum(sorted_data)

        # G = (n + 1 - 2 * sum of cumulative income shares) / n, with values sorted ascending
        return (n + 1 - 2 * np.sum(cumsum) / cumsum[-1]) / n

    async def _calculate_atkinson(self, data: List[float], epsilon: float = 0.5) -> float:
        """Calculate Atkinson inequality index (assumes strictly positive values)."""
        if not data:
            return 0.0

        mean_income = np.mean(data)
        if epsilon == 1:
            # Special case: A(1) = 1 - geometric mean / arithmetic mean
            geometric_mean = np.exp(np.mean(np.log(data)))
            return 1 - geometric_mean / mean_income
        else:
            # General case: A(eps) = 1 - (mean(x^(1-eps)))^(1/(1-eps)) / mean(x)
            weighted_sum = np.mean(np.power(data, 1 - epsilon))
            return 1 - np.power(weighted_sum, 1 / (1 - epsilon)) / mean_income

    async def _calculate_theil(self, data: List[float]) -> float:
        """Calculate Theil inequality index (assumes strictly positive values)."""
        if not data:
            return 0.0

        # Convert to an array so the element-wise ratio below also works on plain lists
        values = np.asarray(data, dtype=float)
        mean_income = np.mean(values)
        shares = values / mean_income
        # T = mean((x_i / mu) * ln(x_i / mu))
        return float(np.mean(shares * np.log(shares)))

    async def _calculate_palma(self, data: List[float]) -> float:
        """Calculate Palma ratio (top 10% / bottom 40%)."""
        if len(data) < 10:
            return 0.0

        sorted_data = np.sort(data)
        n = len(sorted_data)

        bottom_40_pct = np.sum(sorted_data[:int(0.4 * n)])
        top_10_pct = np.sum(sorted_data[int(0.9 * n):])

        return top_10_pct / bottom_40_pct if bottom_40_pct > 0 else float('inf')

    async def _calculate_quintile_ratio(self, data: List[float]) -> float:
        """Calculate ratio of top to bottom quintile."""
        if len(data) < 5:
            return 0.0

        sorted_data = np.sort(data)
        n = len(sorted_data)

        bottom_quintile = np.mean(sorted_data[:int(0.2 * n)])
        top_quintile = np.mean(sorted_data[int(0.8 * n):])

        return top_quintile / bottom_quintile if bottom_quintile > 0 else float('inf')
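

# --- Illustrative usage sketch (hypothetical, not part of the agent API) ---
# A minimal example of how the inequality helpers above could be exercised on a
# small, made-up income sample. Instantiating DandaraAgent assumes the BaseAgent
# runtime from src.agents.deodoro is importable in this environment; the sample
# values below are invented for demonstration only.
if __name__ == "__main__":  # pragma: no cover
    async def _demo() -> None:
        agent = DandaraAgent()

        # Hypothetical, right-skewed income distribution
        sample_incomes = [
            800.0, 950.0, 1100.0, 1300.0, 1500.0,
            2000.0, 2600.0, 3500.0, 7000.0, 25000.0,
        ]

        gini = await agent._calculate_gini(sample_incomes)
        theil = await agent._calculate_theil(sample_incomes)
        palma = await agent._calculate_palma(sample_incomes)
        print(f"Gini: {gini:.3f} | Theil: {theil:.3f} | Palma: {palma:.2f}")

    asyncio.run(_demo())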