|
|
""" |
|
|
Chat service for managing conversations and intent detection |
|
|
""" |
|
|
from enum import Enum |
|
|
from dataclasses import dataclass |
|
|
from typing import Optional, List, Dict, Any |
|
|
from datetime import datetime |
|
|
import re |
|
|
from src.core import json_utils |
|
|
from collections import defaultdict |
|
|
|
|
|
from src.core import get_logger |
|
|
from src.services.cache_service import cache_service |
|
|
from src.agents import ( |
|
|
MasterAgent, InvestigatorAgent, AnalystAgent, |
|
|
ReporterAgent, BaseAgent |
|
|
) |
|
|
|
|
|
logger = get_logger(__name__) |
|
|
|
|
|
class IntentType(Enum): |
|
|
"""Types of user intents |
|
|
|
|
|
Task-specific intents: |
|
|
- INVESTIGATE: Request investigation of contracts/expenses |
|
|
- ANALYZE: Request pattern/anomaly analysis |
|
|
- REPORT: Request report generation |
|
|
- STATUS: Check investigation status |
|
|
|
|
|
Conversational intents: |
|
|
- GREETING: Initial greeting/salutation |
|
|
- CONVERSATION: General conversation |
|
|
- HELP_REQUEST: Request for help/guidance |
|
|
- ABOUT_SYSTEM: Questions about the system |
|
|
- SMALLTALK: Casual conversation |
|
|
- THANKS: Gratitude expressions |
|
|
- GOODBYE: Farewell expressions |
|
|
|
|
|
Other: |
|
|
- QUESTION: General questions |
|
|
- HELP: Help (legacy, use HELP_REQUEST) |
|
|
- UNKNOWN: Could not determine intent |
|
|
""" |
|
|
|
|
|
INVESTIGATE = "investigate" |
|
|
ANALYZE = "analyze" |
|
|
REPORT = "report" |
|
|
STATUS = "status" |
|
|
|
|
|
|
|
|
GREETING = "greeting" |
|
|
CONVERSATION = "conversation" |
|
|
HELP_REQUEST = "help_request" |
|
|
ABOUT_SYSTEM = "about_system" |
|
|
SMALLTALK = "smalltalk" |
|
|
THANKS = "thanks" |
|
|
GOODBYE = "goodbye" |
|
|
|
|
|
|
|
|
QUESTION = "question" |
|
|
HELP = "help" |
|
|
UNKNOWN = "unknown" |
|
|
|
|
|
@dataclass |
|
|
class Intent: |
|
|
"""Detected user intent""" |
|
|
type: IntentType |
|
|
entities: Dict[str, Any] |
|
|
confidence: float |
|
|
suggested_agent: str |
|
|
|
|
|
def dict(self): |
|
|
return { |
|
|
"type": self.type.value, |
|
|
"entities": self.entities, |
|
|
"confidence": self.confidence, |
|
|
"suggested_agent": self.suggested_agent |
|
|
} |
|
|
|
|
|
@dataclass |
|
|
class ChatSession: |
|
|
"""Chat session data""" |
|
|
id: str |
|
|
user_id: Optional[str] |
|
|
created_at: datetime |
|
|
last_activity: datetime |
|
|
current_investigation_id: Optional[str] = None |
|
|
context: Dict[str, Any] = None |
|
|
|
|
|
def to_dict(self): |
|
|
return { |
|
|
"id": self.id, |
|
|
"user_id": self.user_id, |
|
|
"created_at": self.created_at.isoformat(), |
|
|
"last_activity": self.last_activity.isoformat(), |
|
|
"current_investigation_id": self.current_investigation_id, |
|
|
"context": self.context or {} |
|
|
} |
|
|
|
|
|
class IntentDetector: |
|
|
"""Detects user intent from messages""" |
|
|
|
|
|
def __init__(self): |
|
|
|
|
|
self.patterns = { |
|
|
IntentType.INVESTIGATE: [ |
|
|
r"investigar?\s+(\w+)", |
|
|
r"analis[ae]r?\s+contratos", |
|
|
r"verificar?\s+gastos", |
|
|
r"procurar?\s+irregularidades", |
|
|
r"detectar?\s+anomalias", |
|
|
r"buscar?\s+problemas" |
|
|
], |
|
|
IntentType.ANALYZE: [ |
|
|
r"anomalias?\s+", |
|
|
r"padr[õo]es?\s+suspeitos", |
|
|
r"gastos?\s+excessivos", |
|
|
r"fornecedores?\s+concentrados", |
|
|
r"an[áa]lise\s+de", |
|
|
r"mostrar?\s+gr[áa]ficos" |
|
|
], |
|
|
IntentType.REPORT: [ |
|
|
r"gerar?\s+relat[óo]rio", |
|
|
r"documento\s+", |
|
|
r"resumo\s+", |
|
|
r"exportar?\s+dados", |
|
|
r"baixar?\s+", |
|
|
r"pdf\s+" |
|
|
], |
|
|
IntentType.STATUS: [ |
|
|
r"status\s+", |
|
|
r"progresso\s+", |
|
|
r"como\s+est[áa]", |
|
|
r"andamento\s+" |
|
|
], |
|
|
IntentType.HELP: [ |
|
|
r"como\s+funciona", |
|
|
r"ajuda", |
|
|
r"help", |
|
|
r"o\s+que\s+[ée]", |
|
|
r"explicar?" |
|
|
], |
|
|
IntentType.GREETING: [ |
|
|
r"ol[áa]", |
|
|
r"oi", |
|
|
r"bom\s+dia", |
|
|
r"boa\s+tarde", |
|
|
r"boa\s+noite", |
|
|
r"e\s+a[íi]", |
|
|
r"tudo\s+bem", |
|
|
r"como\s+vai" |
|
|
], |
|
|
IntentType.CONVERSATION: [ |
|
|
r"conversar", |
|
|
r"falar\s+sobre", |
|
|
r"me\s+conte", |
|
|
r"vamos\s+conversar", |
|
|
r"quero\s+saber", |
|
|
r"pode\s+me\s+falar" |
|
|
], |
|
|
IntentType.HELP_REQUEST: [ |
|
|
r"preciso\s+de\s+ajuda", |
|
|
r"me\s+ajud[ae]", |
|
|
r"pode\s+ajudar", |
|
|
r"n[ãa]o\s+sei\s+como", |
|
|
r"n[ãa]o\s+entendi", |
|
|
r"como\s+fa[çc]o" |
|
|
], |
|
|
IntentType.ABOUT_SYSTEM: [ |
|
|
r"o\s+que\s+[ée]\s+o\s+cidad[ãa]o", |
|
|
r"como\s+voc[êe]\s+funciona", |
|
|
r"quem\s+[ée]\s+voc[êe]", |
|
|
r"para\s+que\s+serve", |
|
|
r"o\s+que\s+voc[êe]\s+faz", |
|
|
r"qual\s+sua\s+fun[çc][ãa]o" |
|
|
], |
|
|
IntentType.SMALLTALK: [ |
|
|
r"como\s+est[áa]\s+o\s+tempo", |
|
|
r"voc[êe]\s+gosta", |
|
|
r"qual\s+sua\s+opini[ãa]o", |
|
|
r"o\s+que\s+acha", |
|
|
r"conte\s+uma\s+hist[óo]ria", |
|
|
r"voc[êe]\s+[ée]\s+brasileiro" |
|
|
], |
|
|
IntentType.THANKS: [ |
|
|
r"obrigad[oa]", |
|
|
r"muito\s+obrigad[oa]", |
|
|
r"valeu", |
|
|
r"gratid[ãa]o", |
|
|
r"agradec[çc]o", |
|
|
r"foi\s+[úu]til" |
|
|
], |
|
|
IntentType.GOODBYE: [ |
|
|
r"tchau", |
|
|
r"at[ée]\s+logo", |
|
|
r"at[ée]\s+mais", |
|
|
r"adeus", |
|
|
r"falou", |
|
|
r"tenho\s+que\s+ir", |
|
|
r"at[ée]\s+breve" |
|
|
] |
|
|
} |
|
|
|
|
|
|
|
|
self.organ_map = { |
|
|
"saúde": {"name": "Ministério da Saúde", "code": "26000"}, |
|
|
"saude": {"name": "Ministério da Saúde", "code": "26000"}, |
|
|
"educação": {"name": "Ministério da Educação", "code": "25000"}, |
|
|
"educacao": {"name": "Ministério da Educação", "code": "25000"}, |
|
|
"presidência": {"name": "Presidência da República", "code": "20000"}, |
|
|
"presidencia": {"name": "Presidência da República", "code": "20000"}, |
|
|
"justiça": {"name": "Ministério da Justiça", "code": "30000"}, |
|
|
"justica": {"name": "Ministério da Justiça", "code": "30000"}, |
|
|
"agricultura": {"name": "Ministério da Agricultura", "code": "22000"} |
|
|
} |
|
|
|
|
|
async def detect(self, message: str) -> Intent: |
|
|
"""Detect intent from user message""" |
|
|
message_lower = message.lower().strip() |
|
|
|
|
|
|
|
|
entities = { |
|
|
"original_message": message, |
|
|
"organs": self._extract_organs(message_lower), |
|
|
"period": self._extract_period(message_lower), |
|
|
"values": self._extract_values(message_lower) |
|
|
} |
|
|
|
|
|
|
|
|
best_match = None |
|
|
best_confidence = 0.0 |
|
|
|
|
|
for intent_type, patterns in self.patterns.items(): |
|
|
for pattern in patterns: |
|
|
match = re.search(pattern, message_lower) |
|
|
if match: |
|
|
|
|
|
confidence = 0.8 |
|
|
if len(match.groups()) > 0: |
|
|
confidence = 0.9 |
|
|
|
|
|
if confidence > best_confidence: |
|
|
best_confidence = confidence |
|
|
best_match = intent_type |
|
|
|
|
|
|
|
|
if not best_match: |
|
|
best_match = IntentType.QUESTION |
|
|
best_confidence = 0.5 |
|
|
|
|
|
|
|
|
suggested_agent = self._get_agent_for_intent(best_match) |
|
|
|
|
|
return Intent( |
|
|
type=best_match, |
|
|
entities=entities, |
|
|
confidence=best_confidence, |
|
|
suggested_agent=suggested_agent |
|
|
) |
|
|
|
|
|
def _extract_organs(self, text: str) -> List[Dict[str, str]]: |
|
|
"""Extract government organ mentions""" |
|
|
found = [] |
|
|
for keyword, info in self.organ_map.items(): |
|
|
if keyword in text: |
|
|
found.append(info) |
|
|
return found |
|
|
|
|
|
def _extract_period(self, text: str) -> Optional[Dict[str, str]]: |
|
|
"""Extract time period mentions""" |
|
|
|
|
|
year_match = re.search(r"20\d{2}", text) |
|
|
if year_match: |
|
|
year = year_match.group() |
|
|
return {"year": year, "type": "year"} |
|
|
|
|
|
|
|
|
months = { |
|
|
"janeiro": 1, "fevereiro": 2, "março": 3, "marco": 3, |
|
|
"abril": 4, "maio": 5, "junho": 6, |
|
|
"julho": 7, "agosto": 8, "setembro": 9, |
|
|
"outubro": 10, "novembro": 11, "dezembro": 12 |
|
|
} |
|
|
|
|
|
for month_name, month_num in months.items(): |
|
|
if month_name in text: |
|
|
return {"month": month_num, "type": "month"} |
|
|
|
|
|
|
|
|
if "último" in text or "ultimo" in text: |
|
|
if "mês" in text or "mes" in text: |
|
|
return {"relative": "last_month", "type": "relative"} |
|
|
elif "ano" in text: |
|
|
return {"relative": "last_year", "type": "relative"} |
|
|
|
|
|
return None |
|
|
|
|
|
def _extract_values(self, text: str) -> List[float]: |
|
|
"""Extract monetary values""" |
|
|
values = [] |
|
|
|
|
|
|
|
|
patterns = [ |
|
|
r"R\$\s*([\d.,]+)", |
|
|
r"([\d.,]+)\s*reais", |
|
|
r"([\d.,]+)\s*milhões", |
|
|
r"([\d.,]+)\s*mil" |
|
|
] |
|
|
|
|
|
for pattern in patterns: |
|
|
matches = re.findall(pattern, text) |
|
|
for match in matches: |
|
|
try: |
|
|
|
|
|
value_str = match.replace(".", "").replace(",", ".") |
|
|
value = float(value_str) |
|
|
|
|
|
|
|
|
if "milhões" in text: |
|
|
value *= 1_000_000 |
|
|
elif "mil" in text: |
|
|
value *= 1_000 |
|
|
|
|
|
values.append(value) |
|
|
except: |
|
|
pass |
|
|
|
|
|
return values |
|
|
|
|
|
def _get_agent_for_intent(self, intent_type: IntentType) -> str: |
|
|
"""Get the best agent for handling this intent |
|
|
|
|
|
Routing strategy: |
|
|
- Conversational intents -> Drummond (conversational AI) |
|
|
- Task-specific intents -> Specialized agents |
|
|
- Unknown intents -> Abaporu (master orchestrator) |
|
|
""" |
|
|
mapping = { |
|
|
|
|
|
IntentType.INVESTIGATE: "abaporu", |
|
|
IntentType.ANALYZE: "anita", |
|
|
IntentType.REPORT: "tiradentes", |
|
|
IntentType.STATUS: "abaporu", |
|
|
|
|
|
|
|
|
IntentType.GREETING: "drummond", |
|
|
IntentType.CONVERSATION: "drummond", |
|
|
IntentType.HELP_REQUEST: "drummond", |
|
|
IntentType.ABOUT_SYSTEM: "drummond", |
|
|
IntentType.SMALLTALK: "drummond", |
|
|
IntentType.THANKS: "drummond", |
|
|
IntentType.GOODBYE: "drummond", |
|
|
|
|
|
|
|
|
IntentType.QUESTION: "drummond", |
|
|
IntentType.HELP: "drummond", |
|
|
IntentType.UNKNOWN: "drummond" |
|
|
} |
|
|
return mapping.get(intent_type, "abaporu") |
|
|
|
|
|
class ChatService: |
|
|
"""Service for managing chat sessions and conversations""" |
|
|
|
|
|
def __init__(self): |
|
|
self.cache = cache_service |
|
|
self.sessions: Dict[str, ChatSession] = {} |
|
|
self.messages: Dict[str, List[Dict]] = defaultdict(list) |
|
|
|
|
|
|
|
|
self.agents = None |
|
|
self._agents_initialized = False |
|
|
|
|
|
async def get_or_create_session( |
|
|
self, |
|
|
session_id: str, |
|
|
user_id: Optional[str] = None |
|
|
) -> ChatSession: |
|
|
"""Get existing session or create new one""" |
|
|
if session_id in self.sessions: |
|
|
session = self.sessions[session_id] |
|
|
session.last_activity = datetime.utcnow() |
|
|
return session |
|
|
|
|
|
|
|
|
session = ChatSession( |
|
|
id=session_id, |
|
|
user_id=user_id, |
|
|
created_at=datetime.utcnow(), |
|
|
last_activity=datetime.utcnow(), |
|
|
context={} |
|
|
) |
|
|
|
|
|
self.sessions[session_id] = session |
|
|
return session |
|
|
|
|
|
async def get_session(self, session_id: str) -> Optional[ChatSession]: |
|
|
"""Get session by ID""" |
|
|
return self.sessions.get(session_id) |
|
|
|
|
|
async def save_message( |
|
|
self, |
|
|
session_id: str, |
|
|
role: str, |
|
|
content: str, |
|
|
agent_id: Optional[str] = None |
|
|
): |
|
|
"""Save message to session history""" |
|
|
message = { |
|
|
"role": role, |
|
|
"content": content, |
|
|
"timestamp": datetime.utcnow().isoformat(), |
|
|
"agent_id": agent_id |
|
|
} |
|
|
|
|
|
self.messages[session_id].append(message) |
|
|
|
|
|
|
|
|
if session_id in self.sessions: |
|
|
self.sessions[session_id].last_activity = datetime.utcnow() |
|
|
|
|
|
async def get_session_messages( |
|
|
self, |
|
|
session_id: str, |
|
|
limit: int = 50 |
|
|
) -> List[Dict[str, Any]]: |
|
|
"""Get messages for a session""" |
|
|
messages = self.messages.get(session_id, []) |
|
|
return messages[-limit:] if len(messages) > limit else messages |
|
|
|
|
|
async def clear_session(self, session_id: str): |
|
|
"""Clear session data""" |
|
|
self.sessions.pop(session_id, None) |
|
|
self.messages.pop(session_id, None) |
|
|
|
|
|
async def get_agent_for_intent(self, intent: Intent) -> BaseAgent: |
|
|
"""Get the appropriate agent for an intent""" |
|
|
self._ensure_agents_initialized() |
|
|
agent_id = intent.suggested_agent |
|
|
if self.agents: |
|
|
return self.agents.get(agent_id, self.agents.get("abaporu")) |
|
|
return None |
|
|
|
|
|
async def update_session_investigation( |
|
|
self, |
|
|
session_id: str, |
|
|
investigation_id: str |
|
|
): |
|
|
"""Update session with current investigation""" |
|
|
if session_id in self.sessions: |
|
|
self.sessions[session_id].current_investigation_id = investigation_id |
|
|
self.sessions[session_id].last_activity = datetime.utcnow() |
|
|
|
|
|
def _ensure_agents_initialized(self): |
|
|
"""Initialize agents on first use (lazy loading)""" |
|
|
if self._agents_initialized: |
|
|
return |
|
|
|
|
|
try: |
|
|
|
|
|
from src.agents import ( |
|
|
MasterAgent, InvestigatorAgent, AnalystAgent, |
|
|
ReporterAgent, CommunicationAgent |
|
|
) |
|
|
|
|
|
|
|
|
agents = { |
|
|
"abaporu": MasterAgent(), |
|
|
"zumbi": InvestigatorAgent(), |
|
|
"anita": AnalystAgent(), |
|
|
"tiradentes": ReporterAgent(), |
|
|
"drummond": CommunicationAgent() |
|
|
} |
|
|
|
|
|
|
|
|
for agent_id, agent in agents.items(): |
|
|
agent.agent_id = agent_id |
|
|
|
|
|
self.agents = agents |
|
|
self._agents_initialized = True |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Failed to initialize agents: {type(e).__name__}: {e}") |
|
|
|
|
|
self.agents = {} |
|
|
self._agents_initialized = True |