# anderson-ufrj
# feat(cli): implement comprehensive CLI commands for investigate, analyze and report
# 0f41dac
"""
Module: cli.commands.investigate
Description: Investigation command for CLI
Author: Anderson H. Silva
Date: 2025-01-25
License: Proprietary - All rights reserved
"""
import asyncio
import json
import os
import sys
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Optional, List, Dict, Any

import httpx
import typer
from pydantic import BaseModel, Field
from rich.console import Console
from rich.markup import escape
from rich.panel import Panel
from rich.progress import Progress, SpinnerColumn, TextColumn
from rich.syntax import Syntax
from rich.table import Table
from rich.text import Text
# Typer application object for the investigation CLI.
app = typer.Typer(help="Execute investigations on government data")

# Shared Rich console; the display helpers in this module print through it.
console = Console()
class OutputFormat(str, Enum):
    """Formats in which investigation results can be rendered.

    The ``str`` mix-in makes every member compare equal to its plain string
    value, so a member can be built directly from the text given on the
    command line (``OutputFormat("json")``).
    """

    JSON = "json"
    MARKDOWN = "markdown"
    HTML = "html"
    TABLE = "table"
class InvestigationRequest(BaseModel):
    """Payload sent to the backend to start an investigation.

    Only ``query`` is required; every other field has a default.
    """

    # Required: the ellipsis marks a field without a default value.
    query: str = Field(..., description="Natural language query")
    organization_code: Optional[str] = Field(default=None, description="Organization code")
    year: Optional[int] = Field(default=None, description="Year filter")
    threshold: float = Field(default=0.7, description="Anomaly detection threshold")
    max_results: int = Field(default=100, description="Maximum results")
    include_contracts: bool = Field(default=True, description="Include contract analysis")
class InvestigationResult(BaseModel):
    """Outcome of an investigation as displayed and saved by the CLI.

    ``id``, ``status`` and ``created_at`` are required; the remaining fields
    default to "nothing found".
    """

    id: str
    status: str
    created_at: datetime
    summary: Optional[str] = None
    anomalies_count: int = 0
    total_analyzed: int = 0
    risk_score: float = 0.0
    # A factory builds a fresh list per instance instead of using a shared
    # mutable literal as the default.
    anomalies: List[Dict[str, Any]] = Field(default_factory=list)
    contracts: List[Dict[str, Any]] = Field(default_factory=list)
async def call_api(
    endpoint: str,
    method: str = "GET",
    data: Optional[Dict[str, Any]] = None,
    params: Optional[Dict[str, Any]] = None,
    auth_token: Optional[str] = None
) -> Dict[str, Any]:
    """Send one HTTP request to the backend API and return the decoded JSON body.

    Args:
        endpoint: Path appended to the API base URL (e.g. ``/api/v1/...``).
        method: HTTP method name.
        data: Optional JSON request body.
        params: Optional query-string parameters.
        auth_token: When given, sent as a ``Bearer`` token in ``Authorization``.

    Returns:
        The response body parsed as JSON.

    Raises:
        Exception: If the backend answers with a status code of 400 or above.
            The message is ``"API Error: <detail>"``.

    Errors raised by ``httpx`` itself are not caught here and propagate.
    """
    # The base URL can be overridden through the environment; the default is
    # the local development server. Trailing slashes are dropped because
    # ``endpoint`` is expected to start with one.
    api_url = os.environ.get("CIDADAO_API_URL", "http://localhost:8000").rstrip("/")

    headers = {
        "Content-Type": "application/json",
        "User-Agent": "Cidadao.AI-CLI/1.0"
    }
    if auth_token:
        headers["Authorization"] = f"Bearer {auth_token}"

    async with httpx.AsyncClient() as client:
        response = await client.request(
            method=method,
            url=f"{api_url}{endpoint}",
            headers=headers,
            json=data,
            params=params,
            timeout=60.0
        )

        if response.status_code >= 400:
            # An error body is not guaranteed to be a JSON object, so parse it
            # defensively: a decoding failure must not hide the HTTP error.
            try:
                payload = response.json()
            except ValueError:
                payload = None

            if isinstance(payload, dict):
                error_detail = payload.get("detail", "Unknown error")
            else:
                body = response.text.strip()
                error_detail = body[:200] if body else f"HTTP {response.status_code}"
            raise Exception(f"API Error: {error_detail}")

        return response.json()
