anderson-ufrj
feat: integrate Portal da Transparência real data fetching with chat endpoints
c1e6a7a
"""
Chat Data Integration Service
Connects chat agents with Portal da Transparência data
"""
import re
from datetime import date, datetime, timedelta
from typing import Any, Awaitable, Callable, Dict, List, Optional

from src.core import get_logger
from src.services.portal_transparencia_service import portal_transparencia
from src.services.maritaca_client import MaritacaClient, MaritacaModel
from src.core.config import settings
# Module-level logger, obtained from the project's get_logger helper and
# keyed by this module's name.
logger = get_logger(__name__)
class ChatDataIntegration:
    """Integrates chat requests with real government data.

    A user message is mined for entities (CNPJ, CPF, dates, amounts,
    organization names), classified into a data type by keyword counting,
    routed to the matching ``portal_transparencia`` call and finally turned
    into a reply: through the Maritaca client when one is configured,
    otherwise through a plain-text formatter.
    """

    # Keywords (matched as lowercase substrings) that vote for each data type.
    # Insertion order matters: ties are resolved in favour of the first entry.
    _KEYWORDS_BY_TYPE = {
        "contratos": ["contrato", "contratos", "contratação", "contratações", "contratou", "contratado"],
        "licitacoes": ["licitação", "licitações", "pregão", "concorrência", "tomada de preço"],
        "despesas": ["despesa", "despesas", "gasto", "gastos", "pagamento", "pagamentos"],
        "servidores": ["servidor", "servidores", "funcionário", "funcionários", "salário", "remuneração"],
        "fornecedor": ["fornecedor", "fornecedores", "empresa", "cnpj"],
        "analise": ["análise", "analisar", "padrão", "padrões", "tendência", "evolução", "comparar"],
    }

    # Words that introduce an organization name, tried in this order.
    _ORG_KEYWORDS = ("ministério", "secretaria", "prefeitura", "governo", "órgão")

    # Multipliers for a magnitude word written right after a number.
    _MAGNITUDE_FACTORS = {
        "mil": 1_000,
        "milhão": 1_000_000,
        "milhões": 1_000_000,
        "bilhão": 1_000_000_000,
        "bilhões": 1_000_000_000,
    }

    _NO_DATA_MESSAGE = "Não encontrei dados com os critérios especificados. Tente refinar sua busca."

    _SYSTEM_PROMPT = (
        "Você é um assistente especializado em transparência pública.\n"
        "Sua tarefa é explicar dados governamentais de forma clara e acessível.\n"
        "Use linguagem simples, destaque informações importantes e sempre seja preciso com valores e datas.\n"
        "Se encontrar possíveis irregularidades, mencione-as de forma objetiva."
    )

    def __init__(self):
        """Initialize the integration service."""
        self.portal = portal_transparencia
        self.ai_client = None
        self._init_ai_client()

    def _init_ai_client(self):
        """Create the AI client when ``settings.maritaca_api_key`` is set.

        Leaves ``self.ai_client`` untouched (``None`` after ``__init__``) when
        no key is configured, which makes every reply use the simple formatter.
        """
        api_key = getattr(settings, "maritaca_api_key", None)
        if not api_key:
            return
        # Secret wrappers exposing get_secret_value() are unwrapped; any other
        # value is passed through unchanged.
        if hasattr(api_key, "get_secret_value"):
            api_key = api_key.get_secret_value()
        self.ai_client = MaritacaClient(
            api_key=api_key,
            model=MaritacaModel.SABIAZINHO_3,
        )

    async def process_user_query(self, message: str, context: Optional[Dict] = None) -> Dict[str, Any]:
        """Process a user query and fetch the relevant data.

        Args:
            message: User's message.
            context: Optional conversation context (accepted but not used here).

        Returns:
            On success a dict with ``data``, ``response``, ``entities`` and
            ``data_type``. On failure ``data`` is ``None``, ``response`` is a
            generic apology and ``error`` carries the exception text;
            ``entities`` and ``data_type`` are still included so both shapes
            share the same keys.
        """
        entities = await self._extract_entities(message)
        data_type = self._determine_data_type(message)
        logger.info(f"Processing query - Type: {data_type}, Entities: {entities}")

        handlers: Dict[str, Callable[..., Awaitable[Dict[str, Any]]]] = {
            "contratos": self._search_contracts,
            "licitacoes": self._search_biddings,
            "despesas": self._search_expenses,
            "servidores": self._search_servants,
            "fornecedor": self._get_supplier_details,
            "analise": self._analyze_patterns,
        }

        try:
            handler = handlers.get(data_type)
            if handler is None:
                # Defensive only: _determine_data_type currently always
                # returns one of the keys above.
                data = {"tipo": "desconhecido", "mensagem": "Não entendi que tipo de dados você procura"}
            else:
                data = await handler(message, entities)

            formatted_response = await self._format_response_with_ai(data, message)
            return {
                "data": data,
                "response": formatted_response,
                "entities": entities,
                "data_type": data_type,
            }
        except Exception as e:
            # Boundary handler: any failure becomes a friendly reply.
            logger.error(f"Error processing query: {e}")
            return {
                "data": None,
                "response": "Desculpe, tive um problema ao buscar os dados. Por favor, tente novamente.",
                "error": str(e),
                "entities": entities,
                "data_type": data_type,
            }

    async def _extract_entities(self, message: str) -> Dict[str, Any]:
        """Extract entities from the user message.

        Possible keys in the result: ``cnpj`` and ``cpf`` (digits only),
        ``data`` (a ``date``), ``ano`` (int, only when no full date was
        found), ``valor`` (float) and ``orgao`` (str).
        """
        entities: Dict[str, Any] = {}

        cnpj_match = re.search(r'\b\d{2}\.?\d{3}\.?\d{3}/?\d{4}-?\d{2}\b', message)
        if cnpj_match:
            entities["cnpj"] = re.sub(r'[^\d]', '', cnpj_match.group())

        cpf_match = re.search(r'\b\d{3}\.?\d{3}\.?\d{3}-?\d{2}\b', message)
        if cpf_match:
            entities["cpf"] = re.sub(r'[^\d]', '', cpf_match.group())

        # Both formats are tried; when both occur the ISO one wins because it
        # is evaluated last.
        date_patterns = (
            (r'\b\d{1,2}/\d{1,2}/\d{4}\b', '%d/%m/%Y'),
            (r'\b\d{4}-\d{1,2}-\d{1,2}\b', '%Y-%m-%d'),
        )
        for pattern, fmt in date_patterns:
            found = re.search(pattern, message)
            if found is None:
                continue
            try:
                entities["data"] = datetime.strptime(found.group(), fmt).date()
            except ValueError:
                # Looks like a date but is not a valid one (e.g. 31/02/2023).
                continue

        if "data" not in entities:
            year_match = re.search(r'\b(20\d{2})\b', message)
            if year_match:
                entities["ano"] = int(year_match.group(1))

        amount = self._parse_amount(message)
        if amount is not None:
            entities["valor"] = amount

        org_name = self._extract_org_name(message)
        if org_name:
            entities["orgao"] = org_name

        return entities

    def _parse_amount(self, message: str) -> Optional[float]:
        """Return the first monetary amount in *message*, or ``None``.

        Numbers are read in Brazilian notation ('.' groups thousands, ','
        marks decimals). A magnitude word is applied only when it directly
        follows the number, so unrelated words containing "mil" no longer
        scale the value.
        """
        magnitude = r'(?:\s*(mil|milh(?:ão|ões)|bilh(?:ão|ões))\b)?'
        patterns = (
            rf'R\$\s*(\d[\d.,]*){magnitude}',
            rf'(\d[\d.,]*){magnitude}\s*(?:de\s+)?reais\b',
        )
        for pattern in patterns:
            found = re.search(pattern, message, re.IGNORECASE)
            if found is None:
                continue
            # rstrip drops sentence punctuation glued to the number ("R$ 10.").
            digits = found.group(1).rstrip('.,').replace('.', '').replace(',', '.')
            try:
                amount = float(digits)
            except ValueError:
                continue
            scale = found.group(2)
            if scale:
                amount *= self._MAGNITUDE_FACTORS[scale.lower()]
            return amount
        return None

    def _extract_org_name(self, message: str) -> Optional[str]:
        """Return the organization name following an org keyword, or ``None``.

        The name must run up to sentence punctuation or the end of the
        message; otherwise nothing is extracted.
        """
        for keyword in self._ORG_KEYWORDS:
            pattern = (
                rf'{keyword}\s+(?:de\s+|da\s+|do\s+|das\s+|dos\s+)?'
                r'([A-Za-zÀ-ú\s]+?)(?:[.,;:?!]|$)'
            )
            found = re.search(pattern, message, re.IGNORECASE)
            if found:
                name = found.group(1).strip()
                if name:
                    return name
        return None

    @staticmethod
    def _extract_servant_name(message: str) -> Optional[str]:
        """Return the person name written after 'servidor'/'funcionário', or ``None``."""
        pattern = r'(?:servidor|funcionário)\s+([A-Za-zÀ-ú\s]+?)(?:[.,;:?!]|$|trabalha|recebe)'
        found = re.search(pattern, message, re.IGNORECASE)
        if found is None:
            return None
        return found.group(1).strip() or None

    def _determine_data_type(self, message: str) -> str:
        """Determine what type of data the user is asking for.

        Each data type scores one point per keyword found as a substring of
        the lowercased message. The highest score wins; with no hit at all
        the result defaults to ``"contratos"``.
        """
        text = message.lower()
        scores = {
            kind: sum(1 for word in words if word in text)
            for kind, words in self._KEYWORDS_BY_TYPE.items()
        }
        best = max(scores, key=scores.get)
        return best if scores[best] > 0 else "contratos"

    @staticmethod
    def _date_range_params(entities: Dict) -> Dict[str, date]:
        """Build ``data_inicial``/``data_final`` from the extracted entities.

        An exact date yields a window of 30 days on each side; a bare year
        yields the whole calendar year; otherwise the result is empty.
        """
        if "data" in entities:
            window = timedelta(days=30)
            return {
                "data_inicial": entities["data"] - window,
                "data_final": entities["data"] + window,
            }
        if "ano" in entities:
            year = entities["ano"]
            return {
                "data_inicial": date(year, 1, 1),
                "data_final": date(year, 12, 31),
            }
        return {}

    async def _search_contracts(self, message: str, entities: Dict) -> Dict[str, Any]:
        """Search for contracts based on extracted entities."""
        params = {}
        if "orgao" in entities:
            # TODO: Map organization name to code
            params["orgao"] = entities["orgao"]
        if "cnpj" in entities:
            params["cnpj_fornecedor"] = entities["cnpj"]
        params.update(self._date_range_params(entities))
        if "valor" in entities:
            # Search a 20% range around the requested value.
            params["valor_minimo"] = entities["valor"] * 0.8
            params["valor_maximo"] = entities["valor"] * 1.2

        result = await self.portal.search_contracts(**params, size=20)
        return {
            "tipo": "contratos",
            "dados": result["contratos"],
            "total": result["total"],
            "parametros": params,
        }

    async def _search_biddings(self, message: str, entities: Dict) -> Dict[str, Any]:
        """Search for biddings based on extracted entities."""
        params = {}
        if "orgao" in entities:
            params["orgao"] = entities["orgao"]
        params.update(self._date_range_params(entities))

        result = await self.portal.search_biddings(**params, size=20)
        return {
            "tipo": "licitacoes",
            "dados": result["licitacoes"],
            "total": result["total"],
            "parametros": params,
        }

    async def _search_expenses(self, message: str, entities: Dict) -> Dict[str, Any]:
        """Search for expenses based on extracted entities."""
        params = {}
        if "orgao" in entities:
            params["orgao"] = entities["orgao"]
        if "data" in entities:
            params["mes_ano"] = entities["data"].strftime("%m/%Y")
        elif "ano" in entities:
            # Only a year was given: query today's month number within it.
            # NOTE(review): this never covers the whole year -- confirm that
            # is the intended behaviour.
            params["mes_ano"] = f"{datetime.now().month:02d}/{entities['ano']}"

        result = await self.portal.search_expenses(**params, size=50)
        return {
            "tipo": "despesas",
            "dados": result["despesas"],
            "total": result["total"],
            "parametros": params,
        }

    async def _search_servants(self, message: str, entities: Dict) -> Dict[str, Any]:
        """Search for public servants based on extracted entities."""
        params = {}
        name = self._extract_servant_name(message)
        if name:
            params["nome"] = name
        if "cpf" in entities:
            params["cpf"] = entities["cpf"]
        if "orgao" in entities:
            params["orgao"] = entities["orgao"]

        result = await self.portal.search_public_servants(**params, size=20)
        return {
            "tipo": "servidores",
            "dados": result["servidores"],
            "total": result["total"],
            "parametros": params,
        }

    async def _get_supplier_details(self, message: str, entities: Dict) -> Dict[str, Any]:
        """Get detailed supplier information; requires a CNPJ in *entities*."""
        if "cnpj" not in entities:
            return {
                "tipo": "fornecedor",
                "erro": "CNPJ não encontrado na mensagem",
            }
        result = await self.portal.get_supplier_info(entities["cnpj"])
        return {
            "tipo": "fornecedor",
            "dados": result,
            "cnpj": entities["cnpj"],
        }

    async def _analyze_patterns(self, message: str, entities: Dict) -> Dict[str, Any]:
        """Analyze spending patterns over 12 months (year given) or 6 months."""
        params = {}
        if "orgao" in entities:
            params["orgao"] = entities["orgao"]
        params["periodo_meses"] = 12 if "ano" in entities else 6

        result = await self.portal.analyze_spending_patterns(**params)
        return {
            "tipo": "analise",
            "dados": result,
        }

    @staticmethod
    def _format_brl(value: Any) -> str:
        """Format *value* as Brazilian currency, e.g. ``R$ 1.234,50``.

        ``None`` and other falsy values are shown as zero; values that cannot
        be converted to a number are echoed unchanged after the ``R$`` prefix.
        """
        try:
            amount = float(value or 0)
        except (TypeError, ValueError):
            return f"R$ {value}"
        # Swap the separators of the US-style rendering to get pt-BR notation.
        return "R$ " + f"{amount:,.2f}".translate(str.maketrans(",.", ".,"))

    @staticmethod
    def _record_count(data: Dict) -> Optional[int]:
        """Return the record count of *data*, or ``None`` when it is unknown.

        Uses ``total`` when present, else the length of a list payload.
        """
        if data.get("total") is not None:
            return data["total"]
        records = data.get("dados")
        return len(records) if isinstance(records, list) else None

    def _build_data_summary(self, data: Dict) -> str:
        """Build the textual data summary embedded in the AI prompt.

        May raise ``KeyError``/``TypeError`` on unexpected payload shapes; the
        caller treats that as a reason to fall back to the simple formatter.
        """
        kind = data["tipo"]

        if kind == "contratos":
            parts = [
                f"Encontrei {data.get('total', 0)} contratos. ",
                "Aqui estão os principais: ",
            ]
            for idx, contract in enumerate(data["dados"][:3], 1):
                parts.append(
                    f"\n{idx}. {contract.get('objeto') or 'Sem descrição'} - "
                    f"{self._format_brl(contract.get('valorTotal'))} - "
                    f"Fornecedor: {contract.get('nomeFantasiaFornecedor') or 'Não informado'}"
                )
            return "".join(parts)

        if kind == "analise":
            analysis = data["dados"]
            period = analysis["periodo"]
            parts = [
                f"Análise de gastos do período {period['inicio']} a {period['fim']}: ",
                f"\n- Total de contratos: {analysis['total_contratos']}",
                f"\n- Valor total: {self._format_brl(analysis['valor_total_contratos'])}",
                f"\n- Fornecedores únicos: {analysis['fornecedores_unicos']}",
            ]
            alerts = analysis.get("alertas")
            if alerts:
                parts.append("\n\nAlertas encontrados:")
                parts.extend(f"\n⚠️ {alert['mensagem']}" for alert in alerts)
            return "".join(parts)

        count = self._record_count(data)
        if count is None:
            # Payloads without a countable list: avoid claiming "0 records".
            return f"Encontrei dados do tipo {kind}"
        return f"Encontrei {count} registros do tipo {kind}"

    async def _format_response_with_ai(self, data: Dict, original_query: str) -> str:
        """Format the data response using AI.

        Falls back to :meth:`_format_response_simple` when no AI client is
        configured, when *data* carries no ``dados``, or when building the
        prompt or calling the client raises.
        """
        if not self.ai_client or not data.get("dados"):
            return self._format_response_simple(data)

        try:
            data_summary = self._build_data_summary(data)
            messages = [
                {"role": "system", "content": self._SYSTEM_PROMPT},
                {"role": "user", "content": f"O usuário perguntou: '{original_query}'\n\nDados encontrados:\n{data_summary}\n\nExplique esses dados de forma clara e útil."},
            ]
            response = await self.ai_client.chat_completion(
                messages=messages,
                max_tokens=500,
                temperature=0.7,
            )
            return response.content
        except Exception as e:
            logger.error(f"Error formatting with AI: {e}")
            return self._format_response_simple(data)

    def _format_response_simple(self, data: Dict) -> str:
        """Simple formatting without AI.

        With no ``dados`` the reply is the payload's own ``erro`` or
        ``mensagem`` when it has one, otherwise a generic "nothing found"
        text. Contract lists additionally get their first five entries.
        """
        records = data.get("dados")
        if not records:
            return data.get("erro") or data.get("mensagem") or self._NO_DATA_MESSAGE

        count = self._record_count(data)
        if count is None:
            parts = ["Encontrei os dados solicitados.\n\n"]
        else:
            parts = [f"Encontrei {count} registros.\n\n"]

        if data.get("tipo") == "contratos" and isinstance(records, list):
            parts.append("Principais contratos:\n")
            for idx, contract in enumerate(records[:5], 1):
                description = contract.get('objeto') or 'Sem descrição'
                if len(description) > 100:
                    # Ellipsis only when text was actually cut.
                    description = description[:100] + "..."
                parts.append(f"{idx}. {description}\n")
                parts.append(f" Valor: {self._format_brl(contract.get('valorTotal'))}\n")
                parts.append(f" Fornecedor: {contract.get('nomeFantasiaFornecedor') or 'Não informado'}\n\n")

        return "".join(parts)
# Singleton instance, constructed when this module is imported; the
# constructor also runs _init_ai_client().
chat_data_integration = ChatDataIntegration()