"""
Module: services.cache_warming_service
Description: Cache warming strategies for improved performance
Author: Anderson H. Silva
Date: 2025-01-25
License: Proprietary - All rights reserved
"""

import asyncio
from typing import List, Dict, Any, Optional, Set
from datetime import datetime, timedelta, timezone
from enum import Enum
import hashlib

from src.core import get_logger
from src.services.cache_service import cache_service
from src.services.data_service import data_service
from src.services.investigation_service_selector import investigation_service
from src.core.config import settings
from src.db.session import get_session
from src.infrastructure.database import Investigation

logger = get_logger(__name__)

class CacheWarmingStrategy(str, Enum):
    """Cache warming strategies."""

    POPULAR_DATA = "popular_data"
    RECENT_INVESTIGATIONS = "recent_investigations"
    FREQUENT_QUERIES = "frequent_queries"
    AGENT_POOLS = "agent_pools"
    STATIC_RESOURCES = "static_resources"
    PREDICTIVE = "predictive"

class CacheWarmingConfig:
    """Configuration for cache warming."""

    # Cache TTL per data type, in seconds.
    TTL_CONFIG = {
        "contracts": 3600,
        "investigations": 1800,
        "agent_pools": 7200,
        "static_data": 86400,
        "frequent_queries": 600,
        "analytics": 3600,
    }

    # Maximum number of items warmed per data type in a single cycle.
    MAX_ITEMS_PER_TYPE = {
        "contracts": 100,
        "investigations": 50,
        "queries": 200,
        "agents": 20,
    }

    # Relative weights for prioritizing warming candidates
    # (not yet applied by the current strategies).
    PRIORITY_WEIGHTS = {
        "recency": 0.3,
        "frequency": 0.4,
        "importance": 0.3,
    }

class CacheWarmingService:
    """Service for cache warming operations."""

    def __init__(self):
        """Initialize cache warming service."""
        self._config = CacheWarmingConfig()
        self._warming_tasks: Set[asyncio.Task] = set()
        self._last_warming: Dict[str, datetime] = {}
        self._query_frequency: Dict[str, int] = {}
        self._warming_interval = 300  # seconds between full warming cycles

    async def start_warming_scheduler(self):
        """Start the cache warming scheduler."""
        logger.info("cache_warming_scheduler_started")

        while True:
            try:
                await self.warm_all_caches()

                await asyncio.sleep(self._warming_interval)

            except asyncio.CancelledError:
                logger.info("cache_warming_scheduler_stopped")
                break
            except Exception as e:
                logger.error(
                    "cache_warming_scheduler_error",
                    error=str(e),
                    exc_info=True
                )
                await asyncio.sleep(60)

    async def warm_all_caches(self) -> Dict[str, Any]:
        """Execute all cache warming strategies concurrently and return a summary."""
        start_time = datetime.now(timezone.utc)

        strategies = [
            self._warm_popular_data(),
            self._warm_recent_investigations(),
            self._warm_frequent_queries(),
            self._warm_agent_pools(),
            self._warm_static_resources()
        ]

        # Run all strategies concurrently; individual failures are collected
        # instead of aborting the whole cycle.
        results = await asyncio.gather(*strategies, return_exceptions=True)

        duration = (datetime.now(timezone.utc) - start_time).total_seconds()
        successful = sum(1 for r in results if not isinstance(r, Exception))

        logger.info(
            "cache_warming_completed",
            duration_seconds=duration,
            strategies_total=len(strategies),
            strategies_successful=successful
        )

        self._last_warming["all"] = datetime.now(timezone.utc)

        return {
            "duration_seconds": duration,
            "strategies_total": len(strategies),
            "strategies_successful": successful
        }

    async def _warm_popular_data(self) -> Dict[str, Any]:
        """Warm cache with popular/frequently accessed data."""
        warmed_count = 0

        try:
            popular_contracts = await self._get_popular_contracts()

            for contract_id in popular_contracts[:self._config.MAX_ITEMS_PER_TYPE["contracts"]]:
                cache_key = f"contract:{contract_id}"

                # Skip contracts that are already cached.
                if await cache_service.get(cache_key):
                    continue

                try:
                    contract_data = await data_service.get_contract(contract_id)
                    if contract_data:
                        await cache_service.set(
                            cache_key,
                            contract_data,
                            ttl=self._config.TTL_CONFIG["contracts"]
                        )
                        warmed_count += 1
                except Exception as e:
                    logger.error(f"Failed to warm contract {contract_id}: {e}")

            logger.info(
                "popular_data_warmed",
                contracts_warmed=warmed_count
            )

            return {"contracts": warmed_count}

        except Exception as e:
            logger.error("popular_data_warming_failed", error=str(e))
            raise

    async def _warm_recent_investigations(self) -> Dict[str, Any]:
        """Warm cache with recent investigations."""
        warmed_count = 0

        try:
            async with get_session() as session:
                from sqlalchemy import select, desc

                # Most recent investigations first.
                query = select(Investigation).order_by(
                    desc(Investigation.created_at)
                ).limit(self._config.MAX_ITEMS_PER_TYPE["investigations"])

                result = await session.execute(query)
                investigations = result.scalars().all()

                for investigation in investigations:
                    cache_key = f"investigation:{investigation.id}"

                    await cache_service.set(
                        cache_key,
                        {
                            "id": investigation.id,
                            "status": investigation.status,
                            "contract_id": investigation.contract_id,
                            "results": investigation.results,
                            "created_at": investigation.created_at.isoformat()
                        },
                        ttl=self._config.TTL_CONFIG["investigations"]
                    )
                    warmed_count += 1

            logger.info(
                "recent_investigations_warmed",
                count=warmed_count
            )

            return {"investigations": warmed_count}

        except Exception as e:
            logger.error("recent_investigations_warming_failed", error=str(e))
            raise

    async def _warm_frequent_queries(self) -> Dict[str, Any]:
        """Warm cache with results of frequent queries."""
        warmed_count = 0

        try:
            # Take the most frequently seen queries first.
            frequent_queries = sorted(
                self._query_frequency.items(),
                key=lambda x: x[1],
                reverse=True
            )[:self._config.MAX_ITEMS_PER_TYPE["queries"]]

            for query_hash, frequency in frequent_queries:
                cache_key = f"query_result:{query_hash}"

                # Skip queries whose results are already cached.
                if await cache_service.get(cache_key):
                    continue

                # Only the hashed query is tracked, so the result cannot be
                # recomputed here; the entry is counted as a warming candidate.
                warmed_count += 1

            logger.info(
                "frequent_queries_warmed",
                count=warmed_count
            )

            return {"queries": warmed_count}

        except Exception as e:
            logger.error("frequent_queries_warming_failed", error=str(e))
            raise

    async def _warm_agent_pools(self) -> Dict[str, Any]:
        """Warm agent pool connections."""
        warmed_count = 0

        try:
            agent_types = [
                "zumbi",
                "anita",
                "tiradentes",
                "machado",
                "dandara"
            ]

            for agent_type in agent_types[:self._config.MAX_ITEMS_PER_TYPE["agents"]]:
                cache_key = f"agent_pool:{agent_type}:status"

                # Cache a lightweight status record for each agent pool.
                await cache_service.set(
                    cache_key,
                    {
                        "type": agent_type,
                        "initialized": True,
                        "last_used": datetime.now(timezone.utc).isoformat()
                    },
                    ttl=self._config.TTL_CONFIG["agent_pools"]
                )
                warmed_count += 1

            logger.info(
                "agent_pools_warmed",
                count=warmed_count
            )

            return {"agents": warmed_count}

        except Exception as e:
            logger.error("agent_pools_warming_failed", error=str(e))
            raise

    async def _warm_static_resources(self) -> Dict[str, Any]:
        """Warm cache with static resources."""
        warmed_count = 0

        try:
            static_data = {
                "system_config": {
                    "version": "1.0.0",
                    "features": ["investigations", "reports", "analysis"],
                    "agents": ["zumbi", "anita", "tiradentes"]
                },
                "contract_types": [
                    "licitacao",
                    "contrato",
                    "convenio",
                    "termo_aditivo"
                ],
                "anomaly_types": [
                    "valor_atipico",
                    "padrao_temporal",
                    "fornecedor_suspeito",
                    "fragmentacao"
                ]
            }

            for key, data in static_data.items():
                cache_key = f"static:{key}"
                await cache_service.set(
                    cache_key,
                    data,
                    ttl=self._config.TTL_CONFIG["static_data"]
                )
                warmed_count += 1

            logger.info(
                "static_resources_warmed",
                count=warmed_count
            )

            return {"static": warmed_count}

        except Exception as e:
            logger.error("static_resources_warming_failed", error=str(e))
            raise

    async def _get_popular_contracts(self) -> List[str]:
        """Get list of popular contract IDs."""
        # Placeholder list of identifiers; a production implementation would
        # presumably derive popularity from access metrics or analytics.
        return [
            "CONT-2024-001",
            "CONT-2024-002",
            "CONT-2024-003",
            "CONT-2024-004",
            "CONT-2024-005"
        ]

    def track_query(self, query_params: Dict[str, Any]):
        """Track query frequency for cache warming."""
        # Sort the parameters so equivalent queries produce the same hash.
        query_str = str(sorted(query_params.items()))
        query_hash = hashlib.md5(query_str.encode()).hexdigest()

        self._query_frequency[query_hash] = self._query_frequency.get(query_hash, 0) + 1

        # Bound memory usage: once more than 1000 queries are tracked,
        # drop the 100 least frequent entries.
        if len(self._query_frequency) > 1000:
            sorted_queries = sorted(
                self._query_frequency.items(),
                key=lambda x: x[1]
            )
            for query, _ in sorted_queries[:100]:
                del self._query_frequency[query]

    async def warm_specific_data(
        self,
        data_type: str,
        identifiers: List[str],
        ttl: Optional[int] = None
    ) -> Dict[str, Any]:
        """Warm cache with specific data."""
        if ttl is None:
            ttl = self._config.TTL_CONFIG.get(data_type, 3600)

        warmed = []
        failed = []

        for identifier in identifiers:
            try:
                cache_key = f"{data_type}:{identifier}"

                # Skip identifiers that are already cached.
                if await cache_service.get(cache_key):
                    continue

                # Load the data through the appropriate service.
                data = None
                if data_type == "contract":
                    data = await data_service.get_contract(identifier)
                elif data_type == "investigation":
                    data = await investigation_service.get_investigation(identifier)

                if data:
                    await cache_service.set(cache_key, data, ttl=ttl)
                    warmed.append(identifier)
                else:
                    failed.append(identifier)

            except Exception as e:
                logger.error(
                    f"Failed to warm {data_type}:{identifier}: {e}"
                )
                failed.append(identifier)

        return {
            "warmed": warmed,
            "failed": failed,
            "total": len(identifiers)
        }

    async def get_warming_status(self) -> Dict[str, Any]:
        """Get current cache warming status."""
        status = {
            "last_warming": self._last_warming.get("all"),
            "query_frequency_tracked": len(self._query_frequency),
            "top_queries": sorted(
                self._query_frequency.items(),
                key=lambda x: x[1],
                reverse=True
            )[:10],
            "config": {
                "interval_seconds": self._warming_interval,
                "ttls": self._config.TTL_CONFIG,
                "limits": self._config.MAX_ITEMS_PER_TYPE
            }
        }

        return status

    async def trigger_manual_warming(
        self,
        strategies: Optional[List[CacheWarmingStrategy]] = None
    ) -> Dict[str, Any]:
        """Manually trigger cache warming."""
        if strategies is None:
            return await self.warm_all_caches()

        results = {}
        for strategy in strategies:
            try:
                if strategy == CacheWarmingStrategy.POPULAR_DATA:
                    results[strategy] = await self._warm_popular_data()
                elif strategy == CacheWarmingStrategy.RECENT_INVESTIGATIONS:
                    results[strategy] = await self._warm_recent_investigations()
                elif strategy == CacheWarmingStrategy.FREQUENT_QUERIES:
                    results[strategy] = await self._warm_frequent_queries()
                elif strategy == CacheWarmingStrategy.AGENT_POOLS:
                    results[strategy] = await self._warm_agent_pools()
                elif strategy == CacheWarmingStrategy.STATIC_RESOURCES:
                    results[strategy] = await self._warm_static_resources()
                # Strategies without a dedicated handler (e.g. PREDICTIVE)
                # are currently skipped.
            except Exception as e:
                results[strategy] = {"error": str(e)}

        return results


# Module-level singleton shared by the application.
cache_warming_service = CacheWarmingService()
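
# Example wiring (illustrative sketch only; the real startup/shutdown hooks
# depend on the application entrypoint, e.g. a lifespan handler):
#
#     warming_task = asyncio.create_task(
#         cache_warming_service.start_warming_scheduler()
#     )
#     ...
#     warming_task.cancel()  # stop the scheduler on shutdown
#
# Query tracking is hooked in by whatever layer handles incoming queries:
#
#     cache_warming_service.track_query(query_params)
#
# One-off warming of known items can also be triggered directly:
#
#     await cache_warming_service.warm_specific_data(
#         "contract", ["CONT-2024-001"], ttl=1800
#     )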