def format_anomaly(anomaly: Dict[str, Any]) -> str:
    """Render one anomaly as a multi-line string with Rich colour markup.

    Args:
        anomaly: Mapping that may contain ``severity``, ``type``,
            ``description`` and ``explanation``; missing keys fall back to
            placeholder values.

    Returns:
        Four lines of text. The first carries the severity wrapped in a
        colour tag: red from 0.8, yellow from 0.5, green below.
    """
    # ``.get(key, 0)`` only covers a missing key. A key that is present with
    # a ``None`` value would otherwise break the comparisons and the ``:.2f``
    # format below, so it is treated as 0 as well.
    severity = anomaly.get("severity") or 0

    if severity >= 0.8:
        severity_color = "red"
    elif severity >= 0.5:
        severity_color = "yellow"
    else:
        severity_color = "green"

    return (
        f"[{severity_color}]● Severidade: {severity:.2f}[/{severity_color}]\n"
        f" Tipo: {anomaly.get('type', 'Unknown')}\n"
        f" Descrição: {anomaly.get('description', 'N/A')}\n"
        f" Explicação: {anomaly.get('explanation', 'N/A')}"
    )
def display_results_table(result: InvestigationResult):
    """Print a summary panel followed by a table of the first anomalies.

    Args:
        result: Investigation outcome to display.

    At most ten anomalies are listed; when there are more, a trailing line
    reports how many were left out. Nothing is returned, all output goes to
    the module-level ``console``.
    """
    max_rows = 10

    # Text that originates from the API is escaped so that square brackets in
    # it are printed literally instead of being parsed as Rich markup.
    risk_color = get_risk_color(result.risk_score)
    summary_lines = [
        f"[bold]Investigation ID:[/bold] {escape(str(result.id))}",
        f"[bold]Status:[/bold] {escape(str(result.status))}",
        f"[bold]Created:[/bold] {result.created_at.strftime('%Y-%m-%d %H:%M:%S')}",
        f"[bold]Risk Score:[/bold] [{risk_color}]{result.risk_score:.2f}[/]",
        f"[bold]Anomalies Found:[/bold] {result.anomalies_count} / {result.total_analyzed}",
    ]
    console.print(Panel("\n".join(summary_lines), title="📊 Investigation Summary", border_style="blue"))

    if not result.anomalies:
        return

    console.print("\n[bold]🚨 Anomalies Detected:[/bold]")
    table = Table(show_header=True, header_style="bold magenta")
    table.add_column("Type", style="dim", width=20)
    table.add_column("Severity", justify="center")
    table.add_column("Description", width=50)
    table.add_column("Contract", style="dim")

    for anomaly in result.anomalies[:max_rows]:
        # Keys may be present with a ``None`` value, and ids may be numeric:
        # fall back to the placeholder and coerce to ``str`` before the cell
        # is sliced or handed to Rich.
        severity = anomaly.get("severity") or 0
        severity_color = get_risk_color(severity)
        description = str(anomaly.get("description") or "N/A")
        contract_id = anomaly.get("contract_id")
        table.add_row(
            escape(str(anomaly.get("type") or "Unknown")),
            f"[{severity_color}]{severity:.2f}[/{severity_color}]",
            # Truncate first, then escape, so an escape sequence is never cut.
            escape(description[:50]),
            "N/A" if contract_id is None else escape(str(contract_id)),
        )
    console.print(table)

    hidden = len(result.anomalies) - max_rows
    if hidden > 0:
        console.print(f"\n[dim]... and {hidden} more anomalies[/dim]")
def display_results_markdown(result: InvestigationResult):
    """Print the investigation as a syntax-highlighted Markdown document.

    Args:
        result: Investigation outcome to display.

    The document has a header with the key figures, the summary (or a stock
    sentence when there is none) and one section per anomaly.
    """
    lines = [
        "# Investigation Report",
        f"**ID**: {result.id}",
        f"**Status**: {result.status}",
        f"**Created**: {result.created_at.strftime('%Y-%m-%d %H:%M:%S')}",
        f"**Risk Score**: {result.risk_score:.2f}",
        f"**Anomalies**: {result.anomalies_count} found out of {result.total_analyzed} analyzed",
        "## Summary",
        f"{result.summary or 'Investigation completed successfully.'}",
        "## Anomalies Detected",
    ]

    for number, entry in enumerate(result.anomalies, start=1):
        lines.extend(
            [
                f"### Anomaly {number}",
                f"- **Type**: {entry.get('type', 'Unknown')}",
                f"- **Severity**: {entry.get('severity', 0):.2f}",
                f"- **Description**: {entry.get('description', 'N/A')}",
                f"- **Explanation**: {entry.get('explanation', 'N/A')}",
                f"- **Contract ID**: {entry.get('contract_id', 'N/A')}",
            ]
        )

    # Every line, including the last one, is terminated by a newline.
    document = "\n".join(lines) + "\n"
    console.print(Syntax(document, "markdown", theme="monokai", line_numbers=False))
