"""
Module: agents.bonifacio_agent
Description: José Bonifácio - Public Policy Agent specialized in analyzing policy effectiveness
Author: Anderson H. Silva
Date: 2025-01-24
License: Proprietary - All rights reserved
"""
import asyncio
import hashlib
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Tuple
from dataclasses import dataclass
from enum import Enum
import statistics
import numpy as np
import pandas as pd
from pydantic import BaseModel, Field as PydanticField
from src.agents.deodoro import BaseAgent, AgentContext, AgentMessage, AgentResponse
from src.core import get_logger
from src.core.exceptions import AgentExecutionError, DataAnalysisError
class PolicyStatus(Enum):
"""Status of policy analysis."""
ACTIVE = "active"
INACTIVE = "inactive"
UNDER_REVIEW = "under_review"
DISCONTINUED = "discontinued"
PLANNED = "planned"
class ImpactLevel(Enum):
"""Impact level classification."""
VERY_LOW = "very_low"
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
VERY_HIGH = "very_high"
@dataclass
class PolicyIndicator:
"""Policy performance indicator."""
name: str
baseline_value: float
current_value: float
target_value: float
unit: str
data_source: str
last_update: datetime
statistical_significance: float
trend: str # "improving", "deteriorating", "stable"
@dataclass
class PolicyEvaluation:
"""Comprehensive policy evaluation result."""
policy_id: str
policy_name: str
analysis_period: Tuple[datetime, datetime]
status: PolicyStatus
investment: Dict[str, float] # planned, executed, deviation
beneficiaries: Dict[str, Any] # target, reached, cost_per_capita
indicators: List[PolicyIndicator]
effectiveness_score: Dict[str, float] # efficacy, efficiency, effectiveness
roi_social: float
sustainability_score: int # 0-100
impact_level: ImpactLevel
recommendations: List[Dict[str, Any]]
evidence_sources: List[str]
analysis_confidence: float
hash_verification: str
class PolicyAnalysisRequest(BaseModel):
"""Request for public policy analysis."""
policy_name: str = PydanticField(description="Name or description of the policy")
policy_area: Optional[str] = PydanticField(default=None, description="Policy area (health, education, security, etc.)")
geographical_scope: Optional[str] = PydanticField(default=None, description="Geographic scope (municipal, state, federal)")
analysis_period: Optional[Tuple[str, str]] = PydanticField(default=None, description="Analysis period (start, end)")
budget_data: Optional[Dict[str, float]] = PydanticField(default=None, description="Budget information")
target_indicators: Optional[List[str]] = PydanticField(default=None, description="Specific indicators to analyze")
comparison_policies: Optional[List[str]] = PydanticField(default=None, description="Other policies to compare with")
benchmarking_scope: str = PydanticField(default="national", description="Benchmarking scope")
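# Illustrative only (values are hypothetical): process() accepts either a plain string
# or a dict matching this model, e.g.
#   PolicyAnalysisRequest(
#       policy_name="Programa Nacional de Alimentação Escolar",
#       policy_area="education",
#       analysis_period=("2023-01-01", "2023-12-31"),
#       budget_data={"planned": 50_000_000, "executed": 47_500_000},
#   )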
class BonifacioAgent(BaseAgent):
"""
José Bonifácio - Public Policy Agent
Specialized in analyzing public policy effectiveness, efficiency, and impact.
Evaluates institutional reforms and measures social return on investment.
Inspired by José Bonifácio de Andrada e Silva, the "Patriarch of Independence"
and architect of Brazilian institutional foundations.
"""
def __init__(self):
super().__init__(
name="bonifacio",
description="Public Policy Agent specialized in analyzing policy effectiveness and institutional reforms",
capabilities=[
"policy_effectiveness_analysis",
"institutional_reform_evaluation",
"social_roi_calculation",
"policy_impact_assessment",
"benchmarking_analysis",
"cost_benefit_analysis",
"stakeholder_impact_mapping",
"policy_sustainability_scoring",
"implementation_gap_analysis",
"evidence_based_recommendations",
"statistical_significance_testing",
"longitudinal_policy_tracking",
"comparative_policy_analysis",
"resource_allocation_optimization"
]
)
self.logger = get_logger("agent.bonifacio")
# Policy evaluation frameworks (registered here; not yet invoked elsewhere in this module)
self._evaluation_frameworks = {
"logic_model": self._apply_logic_model_framework,
"results_chain": self._apply_results_chain_framework,
"theory_of_change": self._apply_theory_of_change_framework,
"cost_effectiveness": self._apply_cost_effectiveness_framework
}
# Data sources for policy analysis
self._data_sources = [
"Portal da Transparência", "TCU", "CGU", "IBGE",
"IPEA", "DataSUS", "INEP", "SIAFI", "SICONV",
"Tesouro Nacional", "CAPES", "CNJ", "CNMP"
]
# Policy areas and their key indicators
self._policy_indicators = {
"education": ["literacy_rate", "school_completion", "pisa_scores", "teacher_quality"],
"health": ["mortality_rate", "vaccination_coverage", "hospital_capacity", "health_expenditure"],
"security": ["crime_rate", "homicide_rate", "police_effectiveness", "prison_population"],
"social": ["poverty_rate", "inequality_index", "employment_rate", "social_mobility"],
"infrastructure": ["road_quality", "internet_access", "urban_mobility", "housing_deficit"],
"environment": ["deforestation_rate", "air_quality", "water_quality", "renewable_energy"]
}
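# The policy_area supplied in a request selects one of the indicator sets above;
# unknown areas fall back to a generic outcome indicator (see _evaluate_policy_indicators).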
async def process(
self,
message: AgentMessage,
context: AgentContext,
) -> AgentResponse:
"""
Process public policy analysis request.
Args:
message: Policy analysis request
context: Agent execution context
Returns:
Comprehensive policy evaluation results
"""
try:
self.logger.info(
"Processing policy analysis request",
investigation_id=context.investigation_id,
message_type=message.type,
)
# Parse request
if isinstance(message.data, dict):
request = PolicyAnalysisRequest(**message.data)
else:
request = PolicyAnalysisRequest(policy_name=str(message.data))
# Perform comprehensive policy evaluation
evaluation = await self._evaluate_policy(request, context)
# Generate strategic recommendations
strategic_recommendations = await self._generate_strategic_recommendations(
evaluation, request, context
)
# Perform benchmarking analysis
benchmarking = await self._perform_benchmarking_analysis(
evaluation, request
)
response_data = {
"policy_id": evaluation.policy_id,
"timestamp": datetime.utcnow().isoformat(),
"agent": "bonifacio",
"analysis_type": "policy_effectiveness",
"policy_evaluation": {
"policy_name": evaluation.policy_name,
"status": evaluation.status.value,
"investment": evaluation.investment,
"beneficiaries": evaluation.beneficiaries,
"effectiveness_scores": evaluation.effectiveness_score,
"roi_social": evaluation.roi_social,
"sustainability_score": evaluation.sustainability_score,
"impact_level": evaluation.impact_level.value,
"analysis_confidence": evaluation.analysis_confidence
},
"indicators": [
{
"name": ind.name,
"baseline": ind.baseline_value,
"current": ind.current_value,
"target": ind.target_value,
"performance_ratio": ind.current_value / ind.baseline_value if ind.baseline_value != 0 else 1.0,
"goal_achievement": (ind.current_value / ind.target_value * 100) if ind.target_value != 0 else 0,
"trend": ind.trend,
"significance": ind.statistical_significance
}
for ind in evaluation.indicators
],
"strategic_recommendations": strategic_recommendations,
"benchmarking": benchmarking,
"evidence_sources": evaluation.evidence_sources,
"hash_verification": evaluation.hash_verification
}
self.logger.info(
"Policy analysis completed",
investigation_id=context.investigation_id,
policy_name=evaluation.policy_name,
effectiveness_score=evaluation.effectiveness_score.get("effectiveness", 0),
impact_level=evaluation.impact_level.value,
)
return AgentResponse(
agent_name=self.name,
response_type="policy_analysis",
data=response_data,
success=True,
context=context,
)
except Exception as e:
self.logger.error(
"Policy analysis failed",
investigation_id=context.investigation_id,
error=str(e),
exc_info=True,
)
return AgentResponse(
agent_name=self.name,
response_type="error",
data={"error": str(e), "analysis_type": "policy_effectiveness"},
success=False,
context=context,
)
async def _evaluate_policy(
self,
request: PolicyAnalysisRequest,
context: AgentContext
) -> PolicyEvaluation:
"""Perform comprehensive policy evaluation."""
self.logger.info(
"Starting policy evaluation",
policy_name=request.policy_name,
policy_area=request.policy_area,
)
# Generate policy ID
policy_id = hashlib.md5(
f"{request.policy_name}{request.policy_area}".encode()
).hexdigest()[:12]
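# MD5 is used here only to derive a short deterministic identifier, not for security.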
# Simulate comprehensive evaluation (replace with real implementation)
await asyncio.sleep(3) # Simulate processing time
# Determine analysis period
if request.analysis_period:
period_start = datetime.strptime(request.analysis_period[0], "%Y-%m-%d")
period_end = datetime.strptime(request.analysis_period[1], "%Y-%m-%d")
else:
period_end = datetime.utcnow()
period_start = period_end - timedelta(days=365) # Last year
# Generate financial data
investment_data = await self._analyze_investment_data(request, context)
# Analyze beneficiaries
beneficiary_data = await self._analyze_beneficiaries(request, context)
# Evaluate indicators
indicators = await self._evaluate_policy_indicators(request, context)
# Calculate effectiveness scores
effectiveness_scores = await self._calculate_effectiveness_scores(
investment_data, beneficiary_data, indicators
)
# Calculate social ROI
social_roi = await self._calculate_social_roi(
investment_data, beneficiary_data, indicators
)
# Assess sustainability
sustainability_score = await self._assess_policy_sustainability(
request, investment_data, indicators
)
# Determine impact level
impact_level = self._classify_impact_level(effectiveness_scores, social_roi)
# Generate evidence hash
evidence_hash = self._generate_evidence_hash(
policy_id, investment_data, beneficiary_data, indicators
)
return PolicyEvaluation(
policy_id=policy_id,
policy_name=request.policy_name,
analysis_period=(period_start, period_end),
status=PolicyStatus.ACTIVE, # Assume active for now
investment=investment_data,
beneficiaries=beneficiary_data,
indicators=indicators,
effectiveness_score=effectiveness_scores,
roi_social=social_roi,
sustainability_score=sustainability_score,
impact_level=impact_level,
recommendations=[],  # Strategic recommendations are produced separately in process() and returned in the response payload
evidence_sources=self._data_sources,
analysis_confidence=0.82,
hash_verification=evidence_hash
)
async def _analyze_investment_data(
self,
request: PolicyAnalysisRequest,
context: AgentContext
) -> Dict[str, float]:
"""Analyze policy investment and budget execution."""
# Use provided budget data or simulate
if request.budget_data:
planned = request.budget_data.get("planned", 0)
executed = request.budget_data.get("executed", 0)
else:
# Simulate budget data
planned = np.random.uniform(10_000_000, 500_000_000)
executed = planned * np.random.uniform(0.7, 1.2) # 70-120% execution
deviation = ((executed - planned) / planned) * 100 if planned > 0 else 0
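# Deviation is a signed percentage of the planned budget (negative means under-execution).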
return {
"planned": planned,
"executed": executed,
"deviation_percentage": deviation,
"cost_per_beneficiary": executed / max(1, np.random.randint(1000, 100000))
}
async def _analyze_beneficiaries(
self,
request: PolicyAnalysisRequest,
context: AgentContext
) -> Dict[str, Any]:
"""Analyze policy beneficiaries and coverage."""
# Simulate beneficiary analysis
target_population = np.random.randint(10000, 1000000)
reached_population = int(target_population * np.random.uniform(0.6, 1.1))
coverage_rate = (reached_population / target_population) * 100
return {
"target_population": target_population,
"reached_population": reached_population,
"coverage_rate": coverage_rate,
"demographic_breakdown": {
"urban": reached_population * 0.7,
"rural": reached_population * 0.3,
"vulnerable_groups": reached_population * 0.4
}
}
async def _evaluate_policy_indicators(
self,
request: PolicyAnalysisRequest,
context: AgentContext
) -> List[PolicyIndicator]:
"""Evaluate key policy performance indicators."""
indicators = []
# Get relevant indicators for policy area
policy_area = request.policy_area or "social"
relevant_indicators = self._policy_indicators.get(policy_area, ["generic_outcome"])
for indicator_name in relevant_indicators[:5]: # Limit to 5 indicators
baseline = np.random.uniform(10, 100)
current = baseline * np.random.uniform(0.8, 1.4) # -20% to +40% change
target = baseline * np.random.uniform(1.1, 1.5) # 10-50% improvement target
# Determine trend
if current > baseline * 1.05:
trend = "improving"
elif current < baseline * 0.95:
trend = "deteriorating"
else:
trend = "stable"
indicators.append(PolicyIndicator(
name=indicator_name,
baseline_value=baseline,
current_value=current,
target_value=target,
unit="rate" if "rate" in indicator_name else "index",
data_source=np.random.choice(self._data_sources[:5]),
last_update=datetime.utcnow() - timedelta(days=np.random.randint(1, 90)),
statistical_significance=np.random.uniform(0.7, 0.95),
trend=trend
))
return indicators
async def _calculate_effectiveness_scores(
self,
investment: Dict[str, float],
beneficiaries: Dict[str, Any],
indicators: List[PolicyIndicator]
) -> Dict[str, float]:
"""Calculate efficacy, efficiency, and effectiveness scores."""
# Efficacy: achievement of intended results
target_achievements = []
for ind in indicators:
if ind.target_value > 0:
achievement = min(1.0, ind.current_value / ind.target_value)
target_achievements.append(achievement)
efficacy = statistics.mean(target_achievements) if target_achievements else 0.5
# Efficiency: resource utilization
budget_efficiency = 1.0 - abs(investment["deviation_percentage"]) / 100
budget_efficiency = max(0.0, min(1.0, budget_efficiency))
coverage_efficiency = min(1.0, beneficiaries["coverage_rate"] / 100)
efficiency = (budget_efficiency + coverage_efficiency) / 2
# Effectiveness: overall impact considering costs and benefits
cost_effectiveness = efficacy / (investment["cost_per_beneficiary"] / 1000) if investment["cost_per_beneficiary"] > 0 else 0
cost_effectiveness = min(1.0, cost_effectiveness)
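# Composite effectiveness uses heuristic weights: 40% efficacy, 30% efficiency,
# 30% cost-effectiveness (a modelling choice, not a published standard).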
effectiveness = (efficacy * 0.4 + efficiency * 0.3 + cost_effectiveness * 0.3)
return {
"efficacy": round(efficacy, 3),
"efficiency": round(efficiency, 3),
"effectiveness": round(effectiveness, 3),
"cost_effectiveness": round(cost_effectiveness, 3)
}
async def _calculate_social_roi(
self,
investment: Dict[str, float],
beneficiaries: Dict[str, Any],
indicators: List[PolicyIndicator]
) -> float:
"""Calculate Social Return on Investment."""
# Estimate social benefits (simplified model)
total_investment = investment["executed"]
# Calculate benefits based on indicator improvements
social_benefits = 0
for ind in indicators:
improvement = max(0, ind.current_value - ind.baseline_value)
# Monetize improvement (simplified estimation)
benefit_per_unit = np.random.uniform(100, 1000) # R$ per unit improvement
social_benefits += improvement * benefit_per_unit * beneficiaries["reached_population"]
# Calculate ROI
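# SROI = (estimated social benefits - executed investment) / executed investment;
# a value above 0 means estimated benefits exceed the amount spent.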
if total_investment > 0:
social_roi = (social_benefits - total_investment) / total_investment
else:
social_roi = 0
return round(social_roi, 3)
async def _assess_policy_sustainability(
self,
request: PolicyAnalysisRequest,
investment: Dict[str, float],
indicators: List[PolicyIndicator]
) -> int:
"""Assess policy sustainability score (0-100)."""
sustainability_factors = []
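# Each factor below is scored on a 0-100 scale; the final score is their simple mean.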
# Budget sustainability
if abs(investment["deviation_percentage"]) < 10:
sustainability_factors.append(85) # Good budget control
elif abs(investment["deviation_percentage"]) < 25:
sustainability_factors.append(65) # Moderate control
else:
sustainability_factors.append(35) # Poor control
# Performance sustainability (trend analysis)
improving_indicators = len([ind for ind in indicators if ind.trend == "improving"])
total_indicators = len(indicators)
if total_indicators > 0:
performance_sustainability = (improving_indicators / total_indicators) * 100
sustainability_factors.append(performance_sustainability)
# Institutional capacity (simulated)
institutional_score = np.random.uniform(50, 90)
sustainability_factors.append(institutional_score)
# Political support (simulated)
political_score = np.random.uniform(40, 85)
sustainability_factors.append(political_score)
return int(statistics.mean(sustainability_factors))
def _classify_impact_level(
self,
effectiveness_scores: Dict[str, float],
social_roi: float
) -> ImpactLevel:
"""Classify policy impact level."""
overall_effectiveness = effectiveness_scores["effectiveness"]
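# Heuristic cut-offs: each band requires both the composite effectiveness score
# and the social ROI to clear its threshold.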
if overall_effectiveness >= 0.8 and social_roi >= 2.0:
return ImpactLevel.VERY_HIGH
elif overall_effectiveness >= 0.7 and social_roi >= 1.0:
return ImpactLevel.HIGH
elif overall_effectiveness >= 0.5 and social_roi >= 0.5:
return ImpactLevel.MEDIUM
elif overall_effectiveness >= 0.3 and social_roi >= 0.0:
return ImpactLevel.LOW
else:
return ImpactLevel.VERY_LOW
async def _generate_strategic_recommendations(
self,
evaluation: PolicyEvaluation,
request: PolicyAnalysisRequest,
context: AgentContext
) -> List[Dict[str, Any]]:
"""Generate strategic policy recommendations."""
recommendations = []
# Budget recommendations
if abs(evaluation.investment["deviation_percentage"]) > 15:
recommendations.append({
"area": "budget_management",
"recommendation": "Implement enhanced budget monitoring and control mechanisms",
"priority": "high",
"expected_impact": 0.8,
"implementation_timeframe": "immediate",
"success_metrics": ["Reduce budget deviation to <10%"]
})
# Coverage recommendations
if evaluation.beneficiaries["coverage_rate"] < 80:
recommendations.append({
"area": "coverage_expansion",
"recommendation": "Expand outreach and improve access mechanisms",
"priority": "medium",
"expected_impact": 0.7,
"implementation_timeframe": "short_term",
"success_metrics": ["Increase coverage rate to >85%"]
})
# Performance recommendations
deteriorating_indicators = [ind for ind in evaluation.indicators if ind.trend == "deteriorating"]
if deteriorating_indicators:
recommendations.append({
"area": "performance_improvement",
"recommendation": f"Address declining performance in {len(deteriorating_indicators)} key indicators",
"priority": "high",
"expected_impact": 0.9,
"implementation_timeframe": "immediate",
"success_metrics": ["Reverse negative trends in all indicators"]
})
# Sustainability recommendations
if evaluation.sustainability_score < 70:
recommendations.append({
"area": "sustainability",
"recommendation": "Strengthen institutional capacity and long-term planning",
"priority": "medium",
"expected_impact": 0.6,
"implementation_timeframe": "medium_term",
"success_metrics": ["Achieve sustainability score >75"]
})
return recommendations
async def _perform_benchmarking_analysis(
self,
evaluation: PolicyEvaluation,
request: PolicyAnalysisRequest
) -> Dict[str, Any]:
"""Perform benchmarking against similar policies."""
# Simulate benchmarking data
benchmarking = {
"reference_policies": [
{"name": "Similar Policy A", "effectiveness": 0.72, "roi": 1.8},
{"name": "Similar Policy B", "effectiveness": 0.68, "roi": 1.4},
{"name": "Best Practice Example", "effectiveness": 0.85, "roi": 2.3}
],
"percentile_ranking": {
"effectiveness": np.random.randint(40, 95),
"efficiency": np.random.randint(35, 90),
"roi": np.random.randint(45, 88)
},
"improvement_potential": {
"effectiveness": max(0, 0.85 - evaluation.effectiveness_score["effectiveness"]),
"roi": max(0, 2.3 - evaluation.roi_social)
}
}
return benchmarking
def _generate_evidence_hash(
self,
policy_id: str,
investment: Dict[str, float],
beneficiaries: Dict[str, Any],
indicators: List[PolicyIndicator]
) -> str:
"""Generate SHA-256 hash for evidence verification."""
evidence_data = f"{policy_id}{investment['executed']}{beneficiaries['reached_population']}{len(indicators)}{datetime.utcnow().date()}"
return hashlib.sha256(evidence_data.encode()).hexdigest()
# Framework application methods
async def _apply_logic_model_framework(self, request, evaluation):
"""Apply logic model evaluation framework."""
pass # Implementation would depend on specific requirements
async def _apply_results_chain_framework(self, request, evaluation):
"""Apply results chain evaluation framework."""
pass # Implementation would depend on specific requirements
async def _apply_theory_of_change_framework(self, request, evaluation):
"""Apply theory of change evaluation framework."""
pass # Implementation would depend on specific requirements
async def _apply_cost_effectiveness_framework(self, request, evaluation):
"""Apply cost-effectiveness evaluation framework."""
pass # Implementation would depend on specific requirements
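# Illustrative usage (a sketch; the AgentMessage/AgentContext constructor arguments below
# are assumptions based on how process() reads them, not verified against src.agents.deodoro):
#
#   agent = BonifacioAgent()
#   message = AgentMessage(type="policy_analysis", data={"policy_name": "Mais Médicos",
#                                                        "policy_area": "health"})
#   context = AgentContext(investigation_id="inv-001")
#   response = asyncio.run(agent.process(message, context))
#   print(response.data["policy_evaluation"]["effectiveness_scores"])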