""" Module: agents.bonifacio_agent Description: José Bonifácio - Public Policy Agent specialized in analyzing policy effectiveness Author: Anderson H. Silva Date: 2025-01-24 License: Proprietary - All rights reserved """ import asyncio import hashlib from datetime import datetime, timedelta from typing import Any, Dict, List, Optional, Tuple from dataclasses import dataclass from enum import Enum import statistics import numpy as np import pandas as pd from pydantic import BaseModel, Field as PydanticField from src.agents.deodoro import BaseAgent, AgentContext, AgentMessage, AgentResponse from src.core import get_logger from src.core.exceptions import AgentExecutionError, DataAnalysisError class PolicyStatus(Enum): """Status of policy analysis.""" ACTIVE = "active" INACTIVE = "inactive" UNDER_REVIEW = "under_review" DISCONTINUED = "discontinued" PLANNED = "planned" class ImpactLevel(Enum): """Impact level classification.""" VERY_LOW = "very_low" LOW = "low" MEDIUM = "medium" HIGH = "high" VERY_HIGH = "very_high" @dataclass class PolicyIndicator: """Policy performance indicator.""" name: str baseline_value: float current_value: float target_value: float unit: str data_source: str last_update: datetime statistical_significance: float trend: str # "improving", "deteriorating", "stable" @dataclass class PolicyEvaluation: """Comprehensive policy evaluation result.""" policy_id: str policy_name: str analysis_period: Tuple[datetime, datetime] status: PolicyStatus investment: Dict[str, float] # planned, executed, deviation beneficiaries: Dict[str, Any] # target, reached, cost_per_capita indicators: List[PolicyIndicator] effectiveness_score: Dict[str, float] # efficacy, efficiency, effectiveness roi_social: float sustainability_score: int # 0-100 impact_level: ImpactLevel recommendations: List[Dict[str, Any]] evidence_sources: List[str] analysis_confidence: float hash_verification: str class PolicyAnalysisRequest(BaseModel): """Request for public policy analysis.""" policy_name: str = PydanticField(description="Name or description of the policy") policy_area: Optional[str] = PydanticField(default=None, description="Policy area (health, education, security, etc)") geographical_scope: Optional[str] = PydanticField(default=None, description="Geographic scope (municipal, state, federal)") analysis_period: Optional[Tuple[str, str]] = PydanticField(default=None, description="Analysis period (start, end)") budget_data: Optional[Dict[str, float]] = PydanticField(default=None, description="Budget information") target_indicators: Optional[List[str]] = PydanticField(default=None, description="Specific indicators to analyze") comparison_policies: Optional[List[str]] = PydanticField(default=None, description="Other policies to compare with") benchmarking_scope: str = PydanticField(default="national", description="Benchmarking scope") class BonifacioAgent(BaseAgent): """ José Bonifácio - Public Policy Agent Specialized in analyzing public policy effectiveness, efficiency, and impact. Evaluates institutional reforms and measures social return on investment. Inspired by José Bonifácio de Andrada e Silva, the "Patriarch of Independence" and architect of Brazilian institutional foundations. """ def __init__(self): super().__init__( name="bonifacio", description="Public Policy Agent specialized in analyzing policy effectiveness and institutional reforms", capabilities=[ "policy_effectiveness_analysis", "institutional_reform_evaluation", "social_roi_calculation", "policy_impact_assessment", "benchmarking_analysis", "cost_benefit_analysis", "stakeholder_impact_mapping", "policy_sustainability_scoring", "implementation_gap_analysis", "evidence_based_recommendations", "statistical_significance_testing", "longitudinal_policy_tracking", "comparative_policy_analysis", "resource_allocation_optimization" ] ) self.logger = get_logger("agent.bonifacio") # Policy evaluation frameworks self._evaluation_frameworks = { "logic_model": self._apply_logic_model_framework, "results_chain": self._apply_results_chain_framework, "theory_of_change": self._apply_theory_of_change_framework, "cost_effectiveness": self._apply_cost_effectiveness_framework } # Data sources for policy analysis self._data_sources = [ "Portal da Transparência", "TCU", "CGU", "IBGE", "IPEA", "DataSUS", "INEP", "SIAFI", "SICONV", "Tesouro Nacional", "CAPES", "CNJ", "CNMP" ] # Policy areas and their key indicators self._policy_indicators = { "education": ["literacy_rate", "school_completion", "pisa_scores", "teacher_quality"], "health": ["mortality_rate", "vaccination_coverage", "hospital_capacity", "health_expenditure"], "security": ["crime_rate", "homicide_rate", "police_effectiveness", "prison_population"], "social": ["poverty_rate", "inequality_index", "employment_rate", "social_mobility"], "infrastructure": ["road_quality", "internet_access", "urban_mobility", "housing_deficit"], "environment": ["deforestation_rate", "air_quality", "water_quality", "renewable_energy"] } async def process( self, message: AgentMessage, context: AgentContext, ) -> AgentResponse: """ Process public policy analysis request. Args: message: Policy analysis request context: Agent execution context Returns: Comprehensive policy evaluation results """ try: self.logger.info( "Processing policy analysis request", investigation_id=context.investigation_id, message_type=message.type, ) # Parse request if isinstance(message.data, dict): request = PolicyAnalysisRequest(**message.data) else: request = PolicyAnalysisRequest(policy_name=str(message.data)) # Perform comprehensive policy evaluation evaluation = await self._evaluate_policy(request, context) # Generate strategic recommendations strategic_recommendations = await self._generate_strategic_recommendations( evaluation, request, context ) # Perform benchmarking analysis benchmarking = await self._perform_benchmarking_analysis( evaluation, request ) response_data = { "policy_id": evaluation.policy_id, "timestamp": datetime.utcnow().isoformat(), "agent": "bonifacio", "analysis_type": "policy_effectiveness", "policy_evaluation": { "policy_name": evaluation.policy_name, "status": evaluation.status.value, "investment": evaluation.investment, "beneficiaries": evaluation.beneficiaries, "effectiveness_scores": evaluation.effectiveness_score, "roi_social": evaluation.roi_social, "sustainability_score": evaluation.sustainability_score, "impact_level": evaluation.impact_level.value, "analysis_confidence": evaluation.analysis_confidence }, "indicators": [ { "name": ind.name, "baseline": ind.baseline_value, "current": ind.current_value, "target": ind.target_value, "performance_ratio": ind.current_value / ind.baseline_value if ind.baseline_value != 0 else 1.0, "goal_achievement": (ind.current_value / ind.target_value * 100) if ind.target_value != 0 else 0, "trend": ind.trend, "significance": ind.statistical_significance } for ind in evaluation.indicators ], "strategic_recommendations": strategic_recommendations, "benchmarking": benchmarking, "evidence_sources": evaluation.evidence_sources, "hash_verification": evaluation.hash_verification } self.logger.info( "Policy analysis completed", investigation_id=context.investigation_id, policy_name=evaluation.policy_name, effectiveness_score=evaluation.effectiveness_score.get("effectiveness", 0), impact_level=evaluation.impact_level.value, ) return AgentResponse( agent_name=self.name, response_type="policy_analysis", data=response_data, success=True, context=context, ) except Exception as e: self.logger.error( "Policy analysis failed", investigation_id=context.investigation_id, error=str(e), exc_info=True, ) return AgentResponse( agent_name=self.name, response_type="error", data={"error": str(e), "analysis_type": "policy_effectiveness"}, success=False, context=context, ) async def _evaluate_policy( self, request: PolicyAnalysisRequest, context: AgentContext ) -> PolicyEvaluation: """Perform comprehensive policy evaluation.""" self.logger.info( "Starting policy evaluation", policy_name=request.policy_name, policy_area=request.policy_area, ) # Generate policy ID policy_id = hashlib.md5( f"{request.policy_name}{request.policy_area}".encode() ).hexdigest()[:12] # Simulate comprehensive evaluation (replace with real implementation) await asyncio.sleep(3) # Simulate processing time # Determine analysis period if request.analysis_period: period_start = datetime.strptime(request.analysis_period[0], "%Y-%m-%d") period_end = datetime.strptime(request.analysis_period[1], "%Y-%m-%d") else: period_end = datetime.utcnow() period_start = period_end - timedelta(days=365) # Last year # Generate financial data investment_data = await self._analyze_investment_data(request, context) # Analyze beneficiaries beneficiary_data = await self._analyze_beneficiaries(request, context) # Evaluate indicators indicators = await self._evaluate_policy_indicators(request, context) # Calculate effectiveness scores effectiveness_scores = await self._calculate_effectiveness_scores( investment_data, beneficiary_data, indicators ) # Calculate social ROI social_roi = await self._calculate_social_roi( investment_data, beneficiary_data, indicators ) # Assess sustainability sustainability_score = await self._assess_policy_sustainability( request, investment_data, indicators ) # Determine impact level impact_level = self._classify_impact_level(effectiveness_scores, social_roi) # Generate evidence hash evidence_hash = self._generate_evidence_hash( policy_id, investment_data, beneficiary_data, indicators ) return PolicyEvaluation( policy_id=policy_id, policy_name=request.policy_name, analysis_period=(period_start, period_end), status=PolicyStatus.ACTIVE, # Assume active for now investment=investment_data, beneficiaries=beneficiary_data, indicators=indicators, effectiveness_score=effectiveness_scores, roi_social=social_roi, sustainability_score=sustainability_score, impact_level=impact_level, recommendations=[], # Will be filled by recommendation generator evidence_sources=self._data_sources, analysis_confidence=0.82, hash_verification=evidence_hash ) async def _analyze_investment_data( self, request: PolicyAnalysisRequest, context: AgentContext ) -> Dict[str, float]: """Analyze policy investment and budget execution.""" # Use provided budget data or simulate if request.budget_data: planned = request.budget_data.get("planned", 0) executed = request.budget_data.get("executed", 0) else: # Simulate budget data planned = np.random.uniform(10_000_000, 500_000_000) executed = planned * np.random.uniform(0.7, 1.2) # 70-120% execution deviation = ((executed - planned) / planned) * 100 if planned > 0 else 0 return { "planned": planned, "executed": executed, "deviation_percentage": deviation, "cost_per_beneficiary": executed / max(1, np.random.randint(1000, 100000)) } async def _analyze_beneficiaries( self, request: PolicyAnalysisRequest, context: AgentContext ) -> Dict[str, Any]: """Analyze policy beneficiaries and coverage.""" # Simulate beneficiary analysis target_population = np.random.randint(10000, 1000000) reached_population = int(target_population * np.random.uniform(0.6, 1.1)) coverage_rate = (reached_population / target_population) * 100 return { "target_population": target_population, "reached_population": reached_population, "coverage_rate": coverage_rate, "demographic_breakdown": { "urban": reached_population * 0.7, "rural": reached_population * 0.3, "vulnerable_groups": reached_population * 0.4 } } async def _evaluate_policy_indicators( self, request: PolicyAnalysisRequest, context: AgentContext ) -> List[PolicyIndicator]: """Evaluate key policy performance indicators.""" indicators = [] # Get relevant indicators for policy area policy_area = request.policy_area or "social" relevant_indicators = self._policy_indicators.get(policy_area, ["generic_outcome"]) for indicator_name in relevant_indicators[:5]: # Limit to 5 indicators baseline = np.random.uniform(10, 100) current = baseline * np.random.uniform(0.8, 1.4) # -20% to +40% change target = baseline * np.random.uniform(1.1, 1.5) # 10-50% improvement target # Determine trend if current > baseline * 1.05: trend = "improving" elif current < baseline * 0.95: trend = "deteriorating" else: trend = "stable" indicators.append(PolicyIndicator( name=indicator_name, baseline_value=baseline, current_value=current, target_value=target, unit="rate" if "rate" in indicator_name else "index", data_source=np.random.choice(self._data_sources[:5]), last_update=datetime.utcnow() - timedelta(days=np.random.randint(1, 90)), statistical_significance=np.random.uniform(0.7, 0.95), trend=trend )) return indicators async def _calculate_effectiveness_scores( self, investment: Dict[str, float], beneficiaries: Dict[str, Any], indicators: List[PolicyIndicator] ) -> Dict[str, float]: """Calculate efficacy, efficiency, and effectiveness scores.""" # Efficacy: achievement of intended results target_achievements = [] for ind in indicators: if ind.target_value > 0: achievement = min(1.0, ind.current_value / ind.target_value) target_achievements.append(achievement) efficacy = statistics.mean(target_achievements) if target_achievements else 0.5 # Efficiency: resource utilization budget_efficiency = 1.0 - abs(investment["deviation_percentage"]) / 100 budget_efficiency = max(0.0, min(1.0, budget_efficiency)) coverage_efficiency = min(1.0, beneficiaries["coverage_rate"] / 100) efficiency = (budget_efficiency + coverage_efficiency) / 2 # Effectiveness: overall impact considering costs and benefits cost_effectiveness = efficacy / (investment["cost_per_beneficiary"] / 1000) if investment["cost_per_beneficiary"] > 0 else 0 cost_effectiveness = min(1.0, cost_effectiveness) effectiveness = (efficacy * 0.4 + efficiency * 0.3 + cost_effectiveness * 0.3) return { "efficacy": round(efficacy, 3), "efficiency": round(efficiency, 3), "effectiveness": round(effectiveness, 3), "cost_effectiveness": round(cost_effectiveness, 3) } async def _calculate_social_roi( self, investment: Dict[str, float], beneficiaries: Dict[str, Any], indicators: List[PolicyIndicator] ) -> float: """Calculate Social Return on Investment.""" # Estimate social benefits (simplified model) total_investment = investment["executed"] # Calculate benefits based on indicator improvements social_benefits = 0 for ind in indicators: improvement = max(0, ind.current_value - ind.baseline_value) # Monetize improvement (simplified estimation) benefit_per_unit = np.random.uniform(100, 1000) # R$ per unit improvement social_benefits += improvement * benefit_per_unit * beneficiaries["reached_population"] # Calculate ROI if total_investment > 0: social_roi = (social_benefits - total_investment) / total_investment else: social_roi = 0 return round(social_roi, 3) async def _assess_policy_sustainability( self, request: PolicyAnalysisRequest, investment: Dict[str, float], indicators: List[PolicyIndicator] ) -> int: """Assess policy sustainability score (0-100).""" sustainability_factors = [] # Budget sustainability if abs(investment["deviation_percentage"]) < 10: sustainability_factors.append(85) # Good budget control elif abs(investment["deviation_percentage"]) < 25: sustainability_factors.append(65) # Moderate control else: sustainability_factors.append(35) # Poor control # Performance sustainability (trend analysis) improving_indicators = len([ind for ind in indicators if ind.trend == "improving"]) total_indicators = len(indicators) if total_indicators > 0: performance_sustainability = (improving_indicators / total_indicators) * 100 sustainability_factors.append(performance_sustainability) # Institutional capacity (simulated) institutional_score = np.random.uniform(50, 90) sustainability_factors.append(institutional_score) # Political support (simulated) political_score = np.random.uniform(40, 85) sustainability_factors.append(political_score) return int(statistics.mean(sustainability_factors)) def _classify_impact_level( self, effectiveness_scores: Dict[str, float], social_roi: float ) -> ImpactLevel: """Classify policy impact level.""" overall_effectiveness = effectiveness_scores["effectiveness"] if overall_effectiveness >= 0.8 and social_roi >= 2.0: return ImpactLevel.VERY_HIGH elif overall_effectiveness >= 0.7 and social_roi >= 1.0: return ImpactLevel.HIGH elif overall_effectiveness >= 0.5 and social_roi >= 0.5: return ImpactLevel.MEDIUM elif overall_effectiveness >= 0.3 and social_roi >= 0.0: return ImpactLevel.LOW else: return ImpactLevel.VERY_LOW async def _generate_strategic_recommendations( self, evaluation: PolicyEvaluation, request: PolicyAnalysisRequest, context: AgentContext ) -> List[Dict[str, Any]]: """Generate strategic policy recommendations.""" recommendations = [] # Budget recommendations if abs(evaluation.investment["deviation_percentage"]) > 15: recommendations.append({ "area": "budget_management", "recommendation": "Implement enhanced budget monitoring and control mechanisms", "priority": "high", "expected_impact": 0.8, "implementation_timeframe": "immediate", "success_metrics": ["Reduce budget deviation to <10%"] }) # Coverage recommendations if evaluation.beneficiaries["coverage_rate"] < 80: recommendations.append({ "area": "coverage_expansion", "recommendation": "Expand outreach and improve access mechanisms", "priority": "medium", "expected_impact": 0.7, "implementation_timeframe": "short_term", "success_metrics": ["Increase coverage rate to >85%"] }) # Performance recommendations deteriorating_indicators = [ind for ind in evaluation.indicators if ind.trend == "deteriorating"] if deteriorating_indicators: recommendations.append({ "area": "performance_improvement", "recommendation": f"Address declining performance in {len(deteriorating_indicators)} key indicators", "priority": "high", "expected_impact": 0.9, "implementation_timeframe": "immediate", "success_metrics": ["Reverse negative trends in all indicators"] }) # Sustainability recommendations if evaluation.sustainability_score < 70: recommendations.append({ "area": "sustainability", "recommendation": "Strengthen institutional capacity and long-term planning", "priority": "medium", "expected_impact": 0.6, "implementation_timeframe": "medium_term", "success_metrics": ["Achieve sustainability score >75"] }) return recommendations async def _perform_benchmarking_analysis( self, evaluation: PolicyEvaluation, request: PolicyAnalysisRequest ) -> Dict[str, Any]: """Perform benchmarking against similar policies.""" # Simulate benchmarking data benchmarking = { "reference_policies": [ {"name": "Similar Policy A", "effectiveness": 0.72, "roi": 1.8}, {"name": "Similar Policy B", "effectiveness": 0.68, "roi": 1.4}, {"name": "Best Practice Example", "effectiveness": 0.85, "roi": 2.3} ], "percentile_ranking": { "effectiveness": np.random.randint(40, 95), "efficiency": np.random.randint(35, 90), "roi": np.random.randint(45, 88) }, "improvement_potential": { "effectiveness": max(0, 0.85 - evaluation.effectiveness_score["effectiveness"]), "roi": max(0, 2.3 - evaluation.roi_social) } } return benchmarking def _generate_evidence_hash( self, policy_id: str, investment: Dict[str, float], beneficiaries: Dict[str, Any], indicators: List[PolicyIndicator] ) -> str: """Generate SHA-256 hash for evidence verification.""" evidence_data = f"{policy_id}{investment['executed']}{beneficiaries['reached_population']}{len(indicators)}{datetime.utcnow().date()}" return hashlib.sha256(evidence_data.encode()).hexdigest() # Framework application methods async def _apply_logic_model_framework(self, request, evaluation): """Apply logic model evaluation framework.""" pass # Implementation would depend on specific requirements async def _apply_results_chain_framework(self, request, evaluation): """Apply results chain evaluation framework.""" pass # Implementation would depend on specific requirements async def _apply_theory_of_change_framework(self, request, evaluation): """Apply theory of change evaluation framework.""" pass # Implementation would depend on specific requirements async def _apply_cost_effectiveness_framework(self, request, evaluation): """Apply cost-effectiveness evaluation framework.""" pass # Implementation would depend on specific requirements