"""
Memory System Startup Service
Initializes the memory system and integrates it with all agents
during application startup.
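
Example (an illustrative sketch, not mandated by this module): wiring the
startup, shutdown, and periodic-optimization helpers into a FastAPI
application through its lifespan handler.

    import asyncio
    from contextlib import asynccontextmanager

    from fastapi import FastAPI

    from src.services.memory_startup import (
        cleanup_memory_on_shutdown,
        periodic_memory_optimization,
        setup_memory_on_startup,
    )

    @asynccontextmanager
    async def lifespan(app: FastAPI):
        # Initialize memory and retrofit pooled agents on startup.
        await setup_memory_on_startup()
        # Run the optimizer in the background for the app's lifetime.
        optimizer = asyncio.create_task(periodic_memory_optimization())
        try:
            yield
        finally:
            optimizer.cancel()
            await cleanup_memory_on_shutdown()

    app = FastAPI(lifespan=lifespan)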
"""

import asyncio
from typing import Optional

from src.agents.nana import ContextMemoryAgent
from src.core import get_logger
from src.core.cache import get_redis_client
from src.core.config import settings
from src.services.agent_memory_integration import initialize_memory_integration

logger = get_logger(__name__)


async def initialize_memory_system() -> Optional[ContextMemoryAgent]:
    """
    Initialize the complete memory system.

    This function:
    1. Creates the Nanã memory agent
    2. Sets up the memory integration service
    3. Returns the configured memory agent
    """
    try:
        logger.info("Initializing memory system...")

        # Get Redis client for memory storage
        redis_client = await get_redis_client()

        # No vector store is wired up yet; when semantic search is needed,
        # this would be ChromaDB, Pinecone, or similar.
        vector_store = None

        # Create the Nanã memory agent. The settings object is a Pydantic
        # model, so optional values are read with getattr() and explicit
        # defaults rather than dict-style .get().
        memory_agent = ContextMemoryAgent(
            redis_client=redis_client,
            vector_store=vector_store,
            max_episodic_memories=getattr(settings, "MAX_EPISODIC_MEMORIES", 10000),
            max_conversation_turns=getattr(settings, "MAX_CONVERSATION_TURNS", 100),
            memory_decay_days=getattr(settings, "MEMORY_DECAY_DAYS", 90),
        )

        # Initialize the memory agent
        await memory_agent.initialize()

        # Initialize the memory integration service
        memory_integration = await initialize_memory_integration(memory_agent)

        # Configure memory integration settings
        memory_integration.auto_store = getattr(settings, "AUTO_STORE_MEMORIES", True)
        memory_integration.auto_retrieve = getattr(settings, "AUTO_RETRIEVE_MEMORIES", True)
        memory_integration.cache_ttl = getattr(settings, "MEMORY_CACHE_TTL", 300)

        logger.info("Memory system initialized successfully")
        logger.info(f"Auto-store: {memory_integration.auto_store}")
        logger.info(f"Auto-retrieve: {memory_integration.auto_retrieve}")
        logger.info(f"Cache TTL: {memory_integration.cache_ttl}s")

        return memory_agent

    except Exception as e:
        logger.error(f"Failed to initialize memory system: {str(e)}")
        logger.warning("Continuing without memory system - agents will operate independently")
        return None


async def integrate_existing_agents():
    """
    Integrate all existing agents with the memory system.

    This is useful when agents are already created and we need
    to retrofit them with memory capabilities.
    """
    try:
        from src.agents.agent_pool import get_agent_pool
        from src.services.agent_memory_integration import get_memory_integration

        memory_integration = get_memory_integration()
        if not memory_integration:
            logger.warning("Memory integration not available")
            return

        agent_pool = get_agent_pool()
        if not agent_pool:
            logger.warning("Agent pool not available")
            return

        # Get all agents from the pool and integrate each one
        integrated_count = 0
        for agent_type, pool_entries in agent_pool._pools.items():
            for entry in pool_entries:
                try:
                    await memory_integration.integrate_agent(entry.agent)
                    integrated_count += 1
                except Exception as e:
                    logger.error(f"Failed to integrate agent {entry.agent.agent_id}: {str(e)}")

        logger.info(f"Integrated {integrated_count} existing agents with memory system")

    except Exception as e:
        logger.error(f"Failed to integrate existing agents: {str(e)}")


async def demonstrate_memory_sharing():
    """
    Demonstrate how agents can share knowledge through memory.

    This is an example of cross-agent learning.
    """
    try:
        from src.services.agent_memory_integration import get_memory_integration

        memory_integration = get_memory_integration()
        if not memory_integration:
            logger.warning("Memory integration not available for demonstration")
            return

        # Example: Share anomaly patterns from Zumbi to Oxóssi
        logger.info("Demonstrating knowledge sharing between agents...")

        # Share anomaly patterns
        success = await memory_integration.share_knowledge_between_agents(
            source_agent="zumbi",
            target_agent="oxossi",
            knowledge_type="anomaly",
            filters={"importance": "HIGH"},
        )
        if success:
            logger.info("Successfully shared anomaly patterns from Zumbi to Oxóssi")

        # Share fraud patterns back to Zumbi
        success = await memory_integration.share_knowledge_between_agents(
            source_agent="oxossi",
            target_agent="zumbi",
            knowledge_type="fraud",
        )
        if success:
            logger.info("Successfully shared fraud patterns from Oxóssi to Zumbi")

        # Get memory statistics
        stats = await memory_integration.get_memory_statistics()
        logger.info(f"Memory statistics: {stats}")

    except Exception as e:
        logger.error(f"Memory sharing demonstration failed: {str(e)}")


async def optimize_agent_memories():
    """
    Optimize memories for all agents by consolidating and cleaning up.

    This should be run periodically (e.g., daily) to maintain
    memory system performance.
    """
    try:
        from src.services.agent_memory_integration import get_memory_integration

        memory_integration = get_memory_integration()
        if not memory_integration:
            logger.warning("Memory integration not available for optimization")
            return

        logger.info("Starting memory optimization for all agents...")

        # List of agents to optimize
        agents_to_optimize = [
            "zumbi", "anita", "oxossi", "bonifacio", "dandara",
            "machado", "lampiao", "maria_quiteria", "obaluaie",
        ]

        for agent_id in agents_to_optimize:
            try:
                await memory_integration.optimize_memory_for_agent(agent_id)
                logger.info(f"Optimized memory for {agent_id}")
            except Exception as e:
                logger.error(f"Failed to optimize memory for {agent_id}: {str(e)}")

        logger.info("Memory optimization completed")

    except Exception as e:
        logger.error(f"Memory optimization failed: {str(e)}")


# Convenience functions for FastAPI startup

async def setup_memory_on_startup():
    """Set up the memory system during FastAPI startup."""
    memory_agent = await initialize_memory_system()

    if memory_agent:
        await integrate_existing_agents()

    # Optionally demonstrate memory sharing
    if getattr(settings, "DEMO_MEMORY_SHARING", False):
        await demonstrate_memory_sharing()

    return memory_agent


async def cleanup_memory_on_shutdown():
    """Clean up the memory system during FastAPI shutdown."""
    try:
        from src.services.agent_memory_integration import get_memory_integration

        memory_integration = get_memory_integration()
        if memory_integration:
            # Save any pending memories
            logger.info("Saving pending memories before shutdown...")

            # Get final statistics
            stats = await memory_integration.get_memory_statistics()
            logger.info(f"Final memory statistics: {stats}")

    except Exception as e:
        logger.error(f"Memory cleanup failed: {str(e)}")


# Background task for periodic optimization

async def periodic_memory_optimization():
    """Run memory optimization periodically."""
    while True:
        try:
            # Wait for the configured interval (default: 24 hours)
            interval = getattr(settings, "MEMORY_OPTIMIZATION_INTERVAL", 86400)
            await asyncio.sleep(interval)

            # Run optimization
            await optimize_agent_memories()
        except asyncio.CancelledError:
            break
        except Exception as e:
            logger.error(f"Periodic optimization error: {str(e)}")
            # Continue after the error, retrying sooner than usual
            await asyncio.sleep(3600)  # Retry in 1 hour
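

# Illustrative entry point (a sketch, not used by the application itself):
# run a single optimization pass from the command line. Assumes Redis and
# the application settings are reachable from the current environment.
if __name__ == "__main__":

    async def _optimize_once():
        # Bring the memory system up, then run one optimization pass.
        if await initialize_memory_system():
            await optimize_agent_memories()

    asyncio.run(_optimize_once())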