def display_results_json(result: InvestigationResult):
    """Print the investigation as syntax-highlighted, indented JSON.

    Args:
        result: Investigation outcome to display.
    """
    import json

    payload = result.dict()
    # ``default=str`` stringifies values the encoder cannot handle natively;
    # ``ensure_ascii=False`` keeps accented characters readable.
    rendered = json.dumps(payload, indent=2, default=str, ensure_ascii=False)
    console.print(Syntax(rendered, "json", theme="monokai", line_numbers=False))
def get_risk_color(risk_score: float) -> str:
    """Map a risk score to the colour name used when displaying it.

    Args:
        risk_score: Score to classify.

    Returns:
        ``"red"`` from 0.8 upwards, ``"yellow"`` from 0.5 upwards,
        ``"green"`` otherwise.
    """
    # Ordered from the highest floor down; the first floor reached wins.
    for floor, color in ((0.8, "red"), (0.5, "yellow")):
        if risk_score >= floor:
            return color
    return "green"
async def _start_and_wait(
    request: InvestigationRequest,
    api_key: Optional[str],
    progress: Progress,
    task: Any,
) -> Dict[str, Any]:
    """Start an investigation and poll the backend until it completes or fails.

    Returns the final status payload with ``investigation_id`` added to it.
    Raises ``RuntimeError`` when the start response carries no id.
    """
    progress.update(task, description="Starting investigation...")
    start_data = await call_api(
        "/api/v1/investigations/analyze",
        method="POST",
        data=request.dict(),
        auth_token=api_key,
    )

    investigation_id = start_data.get("investigation_id")
    if not investigation_id:
        # Without an id the status URL would be ".../None" and the poll loop
        # below could never finish.
        raise RuntimeError("API response did not include an investigation_id")
    progress.update(task, description=f"Investigation ID: {investigation_id}")

    # NOTE(review): only "completed" and "failed" end the loop; any other
    # terminal status the backend might report keeps polling indefinitely.
    while True:
        progress.update(task, description="Checking investigation status...")
        status_data = await call_api(
            f"/api/v1/investigations/{investigation_id}",
            auth_token=api_key,
        )
        status = status_data.get("status", "unknown")
        # ``progress`` may be absent or ``None``; both count as 0 %.
        progress_pct = (status_data.get("progress") or 0) * 100
        progress.update(task, description=f"Status: {status} ({progress_pct:.0f}%)")

        if status in ("completed", "failed"):
            return {**status_data, "investigation_id": investigation_id}
        await asyncio.sleep(2)


def _build_result(status_data: Dict[str, Any]) -> InvestigationResult:
    """Convert the final status payload into an ``InvestigationResult``."""
    created_raw = status_data["created_at"]
    # ``datetime.fromisoformat`` only accepts a trailing "Z" from Python 3.11
    # on, so it is rewritten as an explicit UTC offset first.
    if isinstance(created_raw, str) and created_raw.endswith("Z"):
        created_raw = created_raw[:-1] + "+00:00"

    anomalies = status_data.get("anomalies", [])
    return InvestigationResult(
        id=status_data["investigation_id"],
        status=status_data.get("status", "unknown"),
        created_at=datetime.fromisoformat(created_raw),
        summary=status_data.get("summary"),
        anomalies_count=len(anomalies),
        total_analyzed=status_data.get("total_analyzed", 0),
        risk_score=status_data.get("risk_score", 0.0),
        anomalies=anomalies,
        contracts=status_data.get("contracts", []),
    )


def _save_results(result: InvestigationResult, output: OutputFormat, save: Path) -> Path:
    """Write the result to ``save`` in the chosen format and return the resolved path."""
    save_path = save.expanduser().resolve()
    if output == OutputFormat.JSON:
        content = json.dumps(result.dict(), indent=2, default=str, ensure_ascii=False)
    elif output == OutputFormat.MARKDOWN:
        content = generate_markdown_report(result)
    else:
        # Every other format is saved as the plain-text report.
        content = generate_text_report(result)
    save_path.write_text(content, encoding="utf-8")
    return save_path


