anderson-ufrj committed
Commit d568e3b · Parent: 4230b3c

refactor(agents): migrate Zumbi agent to new BaseAgent pattern


- Update constructor to use BaseAgent parameters (name, description, capabilities)
- Replace execute() method with process() returning AgentResponse
- Add initialize() and shutdown() methods as required by BaseAgent
- Update all agent_id references to use self.name
- Fix message parsing to use action/payload instead of message_type/content
- Update error handling and return types to match new pattern

This aligns the Zumbi agent with the modern agent architecture defined in deodoro.py.
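In code, the contract these bullets describe looks roughly like the skeleton below. This is a condensed sketch distilled from the diff that follows, not the full implementation: the authoritative BaseAgent, AgentMessage, AgentResponse, and AgentStatus definitions live in src/agents/deodoro.py and src/core, and the shortened description, abbreviated capability list, and empty result here are illustrative only.

from src.agents.deodoro import BaseAgent, AgentContext, AgentMessage, AgentResponse
from src.core import AgentStatus
from src.core.exceptions import AgentExecutionError


class InvestigatorAgent(BaseAgent):
    def __init__(self, price_anomaly_threshold: float = 2.5) -> None:
        # Identity and capabilities move from a bare agent_id into BaseAgent metadata
        super().__init__(
            name="Zumbi",
            description="Detects anomalies in government data",  # shortened for the sketch
            capabilities=["price_anomaly_detection"],            # abbreviated list
            max_retries=3,
            timeout=60,
        )
        self.price_threshold = price_anomaly_threshold

    async def initialize(self) -> None:
        """Acquire resources on startup (required by BaseAgent)."""

    async def shutdown(self) -> None:
        """Release resources on teardown (required by BaseAgent)."""

    async def process(self, message: AgentMessage, context: AgentContext) -> AgentResponse:
        # Requests arrive as action + payload (formerly message_type + content)
        if message.action != "investigate":
            raise AgentExecutionError(f"Unsupported action: {message.action}", agent_id=self.name)
        # ... the real agent runs its anomaly detectors here ...
        return AgentResponse(
            agent_name=self.name,
            status=AgentStatus.COMPLETED,
            result={"anomalies": []},
        )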

Files changed (1): src/agents/zumbi.py (+51, -28)
src/agents/zumbi.py CHANGED

@@ -16,8 +16,8 @@ import numpy as np
 import pandas as pd
 from pydantic import BaseModel, Field as PydanticField

-from src.agents.deodoro import BaseAgent, AgentContext, AgentMessage
-from src.core import get_logger
+from src.agents.deodoro import BaseAgent, AgentContext, AgentMessage, AgentResponse
+from src.core import get_logger, AgentStatus
 from src.core.exceptions import AgentExecutionError, DataAnalysisError
 from src.core.monitoring import (
     INVESTIGATIONS_TOTAL, ANOMALIES_DETECTED, INVESTIGATION_DURATION,
@@ -70,7 +70,6 @@ class InvestigatorAgent(BaseAgent):

     def __init__(
         self,
-        agent_id: str = "investigator",
         price_anomaly_threshold: float = 2.5,  # Standard deviations
         concentration_threshold: float = 0.7,  # 70% concentration trigger
         duplicate_similarity_threshold: float = 0.85,  # 85% similarity
@@ -79,16 +78,28 @@ class InvestigatorAgent(BaseAgent):
         Initialize the Investigator Agent.

         Args:
-            agent_id: Unique identifier for this agent
             price_anomaly_threshold: Number of standard deviations for price anomalies
             concentration_threshold: Threshold for vendor concentration (0-1)
             duplicate_similarity_threshold: Threshold for duplicate detection (0-1)
         """
-        super().__init__(agent_id)
+        super().__init__(
+            name="Zumbi",
+            description="Zumbi dos Palmares - Agent specialized in detecting anomalies and suspicious patterns in government data",
+            capabilities=[
+                "price_anomaly_detection",
+                "temporal_pattern_analysis",
+                "vendor_concentration_analysis",
+                "duplicate_contract_detection",
+                "payment_pattern_analysis",
+                "spectral_analysis",
+                "explainable_ai"
+            ],
+            max_retries=3,
+            timeout=60
+        )
         self.price_threshold = price_anomaly_threshold
         self.concentration_threshold = concentration_threshold
         self.duplicate_threshold = duplicate_similarity_threshold
-        self.logger = get_logger(__name__)

         # Initialize models client for ML inference (only if enabled)
         from src.core import settings
@@ -113,25 +124,33 @@ class InvestigatorAgent(BaseAgent):

         self.logger.info(
             "zumbi_initialized",
-            agent_id=agent_id,
+            agent_name=self.name,
             price_threshold=price_anomaly_threshold,
             concentration_threshold=concentration_threshold,
         )

-    async def execute(
+    async def initialize(self) -> None:
+        """Initialize agent resources."""
+        self.logger.info(f"{self.name} agent initialized")
+
+    async def shutdown(self) -> None:
+        """Cleanup agent resources."""
+        self.logger.info(f"{self.name} agent shutting down")
+
+    async def process(
         self,
         message: AgentMessage,
         context: AgentContext
-    ) -> AgentMessage:
+    ) -> AgentResponse:
         """
-        Execute investigation based on the incoming message.
+        Process investigation request and return anomaly detection results.

         Args:
             message: Investigation request message
             context: Agent execution context

         Returns:
-            Investigation results with detected anomalies
+            AgentResponse with detected anomalies
         """
         investigation_start_time = time.time()

@@ -139,13 +158,13 @@ class InvestigatorAgent(BaseAgent):
             self.logger.info(
                 "investigation_started",
                 investigation_id=context.investigation_id,
-                agent_id=self.agent_id,
-                message_type=message.message_type,
+                agent_name=self.name,
+                action=message.action,
             )

             # Parse investigation request
-            if message.message_type == "investigation_request":
-                request = InvestigationRequest(**message.content)
+            if message.action == "investigate":
+                request = InvestigationRequest(**message.payload)

                 # Record investigation start
                 INVESTIGATIONS_TOTAL.labels(
@@ -156,8 +175,8 @@ class InvestigatorAgent(BaseAgent):

             else:
                 raise AgentExecutionError(
-                    f"Unsupported message type: {message.message_type}",
-                    agent_id=self.agent_id
+                    f"Unsupported action: {message.action}",
+                    agent_id=self.name
                 )

             # Fetch data for investigation
@@ -171,9 +190,10 @@ class InvestigatorAgent(BaseAgent):
             ).inc(len(contracts_data) if contracts_data else 0)

             if not contracts_data:
-                return AgentMessage(
-                    message_type="investigation_result",
-                    content={
+                return AgentResponse(
+                    agent_name=self.name,
+                    status=AgentStatus.COMPLETED,
+                    result={
                         "status": "no_data",
                         "message": "No data found for the specified criteria",
                         "anomalies": [],
@@ -209,7 +229,7 @@ class InvestigatorAgent(BaseAgent):
                 "metadata": {
                     "investigation_id": context.investigation_id,
                     "timestamp": datetime.utcnow().isoformat(),
-                    "agent_id": self.agent_id,
+                    "agent_name": self.name,
                     "records_analyzed": len(contracts_data),
                     "anomalies_detected": len(anomalies),
                 }
@@ -236,9 +256,10 @@ class InvestigatorAgent(BaseAgent):
                 duration_seconds=investigation_duration,
             )

-            return AgentMessage(
-                message_type="investigation_result",
-                content=result,
+            return AgentResponse(
+                agent_name=self.name,
+                status=AgentStatus.COMPLETED,
+                result=result,
                 metadata={"investigation_id": context.investigation_id}
             )

@@ -254,12 +275,14 @@ class InvestigatorAgent(BaseAgent):
                 "investigation_failed",
                 investigation_id=context.investigation_id,
                 error=str(e),
-                agent_id=self.agent_id,
+                agent_name=self.name,
             )

-            return AgentMessage(
-                message_type="investigation_error",
-                content={
+            return AgentResponse(
+                agent_name=self.name,
+                status=AgentStatus.ERROR,
+                error=str(e),
+                result={
                     "status": "error",
                     "error": str(e),
                     "investigation_id": context.investigation_id,