anderson-ufrj
feat(database): allow app to run without database connection for HuggingFace Spaces
316a293
| """ | |
| Module: services.connection_pool_service | |
| Description: Advanced connection pooling management | |
| Author: Anderson H. Silva | |
| Date: 2025-01-25 | |
| License: Proprietary - All rights reserved | |
| """ | |
| import asyncio | |
| from typing import Dict, Any, Optional, List | |
| from datetime import datetime, timedelta, timezone | |
| from contextlib import asynccontextmanager | |
| import time | |
| from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, AsyncEngine | |
| from sqlalchemy.pool import NullPool, StaticPool | |
| from sqlalchemy.orm import sessionmaker | |
| from sqlalchemy import text, event | |
| import redis.asyncio as redis | |
| from src.core import get_logger | |
| from src.core.config import settings | |
| logger = get_logger(__name__) | |
class ConnectionStats:
    """Track connection pool statistics."""

    def __init__(self):
        self.connections_created = 0
        self.connections_closed = 0
        self.connections_recycled = 0
        self.connection_errors = 0
        self.wait_time_total = 0.0
        self.wait_count = 0
        self.active_connections = 0
        self.peak_connections = 0
        self.last_reset = datetime.now(timezone.utc)

    def record_connection_created(self):
        """Record new connection creation."""
        self.connections_created += 1
        self.active_connections += 1
        if self.active_connections > self.peak_connections:
            self.peak_connections = self.active_connections

    def record_connection_closed(self):
        """Record connection closure."""
        self.connections_closed += 1
        self.active_connections = max(0, self.active_connections - 1)

    def record_wait(self, wait_time: float):
        """Record connection wait time."""
        self.wait_time_total += wait_time
        self.wait_count += 1

    def get_stats(self) -> Dict[str, Any]:
        """Get current statistics."""
        uptime = (datetime.now(timezone.utc) - self.last_reset).total_seconds()
        return {
            "connections_created": self.connections_created,
            "connections_closed": self.connections_closed,
            "connections_recycled": self.connections_recycled,
            "connection_errors": self.connection_errors,
            "active_connections": self.active_connections,
            "peak_connections": self.peak_connections,
            "average_wait_time": self.wait_time_total / max(self.wait_count, 1),
            "total_waits": self.wait_count,
            "uptime_seconds": uptime,
            "connections_per_second": self.connections_created / max(uptime, 1),
        }
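
# A minimal sketch of how ConnectionStats aggregates (illustrative only):
# peak_connections is a high-water mark that survives closures, and
# average_wait_time is total wait divided by the number of recorded waits,
# here (0.2 + 0.4) / 2 = 0.3s.
#
#     stats = ConnectionStats()
#     stats.record_connection_created()   # active=1, peak=1
#     stats.record_connection_created()   # active=2, peak=2
#     stats.record_connection_closed()    # active=1, peak stays 2
#     stats.record_wait(0.2)
#     stats.record_wait(0.4)
#     assert abs(stats.get_stats()["average_wait_time"] - 0.3) < 1e-9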
class ConnectionPoolService:
    """Advanced connection pool management service."""

    def __init__(self):
        """Initialize connection pool service."""
        self._engines: Dict[str, AsyncEngine] = {}
        self._redis_pools: Dict[str, redis.ConnectionPool] = {}
        self._stats: Dict[str, ConnectionStats] = {}
        self._pool_configs: Dict[str, Dict[str, Any]] = {}

        # Default pool configurations
        self.DEFAULT_DB_POOL_CONFIG = {
            "pool_size": settings.database_pool_size,
            "max_overflow": settings.database_pool_overflow,
            "pool_timeout": settings.database_pool_timeout,
            "pool_recycle": 3600,  # Recycle connections after 1 hour
            "pool_pre_ping": True,  # Test connections before use
            "echo_pool": settings.debug,
            "pool_use_lifo": True,  # Last In First Out for better cache locality
        }

        self.DEFAULT_REDIS_POOL_CONFIG = {
            "max_connections": settings.redis_pool_size,
            "decode_responses": True,
            "socket_keepalive": True,
            # NOTE: these numeric option keys are platform-dependent; the
            # portable spellings are socket.TCP_KEEPIDLE, socket.TCP_KEEPINTVL
            # and socket.TCP_KEEPCNT
            "socket_keepalive_options": {
                1: 1,  # TCP_KEEPIDLE
                2: 1,  # TCP_KEEPINTVL
                3: 5,  # TCP_KEEPCNT
            },
            "retry_on_timeout": True,
            "health_check_interval": 30,
        }
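
    # The defaults above are per-instance attributes, so a caller can copy and
    # override them per pool; a hypothetical sketch (the "analytics" pool name
    # is invented for illustration):
    #
    #     cfg = {**service.DEFAULT_DB_POOL_CONFIG, "pool_size": 20}
    #     await service.create_db_pool("analytics", settings.get_database_url(async_mode=True), cfg)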
    async def initialize(self):
        """Initialize connection pools."""
        try:
            # Initialize main database pool
            await self.create_db_pool(
                "main",
                settings.get_database_url(async_mode=True),
                self.DEFAULT_DB_POOL_CONFIG,
            )

            # Initialize read replica pool if configured
            if hasattr(settings, "database_read_url"):
                read_config = self.DEFAULT_DB_POOL_CONFIG.copy()
                read_config["pool_size"] = settings.database_pool_size * 2  # More connections for reads
                await self.create_db_pool(
                    "read",
                    settings.database_read_url,
                    read_config,
                )

            # Initialize Redis pools
            await self.create_redis_pool(
                "main",
                settings.redis_url,
                self.DEFAULT_REDIS_POOL_CONFIG,
            )

            # Initialize cache Redis pool with different settings
            cache_config = self.DEFAULT_REDIS_POOL_CONFIG.copy()
            cache_config["max_connections"] = settings.redis_pool_size * 2
            await self.create_redis_pool(
                "cache",
                settings.redis_url,
                cache_config,
            )

            logger.info("connection_pools_initialized")
        except Exception as e:
            logger.error(
                "connection_pool_initialization_failed",
                error=str(e),
                exc_info=True,
            )
            raise
    async def create_db_pool(
        self,
        name: str,
        url: str,
        config: Dict[str, Any],
    ) -> AsyncEngine:
        """Create database connection pool."""
        try:
            # Filter out pool-sizing options that NullPool does not accept
            nullpool_config = {
                k: v for k, v in config.items()
                if k not in ("pool_size", "max_overflow", "pool_timeout", "pool_use_lifo")
            }

            # Create engine with an async-compatible pool; NullPool opens a
            # fresh connection per use
            engine = create_async_engine(
                url,
                poolclass=NullPool,
                **nullpool_config,
            )

            # Initialize stats
            self._stats[f"db_{name}"] = ConnectionStats()
            stats = self._stats[f"db_{name}"]

            # Event listeners for monitoring; for an async engine, pool events
            # must be attached to the underlying sync engine
            def on_connect(dbapi_conn, connection_record):
                stats.record_connection_created()
                connection_record.info["connected_at"] = time.time()
                logger.debug(f"Database connection created for pool '{name}'")

            def on_close(dbapi_conn, connection_record):
                stats.record_connection_closed()
                if "connected_at" in connection_record.info:
                    lifetime = time.time() - connection_record.info["connected_at"]
                    logger.debug(f"Database connection closed for pool '{name}', lifetime: {lifetime:.1f}s")

            def on_checkout(dbapi_conn, connection_record, connection_proxy):
                connection_record.info["checkout_at"] = time.time()

            def on_checkin(dbapi_conn, connection_record):
                if "checkout_at" in connection_record.info:
                    usage_time = time.time() - connection_record.info["checkout_at"]
                    if usage_time > 1.0:  # Log slow connection usage
                        logger.warning(f"Slow connection usage in pool '{name}': {usage_time:.2f}s")

            event.listen(engine.sync_engine, "connect", on_connect)
            event.listen(engine.sync_engine, "close", on_close)
            event.listen(engine.sync_engine, "checkout", on_checkout)
            event.listen(engine.sync_engine, "checkin", on_checkin)

            # Store engine and config
            self._engines[name] = engine
            self._pool_configs[f"db_{name}"] = config

            # Test connection
            async with engine.connect() as conn:
                await conn.execute(text("SELECT 1"))

            logger.info(
                "database_pool_created",
                pool=name,
                size=config["pool_size"],
                max_overflow=config["max_overflow"],
            )
            return engine
        except Exception as e:
            logger.error(
                "database_pool_creation_failed",
                pool=name,
                error=str(e),
            )
            raise
    async def create_redis_pool(
        self,
        name: str,
        url: str,
        config: Dict[str, Any],
    ) -> redis.ConnectionPool:
        """Create Redis connection pool."""
        try:
            # Copy so the caller's config dict is not mutated, then inject
            # the password from settings if one is configured
            config = dict(config)
            if settings.redis_password:
                config["password"] = settings.redis_password.get_secret_value()

            # Create connection pool
            pool = redis.ConnectionPool.from_url(url, **config)

            # Initialize stats
            self._stats[f"redis_{name}"] = ConnectionStats()

            # Store pool and config
            self._redis_pools[name] = pool
            self._pool_configs[f"redis_{name}"] = config

            # Test connection
            client = redis.Redis(connection_pool=pool)
            await client.ping()
            await client.aclose()

            logger.info(
                "redis_pool_created",
                pool=name,
                max_connections=config["max_connections"],
            )
            return pool
        except Exception as e:
            logger.error(
                "redis_pool_creation_failed",
                pool=name,
                error=str(e),
            )
            raise
    @asynccontextmanager
    async def get_db_session(
        self,
        pool_name: str = "main",
        read_only: bool = False,
    ):
        """Get database session from pool."""
        # Route read-only work to the read replica pool when available
        if read_only and "read" in self._engines:
            pool_name = "read"

        engine = self._engines.get(pool_name)
        if not engine:
            # No database available - yield None so callers can degrade gracefully
            logger.warning(f"Database pool '{pool_name}' not found - running without database")
            yield None
            return

        # Track wait time
        start_time = time.time()
        async_session = sessionmaker(
            engine,
            class_=AsyncSession,
            expire_on_commit=False,
        )
        async with async_session() as session:
            wait_time = time.time() - start_time
            if wait_time > 0.1:
                self._stats[f"db_{pool_name}"].record_wait(wait_time)
            try:
                yield session
                await session.commit()
            except Exception:
                await session.rollback()
                raise
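
    # A minimal usage sketch (illustrative): callers must tolerate a None
    # session so the app keeps serving when no database is configured, e.g.
    # on HuggingFace Spaces.
    #
    #     async with connection_pool_service.get_db_session(read_only=True) as session:
    #         if session is None:
    #             return []  # degrade gracefully without a database
    #         result = await session.execute(text("SELECT 1"))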
    async def get_redis_client(
        self,
        pool_name: str = "main",
    ) -> redis.Redis:
        """Get Redis client from pool."""
        pool = self._redis_pools.get(pool_name)
        if not pool:
            raise ValueError(f"Redis pool '{pool_name}' not found")
        return redis.Redis(connection_pool=pool)
    async def get_pool_stats(self) -> Dict[str, Any]:
        """Get statistics for all connection pools."""
        stats = {
            "database_pools": {},
            "redis_pools": {},
            "recommendations": [],
        }

        # Database pool stats
        for name, engine in self._engines.items():
            pool = engine.pool
            pool_stats = self._stats.get(f"db_{name}")
            if pool_stats:
                db_stats = pool_stats.get_stats()

                # Add pool-specific stats (absent on NullPool)
                if hasattr(pool, "size"):
                    db_stats["pool_size"] = pool.size()
                if hasattr(pool, "checked_out"):
                    db_stats["checked_out"] = pool.checked_out()
                if hasattr(pool, "overflow"):
                    db_stats["overflow"] = pool.overflow()

                stats["database_pools"][name] = db_stats

                # Generate recommendations
                if db_stats.get("average_wait_time", 0) > 0.5:
                    stats["recommendations"].append({
                        "pool": f"db_{name}",
                        "issue": "High wait times",
                        "suggestion": "Increase pool_size or max_overflow",
                    })
                if db_stats.get("connection_errors", 0) > 10:
                    stats["recommendations"].append({
                        "pool": f"db_{name}",
                        "issue": "High error rate",
                        "suggestion": "Check database health and network stability",
                    })

        # Redis pool stats
        for name, pool in self._redis_pools.items():
            pool_stats = self._stats.get(f"redis_{name}")
            if pool_stats:
                redis_stats = pool_stats.get_stats()

                # Add Redis-specific stats (private attributes of redis-py's pool)
                redis_stats["created_connections"] = pool.created_connections
                redis_stats["available_connections"] = len(pool._available_connections)
                redis_stats["in_use_connections"] = len(pool._in_use_connections)

                stats["redis_pools"][name] = redis_stats

                # Recommendations
                if redis_stats["in_use_connections"] > pool.max_connections * 0.8:
                    stats["recommendations"].append({
                        "pool": f"redis_{name}",
                        "issue": "Near connection limit",
                        "suggestion": "Increase max_connections",
                    })

        return stats
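
    # Illustrative shape of the returned payload (values invented for the
    # example; NullPool engines omit pool_size/checked_out/overflow):
    #
    #     {
    #         "database_pools": {"main": {"active_connections": 3, ...}},
    #         "redis_pools": {"cache": {"in_use_connections": 8, ...}},
    #         "recommendations": [
    #             {"pool": "redis_cache", "issue": "Near connection limit",
    #              "suggestion": "Increase max_connections"},
    #         ],
    #     }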
    async def optimize_pools(self) -> Dict[str, Any]:
        """Analyze and optimize connection pools."""
        optimizations = {
            "performed": [],
            "suggested": [],
        }

        # Check database pools
        for name, engine in self._engines.items():
            pool = engine.pool
            stats = self._stats.get(f"db_{name}")
            if stats:
                # Suggest resizing based on usage
                current_config = self._pool_configs.get(f"db_{name}", {})
                current_size = current_config.get("pool_size", 10)

                if stats.peak_connections > current_size * 0.9:
                    suggested_size = min(current_size * 2, 50)
                    optimizations["suggested"].append({
                        "pool": f"db_{name}",
                        "action": "increase_pool_size",
                        "current": current_size,
                        "suggested": suggested_size,
                        "reason": f"Peak usage ({stats.peak_connections}) near limit",
                    })

                # Check for idle connections
                if hasattr(pool, "size") and hasattr(pool, "checked_out"):
                    idle_ratio = 1 - (pool.checked_out() / max(pool.size(), 1))
                    if idle_ratio > 0.7 and current_size > 5:
                        suggested_size = max(5, current_size // 2)
                        optimizations["suggested"].append({
                            "pool": f"db_{name}",
                            "action": "decrease_pool_size",
                            "current": current_size,
                            "suggested": suggested_size,
                            "reason": f"High idle ratio ({idle_ratio:.1%})",
                        })

        # Check Redis pools
        for name, pool in self._redis_pools.items():
            stats = self._stats.get(f"redis_{name}")
            if stats:
                current_max = pool.max_connections
                if stats.peak_connections > current_max * 0.8:
                    suggested_max = min(current_max * 2, 100)
                    optimizations["suggested"].append({
                        "pool": f"redis_{name}",
                        "action": "increase_max_connections",
                        "current": current_max,
                        "suggested": suggested_max,
                        "reason": f"Peak usage ({stats.peak_connections}) near limit",
                    })

        return optimizations
    async def health_check(self) -> Dict[str, Any]:
        """Perform health check on all pools."""
        health = {
            "status": "healthy",
            "pools": {},
            "errors": [],
        }

        # Check database pools
        for name, engine in self._engines.items():
            try:
                start = time.time()
                async with engine.connect() as conn:
                    await conn.execute(text("SELECT 1"))
                response_time = (time.time() - start) * 1000
                health["pools"][f"db_{name}"] = {
                    "status": "healthy",
                    "response_time_ms": round(response_time, 2),
                }
            except Exception as e:
                health["status"] = "unhealthy"
                health["pools"][f"db_{name}"] = {
                    "status": "unhealthy",
                    "error": str(e),
                }
                health["errors"].append(f"Database pool '{name}': {str(e)}")

        # Check Redis pools
        for name, pool in self._redis_pools.items():
            try:
                client = redis.Redis(connection_pool=pool)
                start = time.time()
                await client.ping()
                response_time = (time.time() - start) * 1000
                health["pools"][f"redis_{name}"] = {
                    "status": "healthy",
                    "response_time_ms": round(response_time, 2),
                }
                await client.aclose()
            except Exception as e:
                health["status"] = "unhealthy"
                health["pools"][f"redis_{name}"] = {
                    "status": "unhealthy",
                    "error": str(e),
                }
                health["errors"].append(f"Redis pool '{name}': {str(e)}")

        return health
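
    # Sketch of wiring this into a health endpoint (hypothetical route name;
    # assumes a FastAPI app object and JSONResponse from fastapi.responses
    # exist elsewhere in the project):
    #
    #     @app.get("/health/pools")
    #     async def pools_health():
    #         report = await connection_pool_service.health_check()
    #         status_code = 200 if report["status"] == "healthy" else 503
    #         return JSONResponse(report, status_code=status_code)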
    async def cleanup(self):
        """Clean up all connection pools."""
        # Close database engines
        for name, engine in self._engines.items():
            try:
                await engine.dispose()
                logger.info(f"Database pool '{name}' closed")
            except Exception as e:
                logger.error(f"Error closing database pool '{name}': {e}")

        # Close Redis pools
        for name, pool in self._redis_pools.items():
            try:
                await pool.disconnect()
                logger.info(f"Redis pool '{name}' closed")
            except Exception as e:
                logger.error(f"Error closing Redis pool '{name}': {e}")

        self._engines.clear()
        self._redis_pools.clear()
        self._stats.clear()
# Global instance
connection_pool_service = ConnectionPoolService()
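
# A minimal lifecycle sketch (illustrative; assumes the settings URLs point at
# reachable services). In a FastAPI app this would typically live in the
# lifespan handler rather than a script:
#
#     async def main():
#         await connection_pool_service.initialize()
#         try:
#             async with connection_pool_service.get_db_session() as session:
#                 if session is not None:
#                     await session.execute(text("SELECT 1"))
#             print(await connection_pool_service.health_check())
#         finally:
#             await connection_pool_service.cleanup()
#
#     asyncio.run(main())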