|
|
""" |
|
|
Memory System Startup Service |
|
|
|
|
|
Initializes the memory system and integrates it with all agents |
|
|
during application startup. |
|
|
""" |
|
|
|
|
|
import asyncio |
|
|
from typing import Optional |
|
|
|
|
|
from src.agents.nana import ContextMemoryAgent |
|
|
from src.services.agent_memory_integration import initialize_memory_integration |
|
|
from src.core.cache import get_redis_client |
|
|
from src.core import get_logger |
|
|
from src.core.config import settings |
|
|
|
|
|
|
|
|
logger = get_logger(__name__) |
|
|
|
|
|
|
|
|
async def initialize_memory_system() -> Optional[ContextMemoryAgent]:
    """
    Initialize the complete memory system.

    Creates the Nanã memory agent, wires it into the memory integration
    service, applies settings-driven tuning, and returns the agent.
    On any failure the error is logged and ``None`` is returned so the
    application can continue without memory support.
    """
    try:
        logger.info("Initializing memory system...")

        # Shared Redis connection backs the agent's storage.
        redis_client = await get_redis_client()

        # No vector store is wired up yet; the agent accepts None here.
        vector_store = None

        agent = ContextMemoryAgent(
            redis_client=redis_client,
            vector_store=vector_store,
            max_episodic_memories=getattr(settings, "MAX_EPISODIC_MEMORIES", 10000),
            max_conversation_turns=getattr(settings, "MAX_CONVERSATION_TURNS", 100),
            memory_decay_days=getattr(settings, "MEMORY_DECAY_DAYS", 90)
        )
        await agent.initialize()

        # Register the agent with the cross-agent integration layer and
        # apply the settings-driven toggles (defaults used when absent).
        integration = await initialize_memory_integration(agent)
        integration.auto_store = getattr(settings, "AUTO_STORE_MEMORIES", True)
        integration.auto_retrieve = getattr(settings, "AUTO_RETRIEVE_MEMORIES", True)
        integration.cache_ttl = getattr(settings, "MEMORY_CACHE_TTL", 300)

        logger.info("Memory system initialized successfully")
        logger.info(f"Auto-store: {integration.auto_store}")
        logger.info(f"Auto-retrieve: {integration.auto_retrieve}")
        logger.info(f"Cache TTL: {integration.cache_ttl}s")

        return agent

    except Exception as e:
        # Memory is optional: degrade gracefully instead of failing startup.
        logger.error(f"Failed to initialize memory system: {str(e)}")
        logger.warning("Continuing without memory system - agents will operate independently")
        return None
|
|
|
|
|
|
|
|
async def integrate_existing_agents():
    """
    Integrate all existing agents with the memory system.

    This is useful when agents are already created and we need
    to retrofit them with memory capabilities. Every pooled agent is
    registered with the memory integration service; per-agent failures
    are logged and skipped so one bad agent does not abort the pass.
    """
    try:
        # Imported lazily to avoid circular imports at module load time.
        from src.agents.agent_pool import get_agent_pool
        from src.services.agent_memory_integration import get_memory_integration

        memory_integration = get_memory_integration()
        if not memory_integration:
            logger.warning("Memory integration not available")
            return

        agent_pool = get_agent_pool()
        if not agent_pool:
            logger.warning("Agent pool not available")
            return

        # NOTE(review): reaches into the pool's private `_pools` mapping —
        # consider exposing a public iterator on the pool instead.
        integrated_count = 0
        # Iterate values only: the keys (agent types) were never used.
        for pool_entries in agent_pool._pools.values():
            for entry in pool_entries:
                try:
                    await memory_integration.integrate_agent(entry.agent)
                    integrated_count += 1
                except Exception as e:
                    logger.error(f"Failed to integrate agent {entry.agent.agent_id}: {str(e)}")

        logger.info(f"Integrated {integrated_count} existing agents with memory system")

    except Exception as e:
        logger.error(f"Failed to integrate existing agents: {str(e)}")
