# cidadao.ai-backend/src/services/chat_data_integration.py
# anderson-ufrj
# feat: integrate Portal da Transparência real data fetching with chat endpoints
# c1e6a7a
"""
Chat Data Integration Service
Connects chat agents with Portal da Transparência data
"""
from typing import Dict, List, Optional, Any
from datetime import datetime, date, timedelta
import re
from src.core import get_logger
from src.services.portal_transparencia_service import portal_transparencia
from src.services.maritaca_client import MaritacaClient, MaritacaModel
from src.core.config import settings
# Module-level logger obtained from the project's get_logger helper, keyed by
# this module's name; used by ChatDataIntegration for info and error messages.
logger = get_logger(__name__)
class ChatDataIntegration:
    """Integrate chat requests with real government data.

    A query goes through three stages:

    1. Entities (CNPJ, CPF, dates, amounts, agency names) are pulled out of
       the free-text message with regular expressions.
    2. A keyword score decides which kind of data to fetch from the portal
       service.
    3. The fetched payload is turned into a chat answer, through the AI
       client when one is configured and through a plain-text formatter
       otherwise.
    """

    # Keywords (matched as lowercase substrings) that vote for each data type.
    # Dict order matters: on a tie the first type listed wins.
    _DATA_TYPE_KEYWORDS = {
        "contratos": ["contrato", "contratos", "contratação", "contratações", "contratou", "contratado"],
        "licitacoes": ["licitação", "licitações", "pregão", "concorrência", "tomada de preço"],
        "despesas": ["despesa", "despesas", "gasto", "gastos", "pagamento", "pagamentos"],
        "servidores": ["servidor", "servidores", "funcionário", "funcionários", "salário", "remuneração"],
        "fornecedor": ["fornecedor", "fornecedores", "empresa", "cnpj"],
        "analise": ["análise", "analisar", "padrão", "padrões", "tendência", "evolução", "comparar"],
    }

    # (regex, strptime format) pairs tried in order; a later successful parse
    # overrides an earlier one.
    _DATE_PATTERNS = (
        (r'\b(\d{1,2})/(\d{1,2})/(\d{4})\b', '%d/%m/%Y'),
        (r'\b(\d{4})-(\d{1,2})-(\d{1,2})\b', '%Y-%m-%d'),
    )

    # Monetary amount patterns; group 1 is always the number as typed.
    _VALUE_PATTERNS = (
        r'R\$\s*([\d.,]+)',
        r'([\d.,]+)\s*reais',
        r'([\d.,]+)\s*(?:mil|milh(?:ão|ões|ao|oes)|bilh(?:ão|ões|ao|oes))(?:\s+de)?\s*reais',
    )

    # Magnitude word that may directly follow a number ("50 mil", "5 milhões").
    _MAGNITUDE_SUFFIX = re.compile(
        r'\s*(milh(?:ão|ões|ao|oes)|bilh(?:ão|ões|ao|oes)|mil)\b',
        re.IGNORECASE,
    )

    _ORG_KEYWORDS = ("ministério", "secretaria", "prefeitura", "governo", "órgão")

    def __init__(self):
        """Initialize the integration service.

        Binds the shared portal service and tries to build the AI client;
        ``ai_client`` stays ``None`` when no API key is configured.
        """
        self.portal = portal_transparencia
        self.ai_client = None
        self._init_ai_client()

    def _init_ai_client(self):
        """Create the AI client used for data interpretation, if a key exists.

        Reads ``maritaca_api_key`` from settings. Objects exposing
        ``get_secret_value()`` are unwrapped; anything else is passed to the
        client as is.
        """
        api_key = getattr(settings, "maritaca_api_key", None)
        if not api_key:
            return
        api_key_value = api_key.get_secret_value() if hasattr(api_key, 'get_secret_value') else api_key
        self.ai_client = MaritacaClient(
            api_key=api_key_value,
            model=MaritacaModel.SABIAZINHO_3,
        )

    async def process_user_query(self, message: str, context: Optional[Dict] = None) -> Dict[str, Any]:
        """Process a user query and fetch the relevant data.

        Args:
            message: User's message.
            context: Optional conversation context (currently unused).

        Returns:
            Dict with ``data``, ``response``, ``entities`` and ``data_type``.
            When fetching or formatting fails, ``data`` is ``None``,
            ``response`` is a generic apology and an ``error`` key carries the
            exception text; ``entities`` and ``data_type`` are still present.
        """
        entities = await self._extract_entities(message)
        data_type = self._determine_data_type(message)
        logger.info(f"Processing query - Type: {data_type}, Entities: {entities}")

        handlers = {
            "contratos": self._search_contracts,
            "licitacoes": self._search_biddings,
            "despesas": self._search_expenses,
            "servidores": self._search_servants,
            "fornecedor": self._get_supplier_details,
            "analise": self._analyze_patterns,
        }

        try:
            handler = handlers.get(data_type)
            if handler is None:
                data = {"tipo": "desconhecido", "mensagem": "Não entendi que tipo de dados você procura"}
            else:
                data = await handler(message, entities)

            formatted_response = await self._format_response_with_ai(data, message)
            return {
                "data": data,
                "response": formatted_response,
                "entities": entities,
                "data_type": data_type,
            }
        except Exception as e:
            # Top-level boundary: the chat endpoint must always get an answer.
            logger.error(f"Error processing query: {e}")
            return {
                "data": None,
                "response": "Desculpe, tive um problema ao buscar os dados. Por favor, tente novamente.",
                "error": str(e),
                # Same keys as the success payload so callers can rely on them.
                "entities": entities,
                "data_type": data_type,
            }

    async def _extract_entities(self, message: str) -> Dict[str, Any]:
        """Extract entities from the user message.

        Possible keys in the result: ``cnpj`` and ``cpf`` (digits only),
        ``data`` (a ``date``), ``ano`` (int, only when no full date was
        found), ``valor`` (float) and ``orgao`` (agency name as typed).
        """
        entities: Dict[str, Any] = {}

        cnpj_match = re.search(r'\b\d{2}\.?\d{3}\.?\d{3}/?\d{4}-?\d{2}\b', message)
        if cnpj_match:
            entities["cnpj"] = re.sub(r'[^\d]', '', cnpj_match.group())

        cpf_match = re.search(r'\b\d{3}\.?\d{3}\.?\d{3}-?\d{2}\b', message)
        if cpf_match:
            entities["cpf"] = re.sub(r'[^\d]', '', cpf_match.group())

        for pattern, fmt in self._DATE_PATTERNS:
            date_match = re.search(pattern, message)
            if not date_match:
                continue
            try:
                entities["data"] = datetime.strptime(date_match.group(0), fmt).date()
            except ValueError:
                # Looks like a date but is not a real one (e.g. 31/02/2023).
                pass

        year_match = re.search(r'\b(20\d{2})\b', message)
        if year_match and "data" not in entities:
            entities["ano"] = int(year_match.group(1))

        for pattern in self._VALUE_PATTERNS:
            value_match = re.search(pattern, message, re.IGNORECASE)
            if not value_match:
                continue
            # Brazilian format: '.' groups thousands, ',' is the decimal mark.
            number_text = value_match.group(1).replace('.', '').replace(',', '.')
            try:
                value = float(number_text)
            except ValueError:
                # Punctuation only, or otherwise not a number.
                pass
            else:
                # Scale only when the magnitude word directly follows the
                # number; an unrelated "mil" elsewhere (e.g. "militares")
                # must not change the amount.
                suffix = self._MAGNITUDE_SUFFIX.match(message, value_match.end(1))
                if suffix:
                    value *= self._magnitude_factor(suffix.group(1))
                entities["valor"] = value
            # Only the first pattern that matches is considered.
            break

        for keyword in self._ORG_KEYWORDS:
            org_pattern = rf'{keyword}\s+(?:de\s+|da\s+|do\s+)?([A-Za-zÀ-ú\s]+?)(?:\.|,|$)'
            org_match = re.search(org_pattern, message, re.IGNORECASE)
            if org_match:
                entities["orgao"] = org_match.group(1).strip()
                break

        return entities

    @staticmethod
    def _magnitude_factor(word: str) -> int:
        """Return the multiplier for a magnitude word (mil, milhões, bilhões)."""
        lowered = word.lower()
        if lowered.startswith("milh"):
            return 1_000_000
        if lowered.startswith("bilh"):
            return 1_000_000_000
        return 1_000

    def _determine_data_type(self, message: str) -> str:
        """Determine what type of data the user is asking for.

        Each data type scores one point per keyword found as a substring of
        the lowercased message. The highest score wins, and "contratos" is
        returned when nothing matches.
        """
        message_lower = message.lower()
        scores = {
            data_type: sum(1 for word in words if word in message_lower)
            for data_type, words in self._DATA_TYPE_KEYWORDS.items()
        }
        best = max(scores, key=scores.get)
        return best if scores[best] > 0 else "contratos"

    @staticmethod
    def _date_range_params(entities: Dict) -> Dict[str, Any]:
        """Build ``data_inicial``/``data_final`` from extracted entities.

        A full date yields a window of 30 days on each side; a bare year
        yields the whole calendar year; otherwise the result is empty.
        """
        if "data" in entities:
            return {
                "data_inicial": entities["data"] - timedelta(days=30),
                "data_final": entities["data"] + timedelta(days=30),
            }
        if "ano" in entities:
            return {
                "data_inicial": date(entities["ano"], 1, 1),
                "data_final": date(entities["ano"], 12, 31),
            }
        return {}

    async def _search_contracts(self, message: str, entities: Dict) -> Dict[str, Any]:
        """Search for contracts based on extracted entities."""
        params: Dict[str, Any] = {}

        if "orgao" in entities:
            # TODO: Map organization name to code
            params["orgao"] = entities["orgao"]
        if "cnpj" in entities:
            params["cnpj_fornecedor"] = entities["cnpj"]

        params.update(self._date_range_params(entities))

        if "valor" in entities:
            # Search a 20% range around the value.
            params["valor_minimo"] = entities["valor"] * 0.8
            params["valor_maximo"] = entities["valor"] * 1.2

        result = await self.portal.search_contracts(**params, size=20)
        return {
            "tipo": "contratos",
            "dados": result["contratos"],
            "total": result["total"],
            "parametros": params,
        }

    async def _search_biddings(self, message: str, entities: Dict) -> Dict[str, Any]:
        """Search for biddings based on extracted entities."""
        params: Dict[str, Any] = {}

        if "orgao" in entities:
            params["orgao"] = entities["orgao"]

        params.update(self._date_range_params(entities))

        result = await self.portal.search_biddings(**params, size=20)
        return {
            "tipo": "licitacoes",
            "dados": result["licitacoes"],
            "total": result["total"],
            "parametros": params,
        }

    async def _search_expenses(self, message: str, entities: Dict) -> Dict[str, Any]:
        """Search for expenses based on extracted entities."""
        params: Dict[str, Any] = {}

        if "orgao" in entities:
            params["orgao"] = entities["orgao"]

        if "data" in entities:
            params["mes_ano"] = entities["data"].strftime("%m/%Y")
        elif "ano" in entities:
            # Only a year was given: use the current month of that year.
            params["mes_ano"] = f"{datetime.now().month:02d}/{entities['ano']}"

        result = await self.portal.search_expenses(**params, size=50)
        return {
            "tipo": "despesas",
            "dados": result["despesas"],
            "total": result["total"],
            "parametros": params,
        }

    async def _search_servants(self, message: str, entities: Dict) -> Dict[str, Any]:
        """Search for public servants based on extracted entities."""
        params: Dict[str, Any] = {}

        # The name is whatever follows "servidor"/"funcionário" up to
        # punctuation, end of message or a verb such as "trabalha"/"recebe".
        name_pattern = r'(?:servidor|funcionário)\s+([A-Za-zÀ-ú\s]+?)(?:\.|,|$|trabalha|recebe)'
        name_match = re.search(name_pattern, message, re.IGNORECASE)
        if name_match:
            params["nome"] = name_match.group(1).strip()

        if "cpf" in entities:
            params["cpf"] = entities["cpf"]
        if "orgao" in entities:
            params["orgao"] = entities["orgao"]

        result = await self.portal.search_public_servants(**params, size=20)
        return {
            "tipo": "servidores",
            "dados": result["servidores"],
            "total": result["total"],
            "parametros": params,
        }

    async def _get_supplier_details(self, message: str, entities: Dict) -> Dict[str, Any]:
        """Get detailed supplier information.

        Returns a payload with an ``erro`` key (and no ``dados``) when the
        message did not contain a CNPJ.
        """
        if "cnpj" not in entities:
            return {
                "tipo": "fornecedor",
                "erro": "CNPJ não encontrado na mensagem",
            }

        result = await self.portal.get_supplier_info(entities["cnpj"])
        return {
            "tipo": "fornecedor",
            "dados": result,
            "cnpj": entities["cnpj"],
        }

    async def _analyze_patterns(self, message: str, entities: Dict) -> Dict[str, Any]:
        """Analyze spending patterns."""
        params: Dict[str, Any] = {}

        if "orgao" in entities:
            params["orgao"] = entities["orgao"]

        # A year in the query means a 12-month analysis; default is 6 months.
        params["periodo_meses"] = 12 if "ano" in entities else 6

        result = await self.portal.analyze_spending_patterns(**params)
        return {
            "tipo": "analise",
            "dados": result,
        }

    @staticmethod
    def _to_float(value: Any) -> float:
        """Coerce *value* to float for display, using 0.0 when it cannot be."""
        try:
            return float(value)
        except (TypeError, ValueError):
            return 0.0

    @staticmethod
    def _record_count(data: Dict) -> Any:
        """Return how many records *data* holds.

        Uses ``total`` when the payload carries it; otherwise falls back to
        the length of a list of records, or 1 for a single non-empty object.
        """
        total = data.get("total")
        if total is not None:
            return total
        records = data.get("dados")
        if isinstance(records, (list, tuple)):
            return len(records)
        return 1 if records else 0

    def _summarize_for_ai(self, data: Dict) -> str:
        """Build the plain-text data summary embedded in the AI prompt."""
        if data["tipo"] == "contratos":
            parts = [f"Encontrei {self._record_count(data)} contratos. "]
            if data.get("dados"):
                parts.append("Aqui estão os principais: ")
                for position, contract in enumerate(data["dados"][:3], 1):
                    parts.append(f"\n{position}. {contract.get('objeto') or 'Sem descrição'} - ")
                    parts.append(f"R$ {self._to_float(contract.get('valorTotal', 0)):,.2f} - ")
                    parts.append(f"Fornecedor: {contract.get('nomeFantasiaFornecedor', 'Não informado')}")
            return "".join(parts)

        if data["tipo"] == "analise":
            analysis = data["dados"]
            parts = [
                f"Análise de gastos do período {analysis['periodo']['inicio']} a {analysis['periodo']['fim']}: ",
                f"\n- Total de contratos: {analysis['total_contratos']}",
                f"\n- Valor total: R$ {self._to_float(analysis['valor_total_contratos']):,.2f}",
                f"\n- Fornecedores únicos: {analysis['fornecedores_unicos']}",
            ]
            if analysis.get("alertas"):
                parts.append("\n\nAlertas encontrados:")
                parts.extend(f"\n⚠️ {alert['mensagem']}" for alert in analysis["alertas"])
            return "".join(parts)

        return f"Encontrei {self._record_count(data)} registros do tipo {data['tipo']}"

    async def _format_response_with_ai(self, data: Dict, original_query: str) -> str:
        """Format the data response using AI.

        Falls back to ``_format_response_simple`` when no AI client is
        configured, when the payload has no ``dados``, or when building the
        prompt or calling the client raises.
        """
        if not self.ai_client or not data.get("dados"):
            return self._format_response_simple(data)

        try:
            system_prompt = (
                "Você é um assistente especializado em transparência pública.\n"
                "Sua tarefa é explicar dados governamentais de forma clara e acessível.\n"
                "Use linguagem simples, destaque informações importantes e sempre seja preciso com valores e datas.\n"
                "Se encontrar possíveis irregularidades, mencione-as de forma objetiva."
            )
            data_summary = self._summarize_for_ai(data)

            messages = [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": f"O usuário perguntou: '{original_query}'\n\nDados encontrados:\n{data_summary}\n\nExplique esses dados de forma clara e útil."},
            ]
            response = await self.ai_client.chat_completion(
                messages=messages,
                max_tokens=500,
                temperature=0.7,
            )
            return response.content
        except Exception as e:
            # Best effort: an AI failure must not lose the data already fetched.
            logger.error(f"Error formatting with AI: {e}")
            return self._format_response_simple(data)

    def _format_response_simple(self, data: Dict) -> str:
        """Format the payload as plain text, without AI.

        When the payload has no ``dados``, its own ``erro`` or ``mensagem``
        text is returned if present, so specific explanations (such as a
        missing CNPJ) reach the user instead of a generic "nothing found".
        """
        if not data.get("dados"):
            return (
                data.get("erro")
                or data.get("mensagem")
                or "Não encontrei dados com os critérios especificados. Tente refinar sua busca."
            )

        lines = [f"Encontrei {self._record_count(data)} registros.\n\n"]

        if data.get("tipo") == "contratos":
            lines.append("Principais contratos:\n")
            for position, contract in enumerate(data["dados"][:5], 1):
                description = str(contract.get('objeto') or 'Sem descrição')
                # Add the ellipsis only when the text was actually cut.
                if len(description) > 100:
                    description = description[:100] + "..."
                lines.append(f"{position}. {description}\n")
                lines.append(f" Valor: R$ {self._to_float(contract.get('valorTotal', 0)):,.2f}\n")
                lines.append(f" Fornecedor: {contract.get('nomeFantasiaFornecedor', 'Não informado')}\n\n")

        return "".join(lines)
# Singleton instance shared by importers of this module. It is constructed at
# import time, so importing this module runs ChatDataIntegration.__init__
# (which reads settings and may build the AI client).
chat_data_integration = ChatDataIntegration()