""" API de Deployment para Cidadão.AI Interface completa para servir o modelo especializado em transparência pública. Similar ao padrão Kimi K2, mas otimizado para análise governamental brasileira. """ from fastapi import FastAPI, HTTPException, Depends, BackgroundTasks, File, UploadFile from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import StreamingResponse, FileResponse from pydantic import BaseModel, Field from typing import Dict, List, Optional, Union, Generator import asyncio import torch from src.core import json_utils import logging from pathlib import Path from datetime import datetime import uvicorn from contextlib import asynccontextmanager import tempfile import pandas as pd from io import StringIO from .cidadao_model import CidadaoAIForTransparency, create_cidadao_model from .training_pipeline import TransparencyDataset from transformers import AutoTokenizer logger = logging.getLogger(__name__) # === MODELOS DE REQUEST/RESPONSE === class TransparencyAnalysisRequest(BaseModel): """Request para análise de transparência""" text: str = Field(..., description="Texto para análise (contrato, despesa, etc.)") analysis_type: str = Field( default="complete", description="Tipo de análise: 'anomaly', 'financial', 'legal', 'complete'" ) include_explanation: bool = Field( default=True, description="Incluir explicação detalhada dos resultados" ) confidence_threshold: float = Field( default=0.7, description="Limiar de confiança para alertas", ge=0.0, le=1.0 ) class BatchAnalysisRequest(BaseModel): """Request para análise em lote""" texts: List[str] = Field(..., description="Lista de textos para análise") analysis_type: str = Field(default="complete") include_explanation: bool = Field(default=True) format: str = Field(default="json", description="Formato de saída: 'json' ou 'csv'") class ChatRequest(BaseModel): """Request para chat com Cidadão.AI""" messages: List[Dict[str, str]] = Field(..., description="Histórico de mensagens") temperature: float = Field(default=0.6, ge=0.0, le=2.0) max_tokens: int = Field(default=512, ge=1, le=2048) stream: bool = Field(default=False, description="Usar streaming de resposta") tools: Optional[List[Dict]] = Field(default=None, description="Ferramentas disponíveis") class TransparencyAnalysisResponse(BaseModel): """Response da análise de transparência""" analysis_id: str = Field(..., description="ID único da análise") text: str = Field(..., description="Texto analisado") timestamp: str = Field(..., description="Timestamp da análise") # Resultados de anomalia anomaly_detection: Optional[Dict] = Field(None, description="Resultados de detecção de anomalias") # Resultados financeiros financial_analysis: Optional[Dict] = Field(None, description="Análise de risco financeiro") # Resultados legais legal_compliance: Optional[Dict] = Field(None, description="Verificação de conformidade legal") # Resumo executivo executive_summary: Dict = Field(..., description="Resumo executivo da análise") # Recomendações recommendations: List[str] = Field(..., description="Recomendações baseadas na análise") # Metadados confidence: float = Field(..., description="Confiança geral da análise") processing_time: float = Field(..., description="Tempo de processamento em segundos") class ChatResponse(BaseModel): """Response do chat""" message: str = Field(..., description="Resposta do assistente") tools_used: Optional[List[str]] = Field(None, description="Ferramentas utilizadas") confidence: float = Field(..., description="Confiança da resposta") sources: Optional[List[str]] = Field(None, description="Fontes consultadas") class ModelInfoResponse(BaseModel): """Informações do modelo""" model_name: str = Field(..., description="Nome do modelo") version: str = Field(..., description="Versão do modelo") specialization: List[str] = Field(..., description="Tarefas especializadas") total_parameters: int = Field(..., description="Número total de parâmetros") training_data: Dict = Field(..., description="Informações sobre dados de treinamento") performance_metrics: Dict = Field(..., description="Métricas de performance") # === GERENCIADOR DE MODELO === class CidadaoAIManager: """Gerenciador do modelo Cidadão.AI""" def __init__(self, model_path: Optional[str] = None): self.model_path = model_path self.model: Optional[CidadaoAIForTransparency] = None self.tokenizer: Optional[AutoTokenizer] = None self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu") self.loaded = False # Estatísticas de uso self.usage_stats = { "total_requests": 0, "anomaly_detections": 0, "financial_analyses": 0, "legal_checks": 0, "chat_requests": 0, "average_processing_time": 0.0 } async def load_model(self): """Carregar modelo""" try: logger.info("🤖 Carregando Cidadão.AI...") if self.model_path and Path(self.model_path).exists(): # Carregar modelo treinado self.model = CidadaoAIForTransparency.load_model(self.model_path) logger.info(f"✅ Modelo carregado de {self.model_path}") else: # Carregar modelo base self.model = create_cidadao_model( specialized_tasks=["all"], model_size="medium" ) logger.info("✅ Modelo base criado") # Carregar tokenizer self.tokenizer = AutoTokenizer.from_pretrained("microsoft/DialoGPT-medium") self.tokenizer.pad_token = self.tokenizer.eos_token # Mover para device self.model.to(self.device) self.model.eval() self.loaded = True logger.info(f"🎯 Modelo pronto no device: {self.device}") except Exception as e: logger.error(f"❌ Erro ao carregar modelo: {e}") raise async def analyze_transparency( self, request: TransparencyAnalysisRequest ) -> TransparencyAnalysisResponse: """Executar análise de transparência""" if not self.loaded: raise HTTPException(status_code=503, detail="Modelo não carregado") start_time = datetime.now() try: # Tokenizar texto inputs = self.tokenizer( request.text, return_tensors="pt", truncation=True, padding=True, max_length=512 ).to(self.device) # Executar análises baseadas no tipo solicitado results = {} if request.analysis_type in ["anomaly", "complete"]: anomaly_results = self.model.detect_anomalies( input_ids=inputs["input_ids"], attention_mask=inputs["attention_mask"] ) results["anomaly_detection"] = anomaly_results if request.analysis_type in ["financial", "complete"]: financial_results = self.model.analyze_financial_risk( input_ids=inputs["input_ids"], attention_mask=inputs["attention_mask"] ) results["financial_analysis"] = financial_results if request.analysis_type in ["legal", "complete"]: legal_results = self.model.check_legal_compliance( input_ids=inputs["input_ids"], attention_mask=inputs["attention_mask"] ) results["legal_compliance"] = legal_results # Gerar resumo executivo e recomendações executive_summary, recommendations, overall_confidence = self._generate_summary( results, request.confidence_threshold ) # Calcular tempo de processamento processing_time = (datetime.now() - start_time).total_seconds() # Atualizar estatísticas self.usage_stats["total_requests"] += 1 if "anomaly_detection" in results: self.usage_stats["anomaly_detections"] += 1 if "financial_analysis" in results: self.usage_stats["financial_analyses"] += 1 if "legal_compliance" in results: self.usage_stats["legal_checks"] += 1 # Atualizar tempo médio current_avg = self.usage_stats["average_processing_time"] total_requests = self.usage_stats["total_requests"] self.usage_stats["average_processing_time"] = ( (current_avg * (total_requests - 1) + processing_time) / total_requests ) # Construir response response = TransparencyAnalysisResponse( analysis_id=f"cidadao_{int(start_time.timestamp())}", text=request.text, timestamp=start_time.isoformat(), anomaly_detection=results.get("anomaly_detection"), financial_analysis=results.get("financial_analysis"), legal_compliance=results.get("legal_compliance"), executive_summary=executive_summary, recommendations=recommendations, confidence=overall_confidence, processing_time=processing_time ) return response except Exception as e: logger.error(f"❌ Erro na análise: {e}") raise HTTPException(status_code=500, detail=f"Erro na análise: {str(e)}") async def batch_analyze( self, request: BatchAnalysisRequest ) -> Union[List[TransparencyAnalysisResponse], str]: """Análise em lote""" results = [] for text in request.texts: analysis_request = TransparencyAnalysisRequest( text=text, analysis_type=request.analysis_type, include_explanation=request.include_explanation ) result = await self.analyze_transparency(analysis_request) results.append(result) if request.format == "csv": return self._convert_to_csv(results) return results async def chat_completion(self, request: ChatRequest) -> Union[ChatResponse, Generator]: """Completação de chat""" if not self.loaded: raise HTTPException(status_code=503, detail="Modelo não carregado") self.usage_stats["chat_requests"] += 1 try: # Extrair última mensagem do usuário user_message = request.messages[-1]["content"] # Detectar se é uma pergunta sobre transparência transparency_keywords = [ "contrato", "licitação", "despesa", "gasto", "anomalia", "suspeito", "irregular", "transparência", "corrupção" ] is_transparency_query = any( keyword in user_message.lower() for keyword in transparency_keywords ) if is_transparency_query: # Usar análise especializada analysis_request = TransparencyAnalysisRequest( text=user_message, analysis_type="complete" ) analysis_result = await self.analyze_transparency(analysis_request) # Gerar resposta em linguagem natural response_message = self._format_analysis_for_chat(analysis_result) return ChatResponse( message=response_message, tools_used=["transparency_analysis"], confidence=analysis_result.confidence, sources=["Portal da Transparência", "Cidadão.AI Analysis"] ) else: # Resposta geral do chatbot response_message = self._generate_general_response(user_message) return ChatResponse( message=response_message, tools_used=None, confidence=0.8, sources=None ) except Exception as e: logger.error(f"❌ Erro no chat: {e}") raise HTTPException(status_code=500, detail=f"Erro no chat: {str(e)}") def _generate_summary( self, results: Dict, confidence_threshold: float ) -> Tuple[Dict, List[str], float]: """Gerar resumo executivo e recomendações""" summary = { "overall_risk": "Baixo", "main_findings": [], "alert_level": "Verde" } recommendations = [] confidences = [] # Análise de anomalias if "anomaly_detection" in results: anomaly_data = results["anomaly_detection"] anomalous_count = anomaly_data["summary"]["anomalous_count"] if anomalous_count > 0: summary["main_findings"].append(f"{anomalous_count} anomalias detectadas") summary["alert_level"] = "Vermelho" summary["overall_risk"] = "Alto" recommendations.append("🚨 Investigação imediata necessária devido a anomalias detectadas") # Coletar confiança média high_conf_count = anomaly_data["summary"]["high_confidence_count"] total_samples = anomaly_data["summary"]["total_samples"] if total_samples > 0: confidences.append(high_conf_count / total_samples) # Análise financeira if "financial_analysis" in results: financial_data = results["financial_analysis"] high_risk_count = financial_data["summary"]["high_risk_count"] avg_value = financial_data["summary"]["average_estimated_value"] if high_risk_count > 0: summary["main_findings"].append(f"{high_risk_count} contratos de alto risco financeiro") if summary["overall_risk"] == "Baixo": summary["overall_risk"] = "Médio" summary["alert_level"] = "Amarelo" recommendations.append("⚠️ Revisão financeira recomendada para contratos de alto risco") if avg_value > 10000000: # > 10M summary["main_findings"].append(f"Valor médio elevado: R$ {avg_value:,.2f}") # Análise legal if "legal_compliance" in results: legal_data = results["legal_compliance"] compliance_rate = legal_data["summary"]["compliance_rate"] if compliance_rate < 0.8: summary["main_findings"].append(f"Taxa de conformidade baixa: {compliance_rate:.1%}") recommendations.append("📋 Revisão de processos de compliance necessária") # Calcular confiança geral overall_confidence = sum(confidences) / len(confidences) if confidences else 0.7 # Recomendações padrão if not recommendations: recommendations.append("✅ Análise não identificou problemas críticos") return summary, recommendations, overall_confidence def _format_analysis_for_chat(self, analysis: TransparencyAnalysisResponse) -> str: """Formatar análise para resposta de chat""" response_parts = [] # Resumo executivo summary = analysis.executive_summary response_parts.append(f"📊 **Análise de Transparência**") response_parts.append(f"🎯 **Nível de Risco**: {summary['overall_risk']}") response_parts.append(f"🚨 **Alerta**: {summary['alert_level']}") # Principais descobertas if summary["main_findings"]: response_parts.append("\n🔍 **Principais Descobertas**:") for finding in summary["main_findings"]: response_parts.append(f"• {finding}") # Recomendações response_parts.append("\n💡 **Recomendações**:") for rec in analysis.recommendations: response_parts.append(f"• {rec}") # Detalhes técnicos if analysis.anomaly_detection: anomaly_count = analysis.anomaly_detection["summary"]["anomalous_count"] if anomaly_count > 0: response_parts.append(f"\n⚠️ **Anomalias Detectadas**: {anomaly_count}") if analysis.financial_analysis: high_risk = analysis.financial_analysis["summary"]["high_risk_count"] if high_risk > 0: response_parts.append(f"💰 **Contratos Alto Risco**: {high_risk}") # Confiança response_parts.append(f"\n📈 **Confiança da Análise**: {analysis.confidence:.1%}") return "\n".join(response_parts) def _generate_general_response(self, message: str) -> str: """Gerar resposta geral do chatbot""" # Respostas baseadas em palavras-chave message_lower = message.lower() if any(word in message_lower for word in ["olá", "oi", "bom dia", "boa tarde"]): return ("Olá! Sou o Cidadão.AI, seu assistente de IA especializado em transparência pública brasileira. " "Posso ajudar você a analisar contratos, detectar anomalias e verificar conformidade legal. " "Como posso ajudá-lo hoje?") elif any(word in message_lower for word in ["ajuda", "help", "como"]): return ("🤖 **Cidadão.AI - Suas Funcionalidades**\n\n" "• 🔍 **Análise de Anomalias**: Detectar padrões suspeitos em contratos\n" "• 💰 **Análise Financeira**: Avaliar riscos em gastos públicos\n" "• ⚖️ **Conformidade Legal**: Verificar adequação às normas\n" "• 📊 **Relatórios**: Gerar análises detalhadas\n\n" "Compartilhe um texto de contrato ou despesa pública para análise!") elif any(word in message_lower for word in ["obrigado", "obrigada", "valeu"]): return ("Fico feliz em ajudar! 😊 A transparência pública é fundamental para a democracia. " "Se precisar de mais análises, estarei aqui!") else: return ("Entendo que você tem uma pergunta. Como sou especializado em análise de transparência pública, " "funciono melhor quando você compartilha textos de contratos, licitações ou despesas para análise. " "Você poderia reformular sua pergunta incluindo dados de transparência?") def _convert_to_csv(self, results: List[TransparencyAnalysisResponse]) -> str: """Converter resultados para CSV""" rows = [] for result in results: row = { "analysis_id": result.analysis_id, "timestamp": result.timestamp, "text_preview": result.text[:100] + "..." if len(result.text) > 100 else result.text, "overall_risk": result.executive_summary["overall_risk"], "alert_level": result.executive_summary["alert_level"], "confidence": result.confidence, "processing_time": result.processing_time } # Adicionar detalhes de anomalia if result.anomaly_detection: row["anomalous_count"] = result.anomaly_detection["summary"]["anomalous_count"] # Adicionar detalhes financeiros if result.financial_analysis: row["high_risk_count"] = result.financial_analysis["summary"]["high_risk_count"] row["avg_estimated_value"] = result.financial_analysis["summary"]["average_estimated_value"] # Adicionar conformidade legal if result.legal_compliance: row["compliance_rate"] = result.legal_compliance["summary"]["compliance_rate"] rows.append(row) # Converter para CSV df = pd.DataFrame(rows) csv_buffer = StringIO() df.to_csv(csv_buffer, index=False) return csv_buffer.getvalue() def get_model_info(self) -> ModelInfoResponse: """Obter informações do modelo""" if not self.loaded: raise HTTPException(status_code=503, detail="Modelo não carregado") # Contar parâmetros total_params = sum(p.numel() for p in self.model.parameters()) return ModelInfoResponse( model_name="Cidadão.AI", version="1.0.0", specialization=["anomaly_detection", "financial_analysis", "legal_compliance"], total_parameters=total_params, training_data={ "source": "Portal da Transparência + Dados Sintéticos", "languages": ["pt-BR"], "domains": ["contratos_públicos", "licitações", "despesas_governo"] }, performance_metrics=self.usage_stats ) # === APLICAÇÃO FASTAPI === # Instância global do gerenciador model_manager = CidadaoAIManager() @asynccontextmanager async def lifespan(app: FastAPI): """Gerenciar ciclo de vida da aplicação""" # Startup await model_manager.load_model() yield # Shutdown pass # Criar aplicação FastAPI app = FastAPI( title="Cidadão.AI API", description="API de IA especializada em análise de transparência pública brasileira", version="1.0.0", lifespan=lifespan ) # Configurar CORS app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # === ENDPOINTS === @app.get("/", summary="Informações da API") async def root(): """Endpoint raiz com informações da API""" return { "name": "Cidadão.AI API", "version": "1.0.0", "description": "API de IA especializada em transparência pública brasileira", "docs": "/docs", "health": "/health" } @app.get("/health", summary="Health Check") async def health_check(): """Verificar saúde da API""" return { "status": "healthy" if model_manager.loaded else "loading", "model_loaded": model_manager.loaded, "device": str(model_manager.device), "timestamp": datetime.now().isoformat() } @app.get("/model/info", response_model=ModelInfoResponse, summary="Informações do Modelo") async def get_model_info(): """Obter informações detalhadas do modelo""" return model_manager.get_model_info() @app.post("/analyze", response_model=TransparencyAnalysisResponse, summary="Análise de Transparência") async def analyze_transparency(request: TransparencyAnalysisRequest): """ Analisar texto para detectar anomalias, riscos financeiros e conformidade legal - **text**: Texto do contrato, despesa ou licitação para análise - **analysis_type**: Tipo de análise (anomaly, financial, legal, complete) - **include_explanation**: Incluir explicações detalhadas - **confidence_threshold**: Limiar de confiança para alertas """ return await model_manager.analyze_transparency(request) @app.post("/analyze/batch", summary="Análise em Lote") async def batch_analyze(request: BatchAnalysisRequest): """ Analisar múltiplos textos em lote - **texts**: Lista de textos para análise - **analysis_type**: Tipo de análise - **format**: Formato de saída (json ou csv) """ results = await model_manager.batch_analyze(request) if request.format == "csv": return StreamingResponse( iter([results]), media_type="text/csv", headers={"Content-Disposition": "attachment; filename=cidadao_analysis.csv"} ) return results @app.post("/chat", response_model=ChatResponse, summary="Chat com Cidadão.AI") async def chat_completion(request: ChatRequest): """ Conversar com o Cidadão.AI sobre transparência pública - **messages**: Histórico de mensagens - **temperature**: Criatividade da resposta - **max_tokens**: Tamanho máximo da resposta """ return await model_manager.chat_completion(request) @app.post("/upload", summary="Upload de Arquivo para Análise") async def upload_file(file: UploadFile = File(...)): """ Fazer upload de arquivo (CSV, TXT, JSON) para análise em lote """ if not file.filename.endswith(('.csv', '.txt', '.json')): raise HTTPException( status_code=400, detail="Formato não suportado. Use CSV, TXT ou JSON." ) try: content = await file.read() if file.filename.endswith('.csv'): # Processar CSV df = pd.read_csv(StringIO(content.decode('utf-8'))) texts = df.iloc[:, 0].tolist() # Primeira coluna elif file.filename.endswith('.txt'): # Processar TXT (uma linha por texto) texts = content.decode('utf-8').strip().split('\n') elif file.filename.endswith('.json'): # Processar JSON data = json_utils.loads(content.decode('utf-8')) if isinstance(data, list): texts = [str(item) for item in data] else: texts = [str(data)] # Limitar a 100 textos para evitar sobrecarga texts = texts[:100] # Executar análise em lote batch_request = BatchAnalysisRequest( texts=texts, analysis_type="complete", format="json" ) results = await model_manager.batch_analyze(batch_request) return { "filename": file.filename, "processed_count": len(texts), "results": results } except Exception as e: logger.error(f"❌ Erro no upload: {e}") raise HTTPException(status_code=500, detail=f"Erro ao processar arquivo: {str(e)}") @app.get("/stats", summary="Estatísticas de Uso") async def get_usage_stats(): """Obter estatísticas de uso da API""" return model_manager.usage_stats @app.get("/examples", summary="Exemplos de Uso") async def get_examples(): """Obter exemplos de uso da API""" return { "transparency_analysis": { "description": "Análise completa de transparência", "example": { "text": "Contrato para aquisição de equipamentos médicos no valor de R$ 2.500.000,00 firmado entre Ministério da Saúde e Empresa XYZ LTDA via dispensa de licitação.", "analysis_type": "complete", "include_explanation": True } }, "anomaly_detection": { "description": "Detectar apenas anomalias", "example": { "text": "Contrato emergencial sem licitação para fornecimento de insumos hospitalares. Valor: R$ 15.000.000,00. Empresa com CNPJ irregular.", "analysis_type": "anomaly" } }, "chat": { "description": "Conversar sobre transparência", "example": { "messages": [ {"role": "user", "content": "Analise este contrato: Aquisição de medicamentos por R$ 5 milhões sem licitação."} ] } } } # === EXECUÇÃO === if __name__ == "__main__": # Configurar logging logging.basicConfig(level=logging.INFO) # Executar servidor uvicorn.run( "src.ml.model_api:app", host="0.0.0.0", port=8001, reload=True, log_level="info" )