|
|
""" |
|
|
Module: agents.ayrton_senna |
|
|
Codinome: Ayrton Senna - Navegador das Rotas Perfeitas |
|
|
Description: Semantic router for directing queries to appropriate agents with precision and speed |
|
|
Author: Anderson H. Silva |
|
|
Date: 2025-01-24 |
|
|
License: Proprietary - All rights reserved |
|
|
""" |
|
|
|
|
|
import re |
|
|
from typing import Any, Dict, List, Optional, Tuple |
|
|
|
|
|
from pydantic import BaseModel, Field as PydanticField |
|
|
|
|
|
from src.core import AgentStatus, get_logger |
|
|
from src.core.exceptions import AgentError, ValidationError |
|
|
from .deodoro import ( |
|
|
AgentContext, |
|
|
AgentMessage, |
|
|
AgentResponse, |
|
|
BaseAgent, |
|
|
) |
|
|
|
|
|
|
|
|
class RoutingRule(BaseModel): |
|
|
"""Rule for routing queries to agents.""" |
|
|
|
|
|
name: str = PydanticField(..., description="Rule name") |
|
|
patterns: List[str] = PydanticField(..., description="Regex patterns to match") |
|
|
keywords: List[str] = PydanticField(default_factory=list, description="Keywords to match") |
|
|
target_agent: str = PydanticField(..., description="Target agent name") |
|
|
action: str = PydanticField(..., description="Action to perform") |
|
|
priority: int = PydanticField(default=5, description="Rule priority (1-10)") |
|
|
confidence_threshold: float = PydanticField(default=0.7, description="Confidence threshold") |
|
|
metadata: Dict[str, Any] = PydanticField(default_factory=dict, description="Additional metadata") |
|
|
|
|
|
|
|
|
class RoutingDecision(BaseModel): |
|
|
"""Result of routing decision.""" |
|
|
|
|
|
target_agent: str = PydanticField(..., description="Selected agent") |
|
|
action: str = PydanticField(..., description="Action to perform") |
|
|
confidence: float = PydanticField(..., description="Confidence in decision") |
|
|
rule_used: str = PydanticField(..., description="Rule that matched") |
|
|
parameters: Dict[str, Any] = PydanticField(default_factory=dict, description="Parameters for agent") |
|
|
fallback_agents: List[str] = PydanticField(default_factory=list, description="Fallback agents") |
|
|
|
|
|
|
|
|
class SemanticRouter(BaseAgent): |
|
|
""" |
|
|
Semantic router that analyzes queries and routes them to appropriate agents. |
|
|
|
|
|
The router uses: |
|
|
- Rule-based routing with regex patterns and keywords |
|
|
- Semantic similarity for complex queries |
|
|
- Intent detection for conversational flows |
|
|
- Fallback strategies for ambiguous cases |
|
|
""" |
|
|
|
|
|
def __init__( |
|
|
self, |
|
|
llm_service: Any, |
|
|
embedding_service: Optional[Any] = None, |
|
|
confidence_threshold: float = 0.7, |
|
|
**kwargs: Any |
|
|
) -> None: |
|
|
""" |
|
|
Initialize semantic router. |
|
|
|
|
|
Args: |
|
|
llm_service: LLM service for intent detection |
|
|
embedding_service: Embedding service for semantic similarity |
|
|
confidence_threshold: Minimum confidence for routing decisions |
|
|
**kwargs: Additional arguments |
|
|
""" |
|
|
super().__init__( |
|
|
name="SemanticRouter", |
|
|
description="Routes queries to appropriate agents based on semantic analysis", |
|
|
capabilities=[ |
|
|
"route_query", |
|
|
"detect_intent", |
|
|
"analyze_query_type", |
|
|
"suggest_agents", |
|
|
"validate_routing", |
|
|
], |
|
|
**kwargs |
|
|
) |
|
|
|
|
|
self.llm_service = llm_service |
|
|
self.embedding_service = embedding_service |
|
|
self.confidence_threshold = confidence_threshold |
|
|
self.routing_rules: List[RoutingRule] = [] |
|
|
self.agent_capabilities: Dict[str, List[str]] = {} |
|
|
|
|
|
self._initialize_default_rules() |
|
|
|
|
|
self.logger.info( |
|
|
"semantic_router_initialized", |
|
|
confidence_threshold=confidence_threshold, |
|
|
rules_count=len(self.routing_rules), |
|
|
) |
|
|
|
|
|
async def initialize(self) -> None: |
|
|
"""Initialize semantic router.""" |
|
|
self.logger.info("semantic_router_initializing") |
|
|
|
|
|
|
|
|
if hasattr(self.llm_service, 'initialize'): |
|
|
await self.llm_service.initialize() |
|
|
|
|
|
if self.embedding_service and hasattr(self.embedding_service, 'initialize'): |
|
|
await self.embedding_service.initialize() |
|
|
|
|
|
self.status = AgentStatus.IDLE |
|
|
self.logger.info("semantic_router_initialized") |
|
|
|
|
|
async def shutdown(self) -> None: |
|
|
"""Shutdown semantic router.""" |
|
|
self.logger.info("semantic_router_shutting_down") |
|
|
|
|
|
if hasattr(self.llm_service, 'shutdown'): |
|
|
await self.llm_service.shutdown() |
|
|
|
|
|
if self.embedding_service and hasattr(self.embedding_service, 'shutdown'): |
|
|
await self.embedding_service.shutdown() |
|
|
|
|
|
self.logger.info("semantic_router_shutdown_complete") |
|
|
|
|
|
async def process( |
|
|
self, |
|
|
message: AgentMessage, |
|
|
context: AgentContext, |
|
|
) -> AgentResponse: |
|
|
""" |
|
|
Process routing requests. |
|
|
|
|
|
Args: |
|
|
message: Message to process |
|
|
context: Agent context |
|
|
|
|
|
Returns: |
|
|
Agent response with routing decision |
|
|
""" |
|
|
action = message.action |
|
|
payload = message.payload |
|
|
|
|
|
self.logger.info( |
|
|
"semantic_router_processing", |
|
|
action=action, |
|
|
context_id=context.investigation_id, |
|
|
) |
|
|
|
|
|
try: |
|
|
if action == "route_query": |
|
|
result = await self._route_query(payload, context) |
|
|
elif action == "detect_intent": |
|
|
result = await self._detect_intent(payload, context) |
|
|
elif action == "analyze_query_type": |
|
|
result = await self._analyze_query_type(payload, context) |
|
|
elif action == "suggest_agents": |
|
|
result = await self._suggest_agents(payload, context) |
|
|
elif action == "validate_routing": |
|
|
result = await self._validate_routing(payload, context) |
|
|
else: |
|
|
raise AgentError( |
|
|
f"Unknown action: {action}", |
|
|
details={"action": action, "available_actions": self.capabilities} |
|
|
) |
|
|
|
|
|
return AgentResponse( |
|
|
agent_name=self.name, |
|
|
status=AgentStatus.COMPLETED, |
|
|
result=result, |
|
|
metadata={"action": action, "context_id": context.investigation_id}, |
|
|
) |
|
|
|
|
|
except Exception as e: |
|
|
self.logger.error( |
|
|
"semantic_router_processing_failed", |
|
|
action=action, |
|
|
error=str(e), |
|
|
context_id=context.investigation_id, |
|
|
) |
|
|
|
|
|
return AgentResponse( |
|
|
agent_name=self.name, |
|
|
status=AgentStatus.ERROR, |
|
|
error=str(e), |
|
|
metadata={"action": action, "context_id": context.investigation_id}, |
|
|
) |
|
|
|
|
|
def register_agent_capabilities( |
|
|
self, |
|
|
agent_name: str, |
|
|
capabilities: List[str], |
|
|
) -> None: |
|
|
""" |
|
|
Register agent capabilities for routing decisions. |
|
|
|
|
|
Args: |
|
|
agent_name: Name of the agent |
|
|
capabilities: List of capabilities |
|
|
""" |
|
|
self.agent_capabilities[agent_name] = capabilities |
|
|
self.logger.info( |
|
|
"agent_capabilities_registered", |
|
|
agent_name=agent_name, |
|
|
capabilities=capabilities, |
|
|
) |
|
|
|
|
|
def add_routing_rule(self, rule: RoutingRule) -> None: |
|
|
""" |
|
|
Add a custom routing rule. |
|
|
|
|
|
Args: |
|
|
rule: Routing rule to add |
|
|
""" |
|
|
self.routing_rules.append(rule) |
|
|
|
|
|
self.routing_rules.sort(key=lambda r: r.priority, reverse=True) |
|
|
|
|
|
self.logger.info( |
|
|
"routing_rule_added", |
|
|
rule_name=rule.name, |
|
|
target_agent=rule.target_agent, |
|
|
priority=rule.priority, |
|
|
) |
|
|
|
|
|
async def route_query( |
|
|
self, |
|
|
query: str, |
|
|
context: AgentContext, |
|
|
user_preferences: Optional[Dict[str, Any]] = None, |
|
|
) -> RoutingDecision: |
|
|
""" |
|
|
Route a query to the most appropriate agent. |
|
|
|
|
|
Args: |
|
|
query: Query to route |
|
|
context: Agent context |
|
|
user_preferences: Optional user preferences |
|
|
|
|
|
Returns: |
|
|
Routing decision |
|
|
""" |
|
|
self.logger.info( |
|
|
"routing_query", |
|
|
query=query[:100], |
|
|
context_id=context.investigation_id, |
|
|
) |
|
|
|
|
|
|
|
|
rule_decision = await self._apply_routing_rules(query, context) |
|
|
|
|
|
if rule_decision and rule_decision.confidence >= self.confidence_threshold: |
|
|
self.logger.info( |
|
|
"rule_based_routing_success", |
|
|
target_agent=rule_decision.target_agent, |
|
|
confidence=rule_decision.confidence, |
|
|
rule=rule_decision.rule_used, |
|
|
) |
|
|
return rule_decision |
|
|
|
|
|
|
|
|
semantic_decision = await self._semantic_routing(query, context) |
|
|
|
|
|
if semantic_decision and semantic_decision.confidence >= self.confidence_threshold: |
|
|
self.logger.info( |
|
|
"semantic_routing_success", |
|
|
target_agent=semantic_decision.target_agent, |
|
|
confidence=semantic_decision.confidence, |
|
|
) |
|
|
return semantic_decision |
|
|
|
|
|
|
|
|
fallback_decision = RoutingDecision( |
|
|
target_agent="MasterAgent", |
|
|
action="investigate", |
|
|
confidence=0.5, |
|
|
rule_used="fallback", |
|
|
parameters={"query": query}, |
|
|
fallback_agents=["InvestigatorAgent", "AnalystAgent"], |
|
|
) |
|
|
|
|
|
self.logger.warning( |
|
|
"routing_fallback_used", |
|
|
query=query[:50], |
|
|
confidence=fallback_decision.confidence, |
|
|
) |
|
|
|
|
|
return fallback_decision |
|
|
|
|
|
async def _route_query( |
|
|
self, |
|
|
payload: Dict[str, Any], |
|
|
context: AgentContext, |
|
|
) -> RoutingDecision: |
|
|
"""Route query based on payload.""" |
|
|
query = payload.get("query", "") |
|
|
if not query: |
|
|
raise ValidationError("Query is required for routing") |
|
|
|
|
|
user_preferences = payload.get("user_preferences") |
|
|
return await self.route_query(query, context, user_preferences) |
|
|
|
|
|
async def _apply_routing_rules( |
|
|
self, |
|
|
query: str, |
|
|
context: AgentContext, |
|
|
) -> Optional[RoutingDecision]: |
|
|
"""Apply rule-based routing.""" |
|
|
query_lower = query.lower() |
|
|
|
|
|
for rule in self.routing_rules: |
|
|
confidence = 0.0 |
|
|
|
|
|
|
|
|
pattern_matches = 0 |
|
|
for pattern in rule.patterns: |
|
|
if re.search(pattern, query_lower, re.IGNORECASE): |
|
|
pattern_matches += 1 |
|
|
|
|
|
if rule.patterns: |
|
|
confidence += (pattern_matches / len(rule.patterns)) * 0.6 |
|
|
|
|
|
|
|
|
keyword_matches = 0 |
|
|
for keyword in rule.keywords: |
|
|
if keyword.lower() in query_lower: |
|
|
keyword_matches += 1 |
|
|
|
|
|
if rule.keywords: |
|
|
confidence += (keyword_matches / len(rule.keywords)) * 0.4 |
|
|
|
|
|
|
|
|
confidence = min(confidence * (rule.priority / 10), 1.0) |
|
|
|
|
|
if confidence >= rule.confidence_threshold: |
|
|
return RoutingDecision( |
|
|
target_agent=rule.target_agent, |
|
|
action=rule.action, |
|
|
confidence=confidence, |
|
|
rule_used=rule.name, |
|
|
parameters={"query": query, **rule.metadata}, |
|
|
) |
|
|
|
|
|
return None |
|
|
|
|
|
async def _semantic_routing( |
|
|
self, |
|
|
query: str, |
|
|
context: AgentContext, |
|
|
) -> Optional[RoutingDecision]: |
|
|
"""Use LLM for semantic routing.""" |
|
|
try: |
|
|
routing_prompt = self._create_routing_prompt(query) |
|
|
|
|
|
response = await self.llm_service.generate( |
|
|
prompt=routing_prompt, |
|
|
context=context, |
|
|
) |
|
|
|
|
|
|
|
|
decision = self._parse_routing_response(response, query) |
|
|
return decision |
|
|
|
|
|
except Exception as e: |
|
|
self.logger.error( |
|
|
"semantic_routing_failed", |
|
|
query=query[:50], |
|
|
error=str(e), |
|
|
) |
|
|
return None |
|
|
|
|
|
async def _detect_intent( |
|
|
self, |
|
|
payload: Dict[str, Any], |
|
|
context: AgentContext, |
|
|
) -> Dict[str, Any]: |
|
|
"""Detect intent from query.""" |
|
|
query = payload.get("query", "") |
|
|
|
|
|
|
|
|
intents = { |
|
|
"investigation": ["investigar", "analisar", "verificar", "buscar"], |
|
|
"explanation": ["explicar", "entender", "como", "por que"], |
|
|
"comparison": ["comparar", "diferença", "melhor", "versus"], |
|
|
"trend_analysis": ["tendência", "evolução", "histórico", "ao longo"], |
|
|
"anomaly_detection": ["suspeito", "anômalo", "irregular", "estranho"], |
|
|
} |
|
|
|
|
|
query_lower = query.lower() |
|
|
detected_intents = [] |
|
|
|
|
|
for intent, keywords in intents.items(): |
|
|
confidence = sum(1 for keyword in keywords if keyword in query_lower) |
|
|
if confidence > 0: |
|
|
detected_intents.append({ |
|
|
"intent": intent, |
|
|
"confidence": min(confidence / len(keywords), 1.0), |
|
|
}) |
|
|
|
|
|
|
|
|
detected_intents.sort(key=lambda x: x["confidence"], reverse=True) |
|
|
|
|
|
return { |
|
|
"query": query, |
|
|
"intents": detected_intents, |
|
|
"primary_intent": detected_intents[0]["intent"] if detected_intents else "unknown", |
|
|
} |
|
|
|
|
|
async def _analyze_query_type( |
|
|
self, |
|
|
payload: Dict[str, Any], |
|
|
context: AgentContext, |
|
|
) -> Dict[str, Any]: |
|
|
"""Analyze query type and complexity.""" |
|
|
query = payload.get("query", "") |
|
|
|
|
|
|
|
|
analysis = { |
|
|
"length": len(query), |
|
|
"word_count": len(query.split()), |
|
|
"has_numbers": bool(re.search(r'\d', query)), |
|
|
"has_dates": bool(re.search(r'\d{4}|\d{2}/\d{2}', query)), |
|
|
"has_organizations": bool(re.search(r'ministério|prefeitura|secretaria', query, re.IGNORECASE)), |
|
|
"complexity": "simple", |
|
|
} |
|
|
|
|
|
|
|
|
if analysis["word_count"] > 20 or "e" in query.lower(): |
|
|
analysis["complexity"] = "complex" |
|
|
elif analysis["word_count"] > 10: |
|
|
analysis["complexity"] = "medium" |
|
|
|
|
|
return analysis |
|
|
|
|
|
async def _suggest_agents( |
|
|
self, |
|
|
payload: Dict[str, Any], |
|
|
context: AgentContext, |
|
|
) -> List[Dict[str, Any]]: |
|
|
"""Suggest possible agents for a query.""" |
|
|
query = payload.get("query", "") |
|
|
|
|
|
suggestions = [] |
|
|
|
|
|
|
|
|
for agent_name, capabilities in self.agent_capabilities.items(): |
|
|
score = 0.0 |
|
|
reasons = [] |
|
|
|
|
|
query_lower = query.lower() |
|
|
|
|
|
|
|
|
if "investigar" in query_lower and "investigate" in capabilities: |
|
|
score += 0.8 |
|
|
reasons.append("Query requires investigation") |
|
|
|
|
|
if "analisar" in query_lower and "analyze" in capabilities: |
|
|
score += 0.7 |
|
|
reasons.append("Query requires analysis") |
|
|
|
|
|
if "relatório" in query_lower and "report" in capabilities: |
|
|
score += 0.6 |
|
|
reasons.append("Query mentions reports") |
|
|
|
|
|
if score > 0: |
|
|
suggestions.append({ |
|
|
"agent_name": agent_name, |
|
|
"score": score, |
|
|
"reasons": reasons, |
|
|
"capabilities": capabilities, |
|
|
}) |
|
|
|
|
|
|
|
|
suggestions.sort(key=lambda x: x["score"], reverse=True) |
|
|
|
|
|
return suggestions |
|
|
|
|
|
async def _validate_routing( |
|
|
self, |
|
|
payload: Dict[str, Any], |
|
|
context: AgentContext, |
|
|
) -> Dict[str, Any]: |
|
|
"""Validate a routing decision.""" |
|
|
decision_data = payload.get("decision", {}) |
|
|
|
|
|
target_agent = decision_data.get("target_agent") |
|
|
action = decision_data.get("action") |
|
|
|
|
|
validation = { |
|
|
"valid": True, |
|
|
"warnings": [], |
|
|
"errors": [], |
|
|
} |
|
|
|
|
|
|
|
|
if target_agent not in self.agent_capabilities: |
|
|
validation["valid"] = False |
|
|
validation["errors"].append(f"Agent {target_agent} not registered") |
|
|
|
|
|
|
|
|
elif action not in self.agent_capabilities.get(target_agent, []): |
|
|
validation["warnings"].append(f"Agent {target_agent} may not support action {action}") |
|
|
|
|
|
return validation |
|
|
|
|
|
def _initialize_default_rules(self) -> None: |
|
|
"""Initialize default routing rules.""" |
|
|
rules = [ |
|
|
|
|
|
RoutingRule( |
|
|
name="investigation_query", |
|
|
patterns=[r"investigar|verificar|analisar.*gasto"], |
|
|
keywords=["investigar", "verificar", "analisar", "suspeito"], |
|
|
target_agent="MasterAgent", |
|
|
action="investigate", |
|
|
priority=9, |
|
|
), |
|
|
|
|
|
|
|
|
RoutingRule( |
|
|
name="anomaly_detection", |
|
|
patterns=[r"suspeito|anômalo|irregular|estranho"], |
|
|
keywords=["suspeito", "anômalo", "irregular", "superfaturamento"], |
|
|
target_agent="InvestigatorAgent", |
|
|
action="detect_anomalies", |
|
|
priority=8, |
|
|
), |
|
|
|
|
|
|
|
|
RoutingRule( |
|
|
name="pattern_analysis", |
|
|
patterns=[r"padrão|tendência|evolução"], |
|
|
keywords=["padrão", "tendência", "evolução", "histórico"], |
|
|
target_agent="AnalystAgent", |
|
|
action="analyze_patterns", |
|
|
priority=7, |
|
|
), |
|
|
|
|
|
|
|
|
RoutingRule( |
|
|
name="report_generation", |
|
|
patterns=[r"relatório|resumo|gerar.*relatório"], |
|
|
keywords=["relatório", "resumo", "documento"], |
|
|
target_agent="ReporterAgent", |
|
|
action="generate_report", |
|
|
priority=6, |
|
|
), |
|
|
|
|
|
|
|
|
RoutingRule( |
|
|
name="memory_query", |
|
|
patterns=[r"lembrar|anterior|histórico.*investigação"], |
|
|
keywords=["lembrar", "anterior", "histórico"], |
|
|
target_agent="ContextMemoryAgent", |
|
|
action="retrieve_episodic", |
|
|
priority=5, |
|
|
), |
|
|
] |
|
|
|
|
|
for rule in rules: |
|
|
self.routing_rules.append(rule) |
|
|
|
|
|
|
|
|
self.routing_rules.sort(key=lambda r: r.priority, reverse=True) |
|
|
|
|
|
def _create_routing_prompt(self, query: str) -> str: |
|
|
"""Create prompt for LLM-based routing.""" |
|
|
agents_info = [] |
|
|
for agent_name, capabilities in self.agent_capabilities.items(): |
|
|
agents_info.append(f"- {agent_name}: {', '.join(capabilities)}") |
|
|
|
|
|
agents_text = "\n".join(agents_info) if agents_info else "- MasterAgent: investigate, coordinate" |
|
|
|
|
|
return f""" |
|
|
Analise a seguinte consulta e determine qual agente é mais adequado para processá-la: |
|
|
|
|
|
Consulta: "{query}" |
|
|
|
|
|
Agentes disponíveis: |
|
|
{agents_text} |
|
|
|
|
|
Responda em formato JSON: |
|
|
{{ |
|
|
"target_agent": "nome_do_agente", |
|
|
"action": "ação_a_executar", |
|
|
"confidence": 0.8, |
|
|
"reasoning": "explicação da escolha" |
|
|
}} |
|
|
""" |
|
|
|
|
|
def _parse_routing_response( |
|
|
self, |
|
|
response: str, |
|
|
query: str, |
|
|
) -> Optional[RoutingDecision]: |
|
|
"""Parse LLM routing response.""" |
|
|
try: |
|
|
import json |
|
|
|
|
|
|
|
|
json_start = response.find('{') |
|
|
json_end = response.rfind('}') + 1 |
|
|
|
|
|
if json_start >= 0 and json_end > json_start: |
|
|
json_str = response[json_start:json_end] |
|
|
data = json.loads(json_str) |
|
|
|
|
|
return RoutingDecision( |
|
|
target_agent=data.get("target_agent", "MasterAgent"), |
|
|
action=data.get("action", "investigate"), |
|
|
confidence=data.get("confidence", 0.5), |
|
|
rule_used="llm_semantic", |
|
|
parameters={"query": query, "reasoning": data.get("reasoning", "")}, |
|
|
) |
|
|
|
|
|
except Exception as e: |
|
|
self.logger.error( |
|
|
"routing_response_parse_failed", |
|
|
response=response[:100], |
|
|
error=str(e), |
|
|
) |
|
|
|
|
|
return None |