File size: 13,236 Bytes
f89ac19
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
14c8d0a
 
f89ac19
 
 
 
 
 
 
 
 
14c8d0a
f89ac19
 
 
 
 
14c8d0a
f89ac19
 
 
 
 
 
 
 
 
 
 
 
14c8d0a
 
f89ac19
 
 
 
 
 
 
 
 
 
14c8d0a
 
 
 
 
 
 
f89ac19
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
14c8d0a
 
 
 
 
 
f89ac19
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
14c8d0a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
426ed22
 
 
 
 
 
f89ac19
 
 
 
 
426ed22
 
 
f89ac19
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
426ed22
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f89ac19
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
"""
Agent pooling system for improved performance.

This module provides a pool of pre-initialized agents that can be
reused across requests, avoiding the overhead of creating new instances.
"""

import asyncio
from typing import Dict, Type, Optional, Any, List
from datetime import datetime, timedelta
from contextlib import asynccontextmanager
import weakref

from src.core import get_logger
from src.agents.deodoro import BaseAgent, AgentContext

# Module-level logger, named after this module via __name__.
logger = get_logger(__name__)


class AgentPoolEntry:
    """A pooled agent together with its bookkeeping state.

    Attributes:
        agent: The wrapped agent instance.
        in_use: True while the agent is checked out.
        last_used: Timestamp taken at construction and refreshed on release.
        usage_count: Number of successful acquisitions so far.
        created_at: Timestamp taken at construction.
    """

    def __init__(self, agent: BaseAgent):
        self._lock = asyncio.Lock()
        self.agent = agent
        self.usage_count = 0
        self.in_use = False
        # Two independent clock reads, taken in this order.
        self.last_used = datetime.now()
        self.created_at = datetime.now()

    @property
    def idle_time(self) -> float:
        """Seconds elapsed since ``last_used``."""
        elapsed = datetime.now() - self.last_used
        return elapsed.total_seconds()

    async def acquire(self) -> BaseAgent:
        """Check the agent out and return it.

        Raises:
            RuntimeError: If the entry is already checked out.
        """
        async with self._lock:
            if not self.in_use:
                self.usage_count += 1
                self.in_use = True
                return self.agent
            raise RuntimeError("Agent already in use")

    async def release(self):
        """Mark the entry as free again and refresh ``last_used``."""
        async with self._lock:
            self.last_used = datetime.now()
            self.in_use = False


