""" Module: agents.deodoro Codinome: Deodoro da Fonseca - Fundador da Arquitetura Multi-Agente Description: Base agent class for all Cidadão.AI agents Author: Anderson H. Silva Date: 2025-01-24 License: Proprietary - All rights reserved """ from abc import ABC, abstractmethod from dataclasses import dataclass, field from datetime import datetime from typing import Any, Dict, List, Optional, Type from uuid import uuid4 from pydantic import BaseModel, Field as PydanticField from src.core import AgentStatus, get_logger from src.core.exceptions import AgentError, AgentExecutionError from src.infrastructure.observability.metrics import metrics_manager, BusinessMetrics from src.services.agent_metrics import MetricsCollector import time @dataclass class AgentContext: """Context shared between agents.""" investigation_id: str = field(default_factory=lambda: str(uuid4())) user_id: Optional[str] = None session_id: Optional[str] = None timestamp: datetime = field(default_factory=datetime.utcnow) metadata: Dict[str, Any] = field(default_factory=dict) memory_context: Dict[str, Any] = field(default_factory=dict) parent_agent: Optional[str] = None trace_id: Optional[str] = None def to_dict(self) -> Dict[str, Any]: """Convert context to dictionary.""" return { "investigation_id": self.investigation_id, "user_id": self.user_id, "session_id": self.session_id, "timestamp": self.timestamp.isoformat(), "metadata": self.metadata, "memory_context": self.memory_context, "parent_agent": self.parent_agent, "trace_id": self.trace_id, } class AgentMessage(BaseModel): """Message passed between agents.""" sender: str = PydanticField(..., description="Agent that sent the message") recipient: str = PydanticField(..., description="Agent that should receive the message") action: str = PydanticField(..., description="Action to perform") payload: Dict[str, Any] = PydanticField(default_factory=dict, description="Message payload") context: Dict[str, Any] = PydanticField(default_factory=dict, description="Message context") timestamp: datetime = PydanticField(default_factory=datetime.utcnow) message_id: str = PydanticField(default_factory=lambda: str(uuid4())) requires_response: bool = PydanticField(default=True, description="Whether response is expected") class AgentResponse(BaseModel): """Response from an agent.""" agent_name: str = PydanticField(..., description="Name of the responding agent") status: AgentStatus = PydanticField(..., description="Agent status") result: Optional[Any] = PydanticField(default=None, description="Result of the action") error: Optional[str] = PydanticField(default=None, description="Error message if failed") metadata: Dict[str, Any] = PydanticField(default_factory=dict, description="Response metadata") timestamp: datetime = PydanticField(default_factory=datetime.utcnow) processing_time_ms: Optional[float] = PydanticField(default=None, description="Processing time") class BaseAgent(ABC): """Abstract base class for all agents in the system.""" def __init__( self, name: str, description: str, capabilities: List[str], max_retries: int = 3, timeout: int = 60, ) -> None: """ Initialize base agent. Args: name: Agent name description: Agent description capabilities: List of agent capabilities max_retries: Maximum number of retries timeout: Timeout in seconds """ self.name = name self.description = description self.capabilities = capabilities self.max_retries = max_retries self.timeout = timeout self.status = AgentStatus.IDLE self.logger = get_logger(f"agent.{name}") self._message_history: List[AgentMessage] = [] self._response_history: List[AgentResponse] = [] self.logger.info( "agent_initialized", agent_name=self.name, capabilities=self.capabilities, ) @abstractmethod async def process( self, message: AgentMessage, context: AgentContext, ) -> AgentResponse: """ Process a message and return a response. Args: message: Message to process context: Agent context Returns: Agent response Raises: AgentExecutionError: If processing fails """ pass @abstractmethod async def initialize(self) -> None: """Initialize agent resources.""" pass @abstractmethod async def shutdown(self) -> None: """Cleanup agent resources.""" pass async def execute( self, action: str, payload: Dict[str, Any], context: AgentContext, ) -> AgentResponse: """ Execute an action with retry logic. Args: action: Action to execute payload: Action payload context: Agent context Returns: Agent response """ message = AgentMessage( sender=context.parent_agent or "system", recipient=self.name, action=action, payload=payload, context=context.to_dict(), ) start_time = datetime.utcnow() perf_start_time = time.time() retries = 0 last_error = None # Increment task counter AGENT_TASK_COUNT.labels( agent_type=self.name, task_type=action, status="started" ).inc() while retries <= self.max_retries: try: self.status = AgentStatus.THINKING self.logger.info( "agent_executing", agent_name=self.name, action=action, retry=retries, ) # Process the message response = await self.process(message, context) # Calculate processing time processing_time = (datetime.utcnow() - start_time).total_seconds() * 1000 response.processing_time_ms = processing_time # Record metrics using centralized metrics manager metrics_manager.increment_counter( "cidadao_ai_agent_tasks_total", labels={ "agent_name": self.name, "task_type": action, "status": "completed" } ) BusinessMetrics.record_agent_task( agent_name=self.name, task_type=action, duration_seconds=processing_time / 1000.0, status="success" ) # Update status self.status = AgentStatus.COMPLETED # Store in history self._message_history.append(message) self._response_history.append(response) self.logger.info( "agent_execution_completed", agent_name=self.name, action=action, processing_time_ms=processing_time, ) return response except Exception as e: last_error = str(e) self.logger.error( "agent_execution_failed", agent_name=self.name, action=action, error=last_error, retry=retries, ) # Record retry attempt metrics_manager.increment_counter( "cidadao_ai_agent_tasks_total", labels={ "agent_name": self.name, "task_type": action, "status": "retry" } ) retries += 1 if retries <= self.max_retries: # Exponential backoff await self._wait(2 ** retries) # All retries exhausted - record failure metrics_manager.increment_counter( "cidadao_ai_agent_tasks_total", labels={ "agent_name": self.name, "task_type": action, "status": "failed" } ) BusinessMetrics.record_agent_task( agent_name=self.name, task_type=action, duration_seconds=(datetime.utcnow() - start_time).total_seconds(), status="failed" ) self.status = AgentStatus.ERROR error_response = AgentResponse( agent_name=self.name, status=AgentStatus.ERROR, error=f"Failed after {self.max_retries} retries: {last_error}", metadata={"action": action, "retries": retries}, ) self._response_history.append(error_response) raise AgentExecutionError( f"Agent {self.name} failed to execute {action}: {last_error}", details={"agent": self.name, "action": action, "error": last_error} ) async def _wait(self, seconds: float) -> None: """Wait for specified seconds (async-friendly).""" import asyncio await asyncio.sleep(seconds) def can_handle(self, action: str) -> bool: """ Check if agent can handle the given action. Args: action: Action to check Returns: True if agent can handle the action """ return action in self.capabilities def get_status(self) -> Dict[str, Any]: """Get agent status information.""" return { "name": self.name, "description": self.description, "status": self.status.value, "capabilities": self.capabilities, "message_count": len(self._message_history), "response_count": len(self._response_history), } def get_history( self, limit: Optional[int] = None ) -> Dict[str, List[Dict[str, Any]]]: """ Get agent message and response history. Args: limit: Maximum number of entries to return Returns: Dictionary with message and response history """ if limit is None: messages = self._message_history responses = self._response_history elif limit == 0: messages = [] responses = [] else: messages = self._message_history[-limit:] responses = self._response_history[-limit:] return { "messages": [msg.model_dump() for msg in messages], "responses": [resp.model_dump() for resp in responses], } def clear_history(self) -> None: """Clear agent history.""" self._message_history.clear() self._response_history.clear() self.logger.info("agent_history_cleared", agent_name=self.name) def __repr__(self) -> str: """String representation of agent.""" return f"<{self.__class__.__name__}(name='{self.name}', status={self.status.value})>" class ReflectiveAgent(BaseAgent): """Base class for agents with reflection capabilities.""" def __init__( self, name: str, description: str, capabilities: List[str], reflection_threshold: float = 0.7, max_reflection_loops: int = 3, **kwargs: Any ) -> None: """ Initialize reflective agent. Args: name: Agent name description: Agent description capabilities: List of capabilities reflection_threshold: Minimum quality threshold max_reflection_loops: Maximum reflection iterations **kwargs: Additional arguments for BaseAgent """ super().__init__(name, description, capabilities, **kwargs) self.reflection_threshold = reflection_threshold self.max_reflection_loops = max_reflection_loops @abstractmethod async def reflect( self, result: Any, context: AgentContext, ) -> Dict[str, Any]: """ Reflect on the result and provide quality assessment. Args: result: Result to reflect on context: Agent context Returns: Reflection result with quality score and improvements """ pass async def process_with_reflection( self, message: AgentMessage, context: AgentContext, ) -> AgentResponse: """ Process message with reflection loop. Args: message: Message to process context: Agent context Returns: Agent response after reflection """ reflection_count = 0 current_result = None while reflection_count < self.max_reflection_loops: # Process the message if reflection_count == 0: current_result = await self.process(message, context) else: # Modify message based on reflection feedback message_data = message.model_dump() message_data["payload"] = { **message.payload, "reflection_feedback": current_result.metadata.get("reflection", {}), "reflection_iteration": reflection_count, } reflected_message = AgentMessage(**message_data) current_result = await self.process(reflected_message, context) # Reflect on the result reflection = await self.reflect(current_result, context) quality_score = reflection.get("quality_score", 0.0) self.logger.info( "agent_reflection", agent_name=self.name, reflection_count=reflection_count, quality_score=quality_score, ) # Check if quality threshold is met if quality_score >= self.reflection_threshold: current_result.metadata["reflection"] = reflection current_result.metadata["reflection_count"] = reflection_count + 1 return current_result # Store reflection feedback for next iteration current_result.metadata["reflection"] = reflection reflection_count += 1 # Max reflections reached self.logger.warning( "max_reflections_reached", agent_name=self.name, reflection_count=reflection_count, ) current_result.metadata["max_reflections_reached"] = True return current_result