|
|
|
|
|
|
|
|
async def demonstrate_memory_sharing():
    """
    Demonstrate how agents can share knowledge through memory.

    An example of cross-agent learning: anomaly patterns flow from Zumbi
    to Oxóssi and fraud patterns flow back the other way, after which
    aggregate memory statistics are logged.
    """
    try:
        from src.services.agent_memory_integration import get_memory_integration

        memory_integration = get_memory_integration()
        if not memory_integration:
            logger.warning("Memory integration not available for demonstration")
            return

        logger.info("Demonstrating knowledge sharing between agents...")

        # Each entry: (kwargs for the share call, message logged on success).
        exchanges = [
            (
                {
                    "source_agent": "zumbi",
                    "target_agent": "oxossi",
                    "knowledge_type": "anomaly",
                    "filters": {"importance": "HIGH"},
                },
                "Successfully shared anomaly patterns from Zumbi to Oxóssi",
            ),
            (
                {
                    "source_agent": "oxossi",
                    "target_agent": "zumbi",
                    "knowledge_type": "fraud",
                },
                "Successfully shared fraud patterns from Oxóssi to Zumbi",
            ),
        ]
        for kwargs, success_message in exchanges:
            success = await memory_integration.share_knowledge_between_agents(**kwargs)
            if success:
                logger.info(success_message)

        stats = await memory_integration.get_memory_statistics()
        logger.info(f"Memory statistics: {stats}")

    except Exception as e:
        logger.error(f"Memory sharing demonstration failed: {str(e)}")
|
|
|
|
|
|
|
|
async def optimize_agent_memories():
    """
    Optimize memories for all agents by consolidating and cleaning up.

    Intended to run periodically (e.g. daily) to keep the memory system
    performant. Per-agent failures are logged and skipped so one bad
    agent does not abort the whole pass.
    """
    try:
        from src.services.agent_memory_integration import get_memory_integration

        memory_integration = get_memory_integration()
        if not memory_integration:
            logger.warning("Memory integration not available for optimization")
            return

        logger.info("Starting memory optimization for all agents...")

        # Known agent identifiers; keep in sync with the deployed agents.
        for agent_id in (
            "zumbi", "anita", "oxossi", "bonifacio", "dandara",
            "machado", "lampiao", "maria_quiteria", "obaluaie",
        ):
            try:
                await memory_integration.optimize_memory_for_agent(agent_id)
                logger.info(f"Optimized memory for {agent_id}")
            except Exception as e:
                logger.error(f"Failed to optimize memory for {agent_id}: {str(e)}")

        logger.info("Memory optimization completed")

    except Exception as e:
        logger.error(f"Memory optimization failed: {str(e)}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def setup_memory_on_startup() -> Optional[ContextMemoryAgent]:
    """Setup memory system during FastAPI startup.

    Initializes the memory system, retrofits any already-created agents
    with memory capabilities, and optionally runs the knowledge-sharing
    demo when ``DEMO_MEMORY_SHARING`` is enabled in settings.

    Returns:
        The initialized memory agent, or None if initialization failed
        (``initialize_memory_system`` swallows its own errors).
    """
    memory_agent = await initialize_memory_system()
    if memory_agent:
        await integrate_existing_agents()
        # Opt-in demo; disabled by default.
        if getattr(settings, "DEMO_MEMORY_SHARING", False):
            await demonstrate_memory_sharing()
    return memory_agent
|
|
|
|
|
|
|
|
async def cleanup_memory_on_shutdown():
    """Cleanup memory system during FastAPI shutdown.

    Logs final memory statistics when the integration service is
    available; errors are logged and suppressed so shutdown is never
    blocked by the memory system.
    """
    try:
        # Imported lazily to avoid circular imports at module load time.
        from src.services.agent_memory_integration import get_memory_integration

        memory_integration = get_memory_integration()
        if memory_integration:
            # NOTE(review): this logs intent but no explicit save call is
            # made here — confirm pending memories are persisted elsewhere.
            logger.info("Saving pending memories before shutdown...")

            stats = await memory_integration.get_memory_statistics()
            logger.info(f"Final memory statistics: {stats}")

    except Exception as e:
        logger.error(f"Memory cleanup failed: {str(e)}")
|
|
|
|
|
|
|
|
|
|
|
async def periodic_memory_optimization():
    """Run memory optimization periodically.

    Sleeps for the configured interval (default: 86400 s, i.e. one day),
    then runs a full optimization pass. Cancellation stops the loop; any
    other error is logged and the loop retries after a one-hour back-off.
    """
    while True:
        try:
            # Re-read the interval each cycle so a changed setting can
            # take effect without restarting the task.
            delay = getattr(settings, "MEMORY_OPTIMIZATION_INTERVAL", 86400)
            await asyncio.sleep(delay)
            await optimize_agent_memories()
        except asyncio.CancelledError:
            # Task cancelled (e.g. app shutdown) — exit the loop cleanly.
            break
        except Exception as e:
            logger.error(f"Periodic optimization error: {str(e)}")
            # Back off before retrying after an unexpected failure.
            await asyncio.sleep(3600)