class AgentPool:
    """
    Pool manager for AI agents.

    Features:
    - Pre-warmed agent instances
    - Automatic cleanup of idle agents
    - Usage statistics and monitoring
    - Bookkeeping designed for concurrent coroutines on one event loop
      (only asyncio primitives are used; there are no thread locks, so the
      pool must not be shared across threads)
    """

    def __init__(
        self,
        min_size: int = 2,
        max_size: int = 10,
        idle_timeout: int = 300,  # 5 minutes
        max_agent_lifetime: int = 3600,  # 1 hour
        use_lazy_loading: bool = True
    ):
        """
        Initialize agent pool.

        Args:
            min_size: Minimum pool size per agent type
            max_size: Maximum pool size per agent type
            idle_timeout: Seconds before removing idle agents
            max_agent_lifetime: Maximum agent lifetime in seconds
            use_lazy_loading: Enable lazy loading for agents
        """
        self.min_size = min_size
        self.max_size = max_size
        self.idle_timeout = idle_timeout
        self.max_agent_lifetime = max_agent_lifetime
        self._use_lazy_loading = use_lazy_loading

        # Pool storage: agent_type -> list of entries
        self._pools: Dict[Type[BaseAgent], List[AgentPoolEntry]] = {}

        # Number of agent creations currently in flight per agent type.
        # Creation awaits, so these must count against max_size or
        # concurrent callers would all pass the capacity check at once.
        self._pending_creations: Dict[Type[BaseAgent], int] = {}

        # Weak references to track all created agents
        self._all_agents: weakref.WeakSet = weakref.WeakSet()

        # Statistics
        self._stats = {
            "created": 0,
            "reused": 0,
            "evicted": 0,
            "errors": 0,
            "lazy_loaded": 0
        }

        # Cleanup task
        self._cleanup_task: Optional[asyncio.Task] = None
        self._running = False

    async def start(self):
        """Start the agent pool and cleanup task.

        Calling this on a pool that is already running is a no-op, so a
        second call cannot spawn (and leak) another cleanup task.
        """
        if self._running:
            return

        self._running = True
        self._cleanup_task = asyncio.create_task(self._cleanup_loop())

        # Initialize lazy loader if enabled
        if self._use_lazy_loading:
            from src.services.agent_lazy_loader import agent_lazy_loader
            await agent_lazy_loader.start()

        logger.info("Agent pool started", lazy_loading=self._use_lazy_loading)

    async def stop(self):
        """Stop the agent pool and cleanup resources.

        Cancels the background cleanup task, calls ``cleanup()`` on every
        pooled agent that defines it (errors are logged, not raised), empties
        the pool and stops the lazy loader when lazy loading is enabled.
        """
        self._running = False

        # Drop the handle first so a later start() begins from a clean state.
        cleanup_task, self._cleanup_task = self._cleanup_task, None
        if cleanup_task:
            cleanup_task.cancel()
            try:
                await cleanup_task
            except asyncio.CancelledError:
                pass

        # Cleanup all agents. Iterate over snapshots: agent.cleanup() awaits,
        # and another task may register a new agent type meanwhile, which
        # would otherwise break iteration over the live dict.
        for agent_type, entries in list(self._pools.items()):
            for entry in list(entries):
                try:
                    if hasattr(entry.agent, 'cleanup'):
                        await entry.agent.cleanup()
                except Exception as e:
                    logger.error(f"Error cleaning up agent: {e}")

        self._pools.clear()

        # Stop lazy loader if enabled
        if self._use_lazy_loading:
            from src.services.agent_lazy_loader import agent_lazy_loader
            await agent_lazy_loader.stop()

        logger.info("Agent pool stopped")

    @asynccontextmanager
    async def acquire(self, agent_type: Type[BaseAgent], context: AgentContext):
        """
        Acquire an agent from the pool.

        The agent's ``context`` attribute is set for the duration of the
        ``async with`` body and reset to ``None`` before the entry is
        released, whether or not the body raised.

        Args:
            agent_type: Type of agent to acquire
            context: Agent execution context

        Yields:
            Agent instance

        Raises:
            RuntimeError: If the selected entry is already in use.
        """
        entry = await self._get_or_create_agent(agent_type)
        # NOTE(review): this relies on entry.acquire() completing before any
        # other task can select the same free entry - confirm if the entry's
        # locking is ever changed to suspend.
        agent = await entry.acquire()

        try:
            # Update agent context
            agent.context = context
            yield agent
        finally:
            # Clear sensitive data
            agent.context = None
            await entry.release()

    def _has_capacity(self, agent_type: Type[BaseAgent], pool: List[AgentPoolEntry]) -> bool:
        """Return True if one more agent fits under max_size, counting in-flight creations."""
        in_flight = self._pending_creations.get(agent_type, 0)
        return len(pool) + in_flight < self.max_size

    async def _add_entry(
        self, agent_type: Type[BaseAgent], pool: List[AgentPoolEntry]
    ) -> AgentPoolEntry:
        """Create an agent, wrap it in an entry and append it to ``pool``.

        The in-flight counter is held for the duration of the (awaiting)
        creation so that ``_has_capacity`` sees the reserved slot. Exceptions
        from ``_create_agent`` propagate after the reservation is dropped.
        """
        pending = self._pending_creations
        pending[agent_type] = pending.get(agent_type, 0) + 1
        try:
            agent = await self._create_agent(agent_type)
        finally:
            remaining = pending.get(agent_type, 1) - 1
            if remaining > 0:
                pending[agent_type] = remaining
            else:
                pending.pop(agent_type, None)

        entry = AgentPoolEntry(agent)
        pool.append(entry)
        return entry

    async def _get_or_create_agent(self, agent_type: Type[BaseAgent]) -> AgentPoolEntry:
        """Get an available agent or create a new one.

        Each pass prefers a free entry, then creates a new agent if the pool
        has capacity, and otherwise sleeps briefly and retries. The returned
        entry is not yet marked in use; the caller must acquire it.

        Raises:
            Exception: Whatever ``_create_agent`` raises when creation fails.
        """
        warned = False
        while True:
            # Re-read the pool every pass: stop() clears the mapping, and a
            # waiter must not keep polling a detached list.
            pool = self._pools.setdefault(agent_type, [])

            # Find available agent
            for entry in pool:
                if not entry.in_use:
                    self._stats["reused"] += 1
                    logger.debug(f"Reusing agent {agent_type.__name__} from pool")
                    return entry

            # Create new agent if under limit (including in-flight creations)
            if self._has_capacity(agent_type, pool):
                entry = await self._add_entry(agent_type, pool)
                self._stats["created"] += 1
                logger.info(f"Created new agent {agent_type.__name__} (pool size: {len(pool)})")
                return entry

            # Wait for available agent; warn only once per call
            if not warned:
                logger.warning(f"Agent pool full for {agent_type.__name__}, waiting...")
                warned = True
            await asyncio.sleep(0.1)

    async def _create_agent(self, agent_type: Type[BaseAgent]) -> BaseAgent:
        """Create and initialize a new agent.

        With lazy loading enabled the lazy loader is tried first, using the
        class name with every "Agent" substring removed; any failure there
        falls back to calling ``agent_type()`` directly. The agent's
        ``initialize()`` is awaited when present, then memory integration is
        attempted.

        Raises:
            Exception: Any error from instantiation or initialization; it is
                counted in the "errors" statistic, logged and re-raised.
        """
        try:
            # Check if we should use lazy loader
            if hasattr(agent_type, '__name__') and self._use_lazy_loading:
                # Try to get from lazy loader
                from src.services.agent_lazy_loader import agent_lazy_loader
                agent_name = agent_type.__name__.replace('Agent', '')

                try:
                    # Use lazy loader to create agent
                    agent = await agent_lazy_loader.create_agent(agent_name)
                    self._all_agents.add(agent)
                    self._stats["lazy_loaded"] += 1
                    logger.debug(f"Created agent {agent_name} using lazy loader")
                except Exception:
                    # Fallback to direct instantiation
                    logger.debug(f"Lazy loader failed for {agent_name}, using direct instantiation")
                    agent = agent_type()
                    self._all_agents.add(agent)
            else:
                # Direct instantiation
                agent = agent_type()
                self._all_agents.add(agent)

            # Initialize if needed
            if hasattr(agent, 'initialize'):
                await agent.initialize()

            # Integrate with memory system if available
            await self._integrate_agent_memory(agent)

            return agent

        except Exception as e:
            self._stats["errors"] += 1
            logger.error(f"Failed to create agent {agent_type.__name__}: {e}")
            raise

    async def _cleanup_loop(self):
        """Background task to cleanup idle agents.

        Runs every 30 seconds while the pool is running. Errors from a pass
        are logged and the loop continues; cancellation ends the loop.
        """
        while self._running:
            try:
                await asyncio.sleep(30)  # Check every 30 seconds
                await self._cleanup_idle_agents()
                await self._maintain_minimum_pool()
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Error in cleanup loop: {e}")

    async def _cleanup_idle_agents(self):
        """Remove agents that have been idle too long.

        Free agents older than ``max_agent_lifetime`` are always evicted.
        Free agents idle longer than ``idle_timeout`` are evicted only while
        more than ``min_size`` free agents would remain, longest-idle first.
        Agents in use are never touched.
        """
        # Snapshot: agent.cleanup() awaits, and a new agent type may be
        # registered meanwhile, which would break iteration of the live dict.
        for agent_type, pool in list(self._pools.items()):
            now = datetime.now()
            expired = []
            stale = []
            idle_total = 0

            # Each entry lands in at most one list, so it is removed once.
            for entry in pool:
                if entry.in_use:
                    continue
                idle_total += 1
                lifetime = (now - entry.created_at).total_seconds()
                if lifetime > self.max_agent_lifetime:
                    expired.append(entry)
                elif entry.idle_time > self.idle_timeout:
                    stale.append(entry)

            # Only trim the surplus above min_size, accounting for the free
            # agents that are already going away because of their lifetime.
            surplus = max(idle_total - len(expired) - self.min_size, 0)
            stale.sort(key=lambda e: e.idle_time, reverse=True)
            to_remove = expired + stale[:surplus]

            # Remove identified agents
            for entry in to_remove:
                # State may have changed while a previous cleanup() awaited.
                if entry.in_use or entry not in pool:
                    continue

                pool.remove(entry)
                self._stats["evicted"] += 1

                try:
                    if hasattr(entry.agent, 'cleanup'):
                        await entry.agent.cleanup()
                except Exception as e:
                    logger.error(f"Error cleaning up agent: {e}")

                logger.debug(f"Evicted idle agent {agent_type.__name__}")

    async def _integrate_agent_memory(self, agent: BaseAgent):
        """Integrate agent with memory system if available.

        Best effort: a missing integration module or any integration error
        is logged and never propagated to the caller.
        """
        try:
            from src.services.agent_memory_integration import get_memory_integration

            memory_integration = get_memory_integration()
            if memory_integration:
                await memory_integration.integrate_agent(agent)
                logger.info(f"Integrated {agent.agent_id} with memory system")
            else:
                logger.debug("Memory integration not available")

        except ImportError:
            logger.debug("Memory integration module not available")
        except Exception as e:
            logger.error(f"Failed to integrate agent with memory: {e}")

    async def _maintain_minimum_pool(self):
        """Ensure minimum pool size for each agent type.

        Tops each known pool up to ``min_size`` free agents without exceeding
        ``max_size``. A creation failure is logged and ends the top-up for
        that agent type.
        """
        # Snapshot for the same reason as in _cleanup_idle_agents: creation awaits.
        for agent_type, pool in list(self._pools.items()):
            # Availability is recounted every pass because other tasks may
            # acquire or release entries while a creation is awaited.
            while (
                sum(1 for e in pool if not e.in_use) < self.min_size
                and self._has_capacity(agent_type, pool)
            ):
                try:
                    await self._add_entry(agent_type, pool)
                    logger.debug(f"Pre-warmed agent {agent_type.__name__}")
                except Exception as e:
                    logger.error(f"Failed to maintain pool: {e}")
                    break

    async def prewarm(self, agent_types: List[Type[BaseAgent]]):
        """Pre-warm the pool with specified agent types.

        Creates agents until each listed type holds ``min_size`` entries
        (never more than ``max_size``). A creation failure is logged and
        ends pre-warming for that type only.

        Args:
            agent_types: Agent classes to pre-create entries for
        """
        for agent_type in agent_types:
            pool = self._pools.setdefault(agent_type, [])

            # Create minimum agents
            while len(pool) < self.min_size and self._has_capacity(agent_type, pool):
                try:
                    await self._add_entry(agent_type, pool)
                    logger.info(f"Pre-warmed {agent_type.__name__} agent")
                except Exception as e:
                    logger.error(f"Failed to prewarm {agent_type.__name__}: {e}")
                    break

    def get_stats(self) -> Dict[str, Any]:
        """Get pool statistics.

        Returns:
            A dict with per-type counts under "pools" (keyed by class name),
            a snapshot copy of the global counters under "global_stats", and
            the number of pooled agents across all types under "total_agents".
        """
        pool_stats = {}

        for agent_type, pool in self._pools.items():
            in_use = sum(1 for e in pool if e.in_use)
            pool_stats[agent_type.__name__] = {
                "total": len(pool),
                "in_use": in_use,
                "available": len(pool) - in_use,
                "avg_usage": sum(e.usage_count for e in pool) / len(pool) if pool else 0
            }

        return {
            "pools": pool_stats,
            # Copy so callers cannot mutate the pool's live counters.
            "global_stats": dict(self._stats),
            "total_agents": sum(len(p) for p in self._pools.values())
        }


# Global agent pool instance, constructed at import time with the default
# AgentPool settings. Construction does not start it; get_agent_pool() below
# starts it on first use.
agent_pool = AgentPool()


async def get_agent_pool() -> AgentPool:
    """Return the module-wide agent pool, starting it first if it is not running."""
    if agent_pool._running:
        return agent_pool
    await agent_pool.start()
    return agent_pool