anderson-ufrj
refactor(performance): replace all json imports with json_utils
9730fbc
"""
Distributed Persistence System - Enterprise Grade
Supports PostgreSQL, Redis Cluster, and intelligent caching
"""
import asyncio
import logging
import os
from typing import Dict, List, Optional, Any, Union
from datetime import datetime, timedelta
from src.core import json_utils
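# NOTE: src.core.json_utils is assumed to expose stdlib-compatible dumps()/loads()
# helpers; this commit swaps direct `json` imports for it.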
import hashlib
from enum import Enum
from contextlib import asynccontextmanager
import asyncpg
import redis.asyncio as redis
from redis.asyncio.cluster import ClusterNode, RedisCluster
import aiocache
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from sqlalchemy import text, MetaData, Table, Column, String, DateTime, JSON, Text, Integer, Float, Boolean
from pydantic import BaseModel, Field
import structlog
logger = structlog.get_logger(__name__)
class DatabaseConfig(BaseModel):
"""Configuração do sistema de banco de dados"""
# PostgreSQL
postgres_url: str = "postgresql+asyncpg://user:pass@localhost:5432/cidadao_ai"
postgres_pool_size: int = 20
postgres_max_overflow: int = 30
postgres_pool_timeout: int = 30
# Redis Cluster
redis_nodes: List[Dict[str, Union[str, int]]] = [
{"host": "localhost", "port": 7000},
{"host": "localhost", "port": 7001},
{"host": "localhost", "port": 7002}
]
redis_password: Optional[str] = None
redis_decode_responses: bool = True
# Cache TTL configurations
cache_ttl_short: int = 300 # 5 minutes
cache_ttl_medium: int = 3600 # 1 hour
cache_ttl_long: int = 86400 # 24 hours
# Performance tuning
connection_retry_attempts: int = 3
connection_retry_delay: float = 1.0
query_timeout: int = 30
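    # The defaults above are local-development placeholders; production deployments
    # are expected to supply real connection settings (e.g. via the DATABASE_URL
    # environment variable that get_db_pool() below reads).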
class CacheLayer(Enum):
"""Camadas de cache com diferentes TTLs"""
MEMORY = "memory" # In-process cache
REDIS = "redis" # Distributed cache
PERSISTENT = "db" # Database cache
class Investigation(BaseModel):
"""Modelo para investigações"""
id: str = Field(..., description="ID único da investigação")
user_id: Optional[str] = Field(None, description="ID do usuário")
query: str = Field(..., description="Query da investigação")
status: str = Field("pending", description="Status atual")
results: Optional[Dict[str, Any]] = Field(None, description="Resultados")
metadata: Dict[str, Any] = Field(default_factory=dict)
created_at: datetime = Field(default_factory=datetime.utcnow)
updated_at: datetime = Field(default_factory=datetime.utcnow)
completed_at: Optional[datetime] = None
error_message: Optional[str] = None
confidence_score: Optional[float] = None
anomalies_found: int = 0
processing_time_ms: Optional[int] = None
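    # These fields map 1:1 onto the columns of the `investigations` table created
    # in DatabaseManager._create_tables() below.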
class DatabaseManager:
"""Gerenciador avançado de banco de dados com cache distribuído"""
def __init__(self, config: DatabaseConfig):
self.config = config
self.pg_engine = None
self.redis_cluster = None
self.session_factory = None
self._initialized = False
        # Performance metrics
self.metrics = {
"queries_executed": 0,
"cache_hits": 0,
"cache_misses": 0,
"avg_query_time": 0.0
}
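        # In-process counters only; they reset on restart and are exposed through
        # get_health_status() under "cache_metrics".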
async def initialize(self) -> bool:
"""Inicializar todas as conexões de banco"""
try:
logger.info("Inicializando sistema de persistência...")
# PostgreSQL
await self._init_postgresql()
# Redis Cluster
await self._init_redis_cluster()
# Cache layers
await self._init_cache_layers()
# Health checks
await self._verify_connections()
self._initialized = True
logger.info("✅ Sistema de persistência inicializado com sucesso")
return True
except Exception as e:
logger.error(f"❌ Falha na inicialização do banco: {e}")
return False
async def _init_postgresql(self):
"""Inicializar PostgreSQL com pool de conexões"""
self.pg_engine = create_async_engine(
self.config.postgres_url,
pool_size=self.config.postgres_pool_size,
max_overflow=self.config.postgres_max_overflow,
pool_timeout=self.config.postgres_pool_timeout,
echo=False, # Set True for SQL debugging
future=True
)
self.session_factory = sessionmaker(
self.pg_engine,
class_=AsyncSession,
expire_on_commit=False
)
        # Create tables if they don't exist
        await self._create_tables()
        logger.info("✅ PostgreSQL initialized")
async def _init_redis_cluster(self):
"""Inicializar Redis Cluster"""
try:
# Tentar cluster primeiro
self.redis_cluster = RedisCluster(
startup_nodes=self.config.redis_nodes,
password=self.config.redis_password,
decode_responses=self.config.redis_decode_responses,
skip_full_coverage_check=True,
health_check_interval=30
)
# Testar conexão
await self.redis_cluster.ping()
logger.info("✅ Redis Cluster conectado")
except Exception as e:
logger.warning(f"⚠️ Redis Cluster falhou, usando Redis simples: {e}")
# Fallback para Redis simples
node = self.config.redis_nodes[0]
self.redis_cluster = redis.Redis(
host=node["host"],
port=node["port"],
password=self.config.redis_password,
decode_responses=self.config.redis_decode_responses
)
await self.redis_cluster.ping()
logger.info("✅ Redis simples conectado")
async def _init_cache_layers(self):
"""Configurar camadas de cache"""
# Memory cache
aiocache.caches.set_config({
'default': {
'cache': "aiocache.SimpleMemoryCache",
'serializer': {
'class': "aiocache.serializers.PickleSerializer"
}
},
'redis': {
'cache': "aiocache.RedisCache",
'endpoint': self.config.redis_nodes[0]["host"],
'port': self.config.redis_nodes[0]["port"],
'serializer': {
'class': "aiocache.serializers.JsonSerializer"
}
}
})
logger.info("✅ Cache layers configurados")
async def _create_tables(self):
"""Criar estrutura de tabelas"""
metadata = MetaData()
        # Investigations table
investigations_table = Table(
'investigations',
metadata,
Column('id', String(50), primary_key=True),
Column('user_id', String(50), nullable=True),
Column('query', Text, nullable=False),
Column('status', String(20), nullable=False, default='pending'),
Column('results', JSON, nullable=True),
Column('metadata', JSON, nullable=True),
Column('created_at', DateTime, nullable=False),
Column('updated_at', DateTime, nullable=False),
Column('completed_at', DateTime, nullable=True),
Column('error_message', Text, nullable=True),
Column('confidence_score', Float, nullable=True),
Column('anomalies_found', Integer, default=0),
Column('processing_time_ms', Integer, nullable=True)
)
        # Audit log table
audit_logs_table = Table(
'audit_logs',
metadata,
Column('id', String(50), primary_key=True),
Column('investigation_id', String(50), nullable=True),
Column('agent_name', String(100), nullable=False),
Column('action', String(100), nullable=False),
Column('timestamp', DateTime, nullable=False),
Column('data', JSON, nullable=True),
Column('hash_chain', String(64), nullable=True)
)
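        # hash_chain (64 hex chars) presumably links each audit entry to the previous
        # one via a SHA-256 digest, making the log tamper-evident.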
        # Metrics table
metrics_table = Table(
'metrics',
metadata,
Column('id', String(50), primary_key=True),
Column('metric_name', String(100), nullable=False),
Column('metric_value', Float, nullable=False),
Column('tags', JSON, nullable=True),
Column('timestamp', DateTime, nullable=False)
)
async with self.pg_engine.begin() as conn:
await conn.run_sync(metadata.create_all)
logger.info("✅ Tabelas criadas/verificadas")
async def _verify_connections(self):
"""Verificar todas as conexões"""
# Test PostgreSQL
async with self.session_factory() as session:
result = await session.execute("SELECT 1")
assert result.scalar() == 1
# Test Redis
pong = await self.redis_cluster.ping()
assert pong
logger.info("✅ Todas as conexões verificadas")
@asynccontextmanager
async def get_session(self):
"""Context manager para sessões do PostgreSQL"""
async with self.session_factory() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
finally:
await session.close()
async def save_investigation(self, investigation: Investigation) -> bool:
"""Salvar investigação no banco"""
try:
async with self.get_session() as session:
query = """
INSERT INTO investigations
(id, user_id, query, status, results, metadata, created_at, updated_at,
completed_at, error_message, confidence_score, anomalies_found, processing_time_ms)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
ON CONFLICT (id) DO UPDATE SET
status = EXCLUDED.status,
results = EXCLUDED.results,
updated_at = EXCLUDED.updated_at,
completed_at = EXCLUDED.completed_at,
error_message = EXCLUDED.error_message,
confidence_score = EXCLUDED.confidence_score,
anomalies_found = EXCLUDED.anomalies_found,
processing_time_ms = EXCLUDED.processing_time_ms
"""
await session.execute(query, [
investigation.id,
investigation.user_id,
investigation.query,
investigation.status,
json_utils.dumps(investigation.results) if investigation.results else None,
json_utils.dumps(investigation.metadata),
investigation.created_at,
investigation.updated_at,
investigation.completed_at,
investigation.error_message,
investigation.confidence_score,
investigation.anomalies_found,
investigation.processing_time_ms
])
# Cache na Redis também
cache_key = f"investigation:{investigation.id}"
await self.redis_cluster.setex(
cache_key,
self.config.cache_ttl_medium,
investigation.model_dump_json()
)
logger.info(f"✅ Investigação {investigation.id} salva")
return True
except Exception as e:
logger.error(f"❌ Erro ao salvar investigação {investigation.id}: {e}")
return False
async def get_investigation(self, investigation_id: str) -> Optional[Investigation]:
"""Buscar investigação por ID (com cache)"""
# Tentar cache primeiro
cache_key = f"investigation:{investigation_id}"
try:
cached = await self.redis_cluster.get(cache_key)
if cached:
self.metrics["cache_hits"] += 1
return Investigation.model_validate_json(cached)
except Exception:
pass
# Se não está no cache, buscar no banco
self.metrics["cache_misses"] += 1
try:
async with self.get_session() as session:
query = "SELECT * FROM investigations WHERE id = $1"
result = await session.execute(query, [investigation_id])
row = result.fetchone()
if row:
investigation = Investigation(
id=row["id"],
user_id=row["user_id"],
query=row["query"],
status=row["status"],
results=json_utils.loads(row["results"]) if row["results"] else None,
metadata=json_utils.loads(row["metadata"]) if row["metadata"] else {},
created_at=row["created_at"],
updated_at=row["updated_at"],
completed_at=row["completed_at"],
error_message=row["error_message"],
confidence_score=row["confidence_score"],
anomalies_found=row["anomalies_found"],
processing_time_ms=row["processing_time_ms"]
)
# Adicionar ao cache
await self.redis_cluster.setex(
cache_key,
self.config.cache_ttl_medium,
investigation.model_dump_json()
)
return investigation
except Exception as e:
logger.error(f"❌ Erro ao buscar investigação {investigation_id}: {e}")
return None
    async def cache_set(self, key: str, value: Any, ttl: Optional[int] = None, layer: CacheLayer = CacheLayer.REDIS) -> bool:
        """Generic cache write with support for different layers"""
try:
if layer == CacheLayer.REDIS:
ttl = ttl or self.config.cache_ttl_medium
if isinstance(value, (dict, list)):
value = json_utils.dumps(value)
await self.redis_cluster.setex(key, ttl, value)
return True
except Exception as e:
logger.error(f"❌ Erro ao salvar cache {key}: {e}")
return False
async def cache_get(self, key: str, layer: CacheLayer = CacheLayer.REDIS) -> Optional[Any]:
"""Buscar no cache"""
try:
if layer == CacheLayer.REDIS:
result = await self.redis_cluster.get(key)
if result:
self.metrics["cache_hits"] += 1
try:
return json_utils.loads(result)
                    except Exception:
                        # Value was not JSON-encoded; return the raw string
                        return result
else:
self.metrics["cache_misses"] += 1
except Exception as e:
logger.error(f"❌ Erro ao buscar cache {key}: {e}")
return None
async def get_health_status(self) -> Dict[str, Any]:
"""Status de saúde do sistema de persistência"""
status = {
"postgresql": {"status": "unknown", "latency_ms": None},
"redis": {"status": "unknown", "latency_ms": None},
"cache_metrics": self.metrics,
"timestamp": datetime.utcnow().isoformat()
}
# Test PostgreSQL
try:
start_time = asyncio.get_event_loop().time()
async with self.get_session() as session:
await session.execute("SELECT 1")
pg_latency = (asyncio.get_event_loop().time() - start_time) * 1000
status["postgresql"] = {
"status": "healthy",
"latency_ms": round(pg_latency, 2)
}
except Exception as e:
status["postgresql"] = {
"status": "unhealthy",
"error": str(e)
}
# Test Redis
try:
start_time = asyncio.get_event_loop().time()
await self.redis_cluster.ping()
redis_latency = (asyncio.get_event_loop().time() - start_time) * 1000
status["redis"] = {
"status": "healthy",
"latency_ms": round(redis_latency, 2)
}
except Exception as e:
status["redis"] = {
"status": "unhealthy",
"error": str(e)
}
return status
async def cleanup(self):
"""Cleanup de recursos"""
try:
if self.redis_cluster:
await self.redis_cluster.close()
if self.pg_engine:
await self.pg_engine.dispose()
logger.info("✅ Cleanup do sistema de persistência concluído")
except Exception as e:
logger.error(f"❌ Erro no cleanup: {e}")
# Singleton instance
_db_manager: Optional[DatabaseManager] = None
_db_pool: Optional[asyncpg.Pool] = None
async def get_database_manager() -> DatabaseManager:
"""Obter instância singleton do database manager"""
global _db_manager
if _db_manager is None or not _db_manager._initialized:
config = DatabaseConfig()
_db_manager = DatabaseManager(config)
await _db_manager.initialize()
return _db_manager
async def get_db_pool() -> asyncpg.Pool:
"""Get PostgreSQL connection pool for direct queries"""
global _db_pool
if _db_pool is None:
database_url = os.getenv("DATABASE_URL")
if not database_url:
raise ValueError("DATABASE_URL environment variable is required")
_db_pool = await asyncpg.create_pool(
database_url,
min_size=10,
max_size=20,
command_timeout=60,
max_queries=50000,
max_inactive_connection_lifetime=300
)
return _db_pool
async def cleanup_database():
"""Cleanup global do sistema de banco"""
global _db_manager
if _db_manager:
await _db_manager.cleanup()
_db_manager = None
if __name__ == "__main__":
    # System self-test (asyncio is already imported at module level)
async def test_database_system():
"""Teste completo do sistema de persistência"""
print("🧪 Testando sistema de persistência...")
        # Initialize
db = await get_database_manager()
        # Investigation round-trip test
investigation = Investigation(
id="test_001",
user_id="user_123",
query="Contratos suspeitos de 2024",
status="completed",
results={"anomalies": 5, "contracts": 100},
confidence_score=0.89,
anomalies_found=5,
processing_time_ms=1250
)
        # Save
        success = await db.save_investigation(investigation)
        print(f"✅ Save investigation: {success}")
        # Fetch
        retrieved = await db.get_investigation("test_001")
        print(f"✅ Fetch investigation: {retrieved is not None}")
# Cache test
await db.cache_set("test_key", {"data": "test"}, ttl=60)
cached_data = await db.cache_get("test_key")
print(f"✅ Cache funcionando: {cached_data is not None}")
# Health check
health = await db.get_health_status()
print(f"✅ Health status: {health}")
# Cleanup
await cleanup_database()
print("✅ Teste concluído!")
asyncio.run(test_database_system())