# Registered on ``app`` so that running ``app()`` has a command to dispatch to.
@app.command()
def investigate(
    query: str = typer.Argument(help="Natural language description of what to investigate"),
    org: Optional[str] = typer.Option(None, "--org", "-o", help="Organization code to focus investigation"),
    year: Optional[int] = typer.Option(None, "--year", "-y", help="Year to investigate"),
    threshold: float = typer.Option(0.7, "--threshold", "-t", min=0.0, max=1.0, help="Anomaly detection threshold"),
    output: OutputFormat = typer.Option(OutputFormat.TABLE, "--output", "-f", help="Output format"),
    max_results: int = typer.Option(100, "--max-results", "-m", help="Maximum number of results"),
    no_contracts: bool = typer.Option(False, "--no-contracts", help="Exclude contract analysis"),
    save: Optional[Path] = typer.Option(None, "--save", "-s", help="Save results to file"),
    api_key: Optional[str] = typer.Option(None, "--api-key", envvar="CIDADAO_API_KEY", help="API key for authentication"),
):
    """
    🔍 Execute an investigation on government spending data.

    This command starts a comprehensive investigation using multiple AI agents
    to analyze government contracts and spending patterns for anomalies.

    Examples:
        cidadao investigate "contratos suspeitos em 2024"
        cidadao investigate "gastos com educação" --org MIN_EDUCACAO --year 2024
        cidadao investigate "anomalias em licitações" --threshold 0.8 --output json
    """
    # The docstring above is the command's help text, so maintainer notes
    # live in comments instead. Flow: echo the parameters, start the
    # investigation, poll until it finishes, display the result, optionally
    # save it. Any failure ends the process with exit code 1.

    # User-supplied text is escaped so square brackets are shown literally
    # rather than being interpreted as Rich markup.
    console.print("\n[bold blue]🔍 Starting Investigation[/bold blue]")
    console.print(f"Query: [green]{escape(query)}[/green]")
    if org:
        console.print(f"Organization: [cyan]{escape(org)}[/cyan]")
    if year:
        console.print(f"Year: [cyan]{year}[/cyan]")
    console.print(f"Anomaly threshold: [yellow]{threshold}[/yellow]")
    console.print(f"Max results: [yellow]{max_results}[/yellow]")
    console.print()

    request = InvestigationRequest(
        query=query,
        organization_code=org,
        year=year,
        threshold=threshold,
        max_results=max_results,
        include_contracts=not no_contracts
    )

    try:
        with Progress(
            SpinnerColumn(),
            TextColumn("[progress.description]{task.description}"),
            console=console
        ) as progress:
            task = progress.add_task("Initializing investigation...", total=None)
            # One event loop drives the start request, every poll and the
            # waits in between.
            status_data = asyncio.run(_start_and_wait(request, api_key, progress, task))

        if status_data.get("status") == "failed":
            failure = escape(str(status_data.get("error", "Unknown error")))
            console.print(f"[red]❌ Investigation failed: {failure}[/red]")
            raise typer.Exit(1)

        result = _build_result(status_data)

        # Display results based on format
        console.print()
        if output == OutputFormat.TABLE:
            display_results_table(result)
        elif output == OutputFormat.MARKDOWN:
            display_results_markdown(result)
        elif output == OutputFormat.JSON:
            display_results_json(result)
        elif output == OutputFormat.HTML:
            console.print("[yellow]HTML output not yet implemented, showing markdown[/yellow]")
            display_results_markdown(result)

        if save:
            save_path = _save_results(result, output, save)
            console.print(f"\n[green]✅ Results saved to: {escape(str(save_path))}[/green]")

        # Summary message
        if result.anomalies_count > 0:
            risk_color = get_risk_color(result.risk_score)
            console.print(
                f"\n[bold {risk_color}]⚠️ Investigation complete: "
                f"{result.anomalies_count} anomalies detected "
                f"(risk score: {result.risk_score:.2f})[/bold {risk_color}]"
            )
        else:
            console.print("\n[green]✅ Investigation complete: No anomalies detected[/green]")

    except typer.Exit:
        # ``typer.Exit`` is itself an ``Exception`` subclass. Let it through
        # untouched, otherwise the handler below prints a second, empty
        # error line for a failure that was already reported.
        raise
    except Exception as e:
        console.print(f"[red]❌ Error: {escape(str(e))}[/red]")
        raise typer.Exit(1) from e
