File size: 4,175 Bytes
c7fed4d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 |
"""
Metrics wrapper for automatic agent performance tracking.
"""
import time
import functools
from typing import Any, Callable
import psutil
import os
from src.services.agent_metrics import MetricsCollector, agent_metrics_service
from src.core import get_logger
logger = get_logger("agent.metrics_wrapper")
def track_agent_metrics(action: str = None):
"""
Decorator to automatically track agent metrics.
Args:
action: Override action name (default: use function name)
"""
def decorator(func: Callable) -> Callable:
@functools.wraps(func)
async def async_wrapper(self, *args, **kwargs):
# Determine action name
action_name = action or func.__name__
# Skip if this is not an agent instance
if not hasattr(self, 'name'):
return await func(self, *args, **kwargs)
agent_name = self.name
# Track memory before execution
process = psutil.Process(os.getpid())
initial_memory = process.memory_info().rss
# Use metrics collector
async with MetricsCollector(agent_name, action_name) as collector:
try:
# Execute the function
result = await func(self, *args, **kwargs)
# Extract quality score if available
if hasattr(result, 'metadata') and isinstance(result.metadata, dict):
quality_score = result.metadata.get('quality_score')
if quality_score is not None:
collector.set_quality_score(quality_score)
# Extract reflection count if this is a reflective agent
if hasattr(self, '_reflection_count'):
collector.reflection_iterations = getattr(self, '_reflection_count', 0)
# Track memory after execution
final_memory = process.memory_info().rss
memory_delta = final_memory - initial_memory
# Record memory usage
await agent_metrics_service.record_memory_usage(
agent_name,
final_memory
)
return result
except Exception as e:
# Let the collector handle error tracking
raise
@functools.wraps(func)
def sync_wrapper(self, *args, **kwargs):
# For synchronous methods, we just pass through
# Metrics are primarily for async agent operations
return func(self, *args, **kwargs)
# Return appropriate wrapper based on function type
if asyncio.iscoroutinefunction(func):
return async_wrapper
else:
return sync_wrapper
return decorator
class MetricsAwareAgent:
"""
Mixin class to make agents metrics-aware.
Add this to agent inheritance to get automatic metrics tracking.
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._metrics_enabled = True
self._reflection_count = 0
async def _record_quality_metric(self, quality_score: float):
"""Record quality score for the agent."""
if self._metrics_enabled and hasattr(self, 'name'):
# This is handled by the decorator now
pass
def _increment_reflection(self):
"""Increment reflection counter."""
self._reflection_count += 1
def _reset_reflection_count(self):
"""Reset reflection counter."""
self._reflection_count = 0
def enable_metrics(self):
"""Enable metrics collection."""
self._metrics_enabled = True
def disable_metrics(self):
"""Disable metrics collection."""
self._metrics_enabled = False
# Import asyncio for the decorator
import asyncio |