File size: 13,236 Bytes
f89ac19 14c8d0a f89ac19 14c8d0a f89ac19 14c8d0a f89ac19 14c8d0a f89ac19 14c8d0a f89ac19 14c8d0a f89ac19 14c8d0a 426ed22 f89ac19 426ed22 f89ac19 426ed22 f89ac19 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 |
"""
Agent pooling system for improved performance.
This module provides a pool of pre-initialized agents that can be
reused across requests, avoiding the overhead of creating new instances.
"""
import asyncio
from typing import Dict, Type, Optional, Any, List
from datetime import datetime, timedelta
from contextlib import asynccontextmanager
import weakref
from src.core import get_logger
from src.agents.deodoro import BaseAgent, AgentContext
# Module-level logger, named after this module, obtained from the project's
# get_logger helper; shared by every class and function below.
logger = get_logger(__name__)
class AgentPoolEntry:
    """One pooled agent plus the bookkeeping the pool keeps about it."""

    def __init__(self, agent: BaseAgent):
        """Wrap ``agent`` as a fresh, unused pool entry."""
        self.agent = agent
        self.in_use = False
        # Timestamps: last hand-back to the pool, and creation of the entry.
        self.last_used = datetime.now()
        self.usage_count = 0
        self.created_at = datetime.now()
        # Serializes acquire()/release() on this entry.
        self._lock = asyncio.Lock()

    @property
    def idle_time(self) -> float:
        """Seconds elapsed since ``last_used`` was last updated."""
        elapsed = datetime.now() - self.last_used
        return elapsed.total_seconds()

    async def acquire(self) -> BaseAgent:
        """Mark the entry as busy and return its agent.

        Raises:
            RuntimeError: If the entry is already marked as in use.
        """
        async with self._lock:
            if not self.in_use:
                self.usage_count += 1
                self.in_use = True
                return self.agent
            raise RuntimeError("Agent already in use")

    async def release(self):
        """Mark the entry as free again and refresh ``last_used``."""
        async with self._lock:
            self.last_used = datetime.now()
            self.in_use = False