def generate_markdown_report(result: InvestigationResult) -> str:
    """Build the full Markdown report for an investigation.

    Args:
        result: Investigation outcome to report on.

    Returns:
        The report as one string: header, executive summary, one section per
        anomaly, up to ten related contracts (only when there are any) and a
        closing attribution line.
    """
    chunks = [
        "# Cidadão.AI Investigation Report\n"
        f"**Investigation ID**: {result.id}\n"
        f"**Date**: {result.created_at.strftime('%Y-%m-%d %H:%M:%S')}\n"
        f"**Status**: {result.status}\n"
        "## Executive Summary\n"
        f"**Risk Score**: {result.risk_score:.2f}\n"
        f"**Anomalies Found**: {result.anomalies_count}\n"
        f"**Total Items Analyzed**: {result.total_analyzed}\n"
        f"{result.summary or 'Investigation completed successfully.'}\n"
        "## Detailed Findings\n"
        "### Anomalies Detected\n"
    ]

    for i, anomaly in enumerate(result.anomalies, 1):
        # A key may be present with a ``None`` value, which ``.get(key, 0)``
        # does not replace; ``or 0`` keeps the numeric format specs from
        # raising in that case.
        severity = anomaly.get("severity") or 0
        value = anomaly.get("value") or 0
        chunks.append(
            f"#### Anomaly {i}\n"
            f"- **Type**: {anomaly.get('type', 'Unknown')}\n"
            f"- **Severity**: {severity:.2f}\n"
            f"- **Description**: {anomaly.get('description', 'N/A')}\n"
            f"- **Explanation**: {anomaly.get('explanation', 'N/A')}\n"
            f"- **Contract ID**: {anomaly.get('contract_id', 'N/A')}\n"
            f"- **Value**: R$ {value:,.2f}\n"
        )

    if result.contracts:
        chunks.append("\n### Related Contracts\n\n")
        for contract in result.contracts[:10]:
            contract_value = contract.get("value") or 0
            chunks.append(
                f"- **{contract.get('id', 'N/A')}**: {contract.get('description', 'N/A')}\n"
                f"- Value: R$ {contract_value:,.2f}\n"
                f"- Supplier: {contract.get('supplier', 'N/A')}\n"
            )

    chunks.append(
        "\n---\n*Report generated by Cidadão.AI - Multi-agent AI system for government transparency*"
    )
    # Joined once at the end instead of growing a string with ``+=``.
    return "".join(chunks)
def generate_text_report(result: InvestigationResult) -> str:
    """Build the plain-text report for an investigation.

    Args:
        result: Investigation outcome to report on.

    Returns:
        The report as one string: title, identification lines, summary
        figures and one numbered entry per anomaly.
    """
    chunks = [
        "CIDADÃO.AI INVESTIGATION REPORT\n"
        "================================\n"
        f"Investigation ID: {result.id}\n"
        f"Date: {result.created_at.strftime('%Y-%m-%d %H:%M:%S')}\n"
        f"Status: {result.status}\n"
        "SUMMARY\n"
        "-------\n"
        f"Risk Score: {result.risk_score:.2f}\n"
        f"Anomalies Found: {result.anomalies_count}\n"
        f"Total Analyzed: {result.total_analyzed}\n"
        f"{result.summary or 'Investigation completed successfully.'}\n"
        "ANOMALIES\n"
        "---------\n"
    ]

    for i, anomaly in enumerate(result.anomalies, 1):
        # A key may be present with a ``None`` value, which ``.get(key, 0)``
        # does not replace; ``or 0`` keeps the numeric format specs from
        # raising in that case.
        severity = anomaly.get("severity") or 0
        value = anomaly.get("value") or 0
        chunks.append(
            "\n"
            f"{i}. {anomaly.get('type', 'Unknown')} (Severity: {severity:.2f})\n"
            f"Description: {anomaly.get('description', 'N/A')}\n"
            f"Contract: {anomaly.get('contract_id', 'N/A')}\n"
            f"Value: R$ {value:,.2f}\n"
        )

    # Joined once at the end instead of growing a string with ``+=``.
    return "".join(chunks)
# Run the Typer application when this module is executed as a script.
if __name__ == "__main__":
    app()