class AgentPool:
    """
    Pool manager for AI agents.

    Features:
    - Pre-warmed agent instances
    - Automatic cleanup of idle agents
    - Usage statistics and monitoring
    - Safe for concurrent coroutines on one event loop (it relies on asyncio
      primitives, so it is NOT safe to share across OS threads)
    """

    def __init__(
        self,
        min_size: int = 2,
        max_size: int = 10,
        idle_timeout: int = 300,  # 5 minutes
        max_agent_lifetime: int = 3600,  # 1 hour
        use_lazy_loading: bool = True
    ):
        """
        Initialize agent pool.

        Args:
            min_size: Minimum pool size per agent type
            max_size: Maximum pool size per agent type
            idle_timeout: Seconds before removing idle agents
            max_agent_lifetime: Maximum agent lifetime in seconds
            use_lazy_loading: Enable lazy loading for agents
        """
        self.min_size = min_size
        self.max_size = max_size
        self.idle_timeout = idle_timeout
        self.max_agent_lifetime = max_agent_lifetime
        self._use_lazy_loading = use_lazy_loading

        # Pool storage: agent_type -> list of entries
        self._pools: Dict[Type[BaseAgent], List[AgentPoolEntry]] = {}

        # Agents currently being constructed, per type. Creation awaits, so
        # these slots must be counted against max_size before the entry
        # actually lands in the pool.
        self._pending_creations: Dict[Type[BaseAgent], int] = {}

        # Weak references to track all created agents
        self._all_agents: weakref.WeakSet = weakref.WeakSet()

        # Statistics
        self._stats = {
            "created": 0,
            "reused": 0,
            "evicted": 0,
            "errors": 0,
            "lazy_loaded": 0
        }

        # Cleanup task
        self._cleanup_task: Optional[asyncio.Task] = None
        self._running = False

    async def start(self):
        """Start the agent pool and cleanup task.

        Calling this while a cleanup task is still alive does not spawn a
        second one.
        """
        self._running = True
        # Re-creating the task unconditionally would orphan a live one.
        if self._cleanup_task is None or self._cleanup_task.done():
            self._cleanup_task = asyncio.create_task(self._cleanup_loop())

        # Initialize lazy loader if enabled
        if self._use_lazy_loading:
            from src.services.agent_lazy_loader import agent_lazy_loader
            await agent_lazy_loader.start()

        logger.info("Agent pool started", lazy_loading=self._use_lazy_loading)

    async def stop(self):
        """Stop the agent pool and cleanup resources."""
        self._running = False

        if self._cleanup_task:
            self._cleanup_task.cancel()
            try:
                await self._cleanup_task
            except asyncio.CancelledError:
                pass
            self._cleanup_task = None

        # Cleanup all agents. Snapshot first: agent cleanup awaits, and the
        # pool mapping must not be iterated while other coroutines can
        # mutate it.
        entries = [entry for pool in self._pools.values() for entry in pool]
        for entry in entries:
            await self._dispose_agent(entry.agent)

        self._pools.clear()

        # Stop lazy loader if enabled
        if self._use_lazy_loading:
            from src.services.agent_lazy_loader import agent_lazy_loader
            await agent_lazy_loader.stop()

        logger.info("Agent pool stopped")

    @asynccontextmanager
    async def acquire(self, agent_type: Type[BaseAgent], context: AgentContext):
        """
        Acquire an agent from the pool.

        Args:
            agent_type: Type of agent to acquire
            context: Agent execution context

        Yields:
            Agent instance
        """
        entry = await self._get_or_create_agent(agent_type)
        agent = await entry.acquire()

        try:
            # Update agent context
            agent.context = context
            yield agent
        finally:
            try:
                # Clear sensitive data
                agent.context = None
            finally:
                # Always hand the entry back, even if resetting the context
                # raised; otherwise the entry would stay busy forever.
                await entry.release()

    def _occupancy(self, agent_type: Type[BaseAgent], pool: List[AgentPoolEntry]) -> int:
        """Return pooled entries plus agents still being constructed."""
        return len(pool) + self._pending_creations.get(agent_type, 0)

    def _has_capacity(self, agent_type: Type[BaseAgent], pool: List[AgentPoolEntry]) -> bool:
        """Return True if one more agent of this type fits under max_size."""
        return self._occupancy(agent_type, pool) < self.max_size

    async def _add_agent(
        self, agent_type: Type[BaseAgent], pool: List[AgentPoolEntry]
    ) -> AgentPoolEntry:
        """Create an agent, wrap it in an entry and append it to ``pool``.

        The slot is reserved while construction awaits so that concurrent
        callers cannot push the pool past max_size. Increments the "created"
        statistic for every agent added, whatever the caller.
        """
        self._pending_creations[agent_type] = (
            self._pending_creations.get(agent_type, 0) + 1
        )
        try:
            agent = await self._create_agent(agent_type)
        finally:
            self._pending_creations[agent_type] -= 1

        entry = AgentPoolEntry(agent)
        pool.append(entry)
        self._stats["created"] += 1
        return entry

    async def _dispose_agent(self, agent: BaseAgent):
        """Run the agent's optional ``cleanup`` hook, logging any failure."""
        try:
            if hasattr(agent, 'cleanup'):
                await agent.cleanup()
        except Exception as e:
            logger.error(f"Error cleaning up agent: {e}")

    async def _get_or_create_agent(self, agent_type: Type[BaseAgent]) -> AgentPoolEntry:
        """Get an available agent or create a new one.

        Blocks (polling every 0.1s) while the pool is full and every entry
        is busy. The returned entry is not yet marked in use; the caller
        must acquire it before yielding control to the event loop.
        """
        # Initialize pool for agent type if needed
        pool = self._pools.setdefault(agent_type, [])
        warned = False

        while True:
            # Find available agent
            for entry in pool:
                if not entry.in_use:
                    self._stats["reused"] += 1
                    logger.debug(f"Reusing agent {agent_type.__name__} from pool")
                    return entry

            # Create new agent if under limit. Re-checked on every pass, so
            # a waiter also notices room freed by eviction.
            if self._has_capacity(agent_type, pool):
                entry = await self._add_agent(agent_type, pool)
                logger.info(f"Created new agent {agent_type.__name__} (pool size: {len(pool)})")
                return entry

            # Wait for available agent
            if not warned:
                logger.warning(f"Agent pool full for {agent_type.__name__}, waiting...")
                warned = True
            await asyncio.sleep(0.1)

    async def _create_agent(self, agent_type: Type[BaseAgent]) -> BaseAgent:
        """Create and initialize a new agent.

        Tries the lazy loader first when enabled, falling back to calling
        ``agent_type()`` directly. Any failure is counted in the "errors"
        statistic and re-raised.
        """
        try:
            # Check if we should use lazy loader
            if hasattr(agent_type, '__name__') and self._use_lazy_loading:
                # Try to get from lazy loader
                from src.services.agent_lazy_loader import agent_lazy_loader
                agent_name = agent_type.__name__.replace('Agent', '')
                try:
                    # Use lazy loader to create agent
                    agent = await agent_lazy_loader.create_agent(agent_name)
                    self._all_agents.add(agent)
                    self._stats["lazy_loaded"] += 1
                    logger.debug(f"Created agent {agent_name} using lazy loader")
                except Exception:
                    # Fallback to direct instantiation
                    logger.debug(f"Lazy loader failed for {agent_name}, using direct instantiation")
                    agent = agent_type()
                    self._all_agents.add(agent)
            else:
                # Direct instantiation
                agent = agent_type()
                self._all_agents.add(agent)

            # Initialize if needed
            if hasattr(agent, 'initialize'):
                await agent.initialize()

            # Integrate with memory system if available
            await self._integrate_agent_memory(agent)

            return agent

        except Exception as e:
            self._stats["errors"] += 1
            logger.error(f"Failed to create agent {agent_type.__name__}: {e}")
            raise

    async def _cleanup_loop(self):
        """Background task to cleanup idle agents."""
        while self._running:
            try:
                await asyncio.sleep(30)  # Check every 30 seconds
                await self._cleanup_idle_agents()
                await self._maintain_minimum_pool()
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Error in cleanup loop: {e}")

    def _select_evictions(self, pool: List[AgentPoolEntry]) -> List[AgentPoolEntry]:
        """Pick the idle entries of ``pool`` that should be evicted now.

        Entries past ``max_agent_lifetime`` are always selected. Entries
        idle longer than ``idle_timeout`` are selected, longest-idle first,
        only while at least ``min_size`` idle entries would remain. Busy
        entries are never selected, and each entry appears at most once.
        """
        now = datetime.now()
        expired: List[AgentPoolEntry] = []
        stale: List[AgentPoolEntry] = []
        idle_total = 0

        for entry in pool:
            if entry.in_use:
                continue
            idle_total += 1
            age = (now - entry.created_at).total_seconds()
            if age > self.max_agent_lifetime:
                expired.append(entry)
            elif entry.idle_time > self.idle_timeout:
                stale.append(entry)

        # Idle entries that may go without dropping below the minimum.
        surplus = idle_total - len(expired) - self.min_size
        if surplus <= 0:
            return expired

        stale.sort(key=lambda e: e.last_used)
        return expired + stale[:surplus]

    async def _cleanup_idle_agents(self):
        """Remove agents that have been idle too long or outlived their lifetime."""
        # Snapshot the mapping: agent cleanup awaits, and another coroutine
        # may register a new agent type in the meantime.
        for agent_type, pool in list(self._pools.items()):
            for entry in self._select_evictions(pool):
                # State may have changed while a previous cleanup awaited.
                if entry.in_use or entry not in pool:
                    continue

                pool.remove(entry)
                self._stats["evicted"] += 1
                await self._dispose_agent(entry.agent)

                logger.debug(f"Evicted idle agent {agent_type.__name__}")

    async def _integrate_agent_memory(self, agent: BaseAgent):
        """Integrate agent with memory system if available.

        Best effort: a missing module or any integration error is logged
        and never propagated.
        """
        try:
            from src.services.agent_memory_integration import get_memory_integration

            memory_integration = get_memory_integration()
            if memory_integration:
                await memory_integration.integrate_agent(agent)
                logger.info(f"Integrated {agent.agent_id} with memory system")
            else:
                logger.debug("Memory integration not available")

        except ImportError:
            logger.debug("Memory integration module not available")
        except Exception as e:
            logger.error(f"Failed to integrate agent with memory: {e}")

    async def _maintain_minimum_pool(self):
        """Ensure minimum pool size for each agent type."""
        for agent_type, pool in list(self._pools.items()):
            # Create agents to maintain minimum. Availability is recounted
            # on each pass because entries can be taken or released while
            # creation awaits.
            while (
                sum(1 for e in pool if not e.in_use) < self.min_size
                and self._has_capacity(agent_type, pool)
            ):
                try:
                    await self._add_agent(agent_type, pool)
                    logger.debug(f"Pre-warmed agent {agent_type.__name__}")
                except Exception as e:
                    logger.error(f"Failed to maintain pool: {e}")
                    break

    async def prewarm(self, agent_types: List[Type[BaseAgent]]):
        """Pre-warm the pool with specified agent types.

        Creates agents until each listed type holds ``min_size`` entries,
        never exceeding ``max_size``. Stops on the first creation failure
        for a type and moves on to the next one.
        """
        for agent_type in agent_types:
            pool = self._pools.setdefault(agent_type, [])

            # Create minimum agents
            while (
                self._occupancy(agent_type, pool) < self.min_size
                and self._has_capacity(agent_type, pool)
            ):
                try:
                    await self._add_agent(agent_type, pool)
                    logger.info(f"Pre-warmed {agent_type.__name__} agent")
                except Exception as e:
                    logger.error(f"Failed to prewarm {agent_type.__name__}: {e}")
                    break

    def get_stats(self) -> Dict[str, Any]:
        """Get pool statistics.

        Returns:
            A dict with per-type counts under "pools", a snapshot copy of
            the global counters under "global_stats", and "total_agents".
        """
        pool_stats = {}

        for agent_type, pool in self._pools.items():
            pool_stats[agent_type.__name__] = {
                "total": len(pool),
                "in_use": sum(1 for e in pool if e.in_use),
                "available": sum(1 for e in pool if not e.in_use),
                "avg_usage": sum(e.usage_count for e in pool) / len(pool) if pool else 0
            }

        return {
            "pools": pool_stats,
            # Copy so callers cannot mutate the pool's internal counters.
            "global_stats": dict(self._stats),
            "total_agents": sum(len(p) for p in self._pools.values())
        }
# Global agent pool instance.
# Constructed at import time with AgentPool's default settings; construction
# alone does not start the cleanup task — get_agent_pool() starts the pool
# on first use.
agent_pool = AgentPool()
async def get_agent_pool() -> AgentPool:
    """Return the module-wide agent pool, starting it first if it is not running."""
    if agent_pool._running:
        return agent_pool

    await agent_pool.start()
    return agent_pool