File size: 10,603 Bytes
64de3c6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
"""
Command handling for CQRS pattern implementation.

This module provides command definitions and handlers for write operations,
separating them from read queries for better scalability.
"""

from typing import Any, Dict, Optional, Type, TypeVar, Generic
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
import uuid

from pydantic import BaseModel, Field

from src.core import get_logger
from src.infrastructure.events.event_bus import EventBus, EventType, Event

logger = get_logger(__name__)

T = TypeVar('T')


class Command(BaseModel):
    """Base class for all commands."""
    command_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: datetime = Field(default_factory=datetime.utcnow)
    user_id: Optional[str] = None
    metadata: Dict[str, Any] = Field(default_factory=dict)


class CommandResult(BaseModel):
    """Result of command execution."""
    success: bool
    command_id: str
    data: Optional[Any] = None
    error: Optional[str] = None
    events_published: int = 0


# Investigation Commands
class CreateInvestigationCommand(Command):
    """Command to create a new investigation."""
    query: str
    data_sources: Optional[list[str]] = None
    priority: str = "medium"


class UpdateInvestigationCommand(Command):
    """Command to update investigation status."""
    investigation_id: str
    status: str
    results: Optional[Dict[str, Any]] = None


class CancelInvestigationCommand(Command):
    """Command to cancel an investigation."""
    investigation_id: str
    reason: Optional[str] = None


# Agent Commands
class ExecuteAgentTaskCommand(Command):
    """Command to execute an agent task."""
    agent_name: str
    task_type: str
    payload: Dict[str, Any]
    timeout: Optional[float] = None


# Chat Commands
class SendChatMessageCommand(Command):
    """Command to send a chat message."""
    session_id: str
    message: str
    context: Optional[Dict[str, Any]] = None


class CommandHandler(ABC, Generic[T]):
    """
    Base class for command handlers.
    
    Implements the handler pattern for processing commands.
    """
    
    @abstractmethod
    async def handle(self, command: T) -> CommandResult:
        """
        Handle a command.
        
        Args:
            command: Command to handle
            
        Returns:
            Command execution result
        """
        pass
    
    @abstractmethod
    def can_handle(self, command: Command) -> bool:
        """
        Check if this handler can handle the command.
        
        Args:
            command: Command to check
            
        Returns:
            True if handler can process this command
        """
        pass


class CreateInvestigationHandler(CommandHandler[CreateInvestigationCommand]):
    """Handler for creating investigations."""
    
    def __init__(self, event_bus: EventBus):
        """
        Initialize handler.
        
        Args:
            event_bus: Event bus for publishing events
        """
        self.event_bus = event_bus
        self.logger = get_logger(__name__)
    
    async def handle(self, command: CreateInvestigationCommand) -> CommandResult:
        """Create a new investigation."""
        try:
            investigation_id = str(uuid.uuid4())
            
            # In a real implementation, this would:
            # 1. Validate the command
            # 2. Create the investigation in the write model
            # 3. Publish events
            
            # Publish investigation created event
            await self.event_bus.publish(
                EventType.INVESTIGATION_CREATED,
                {
                    "investigation_id": investigation_id,
                    "query": command.query,
                    "user_id": command.user_id,
                    "data_sources": command.data_sources,
                    "priority": command.priority
                },
                {"command_id": command.command_id}
            )
            
            self.logger.info(f"Investigation {investigation_id} created")
            
            return CommandResult(
                success=True,
                command_id=command.command_id,
                data={"investigation_id": investigation_id},
                events_published=1
            )
            
        except Exception as e:
            self.logger.error(f"Failed to create investigation: {e}")
            return CommandResult(
                success=False,
                command_id=command.command_id,
                error=str(e)
            )
    
    def can_handle(self, command: Command) -> bool:
        """Check if this handler can handle the command."""
        return isinstance(command, CreateInvestigationCommand)


class CommandBus:
    """
    Command bus for routing commands to appropriate handlers.
    
    Implements the mediator pattern for command processing.
    """
    
    def __init__(self, event_bus: EventBus):
        """
        Initialize command bus.
        
        Args:
            event_bus: Event bus for publishing events
        """
        self.event_bus = event_bus
        self._handlers: list[CommandHandler] = []
        self._middleware: list[CommandMiddleware] = []
        
        # Statistics
        self._stats = {
            "commands_processed": 0,
            "commands_succeeded": 0,
            "commands_failed": 0
        }
        
        # Register default handlers
        self._register_default_handlers()
    
    def _register_default_handlers(self):
        """Register default command handlers."""
        self.register_handler(CreateInvestigationHandler(self.event_bus))
    
    def register_handler(self, handler: CommandHandler):
        """
        Register a command handler.
        
        Args:
            handler: Handler to register
        """
        self._handlers.append(handler)
        logger.info(f"Registered command handler: {handler.__class__.__name__}")
    
    def register_middleware(self, middleware: 'CommandMiddleware'):
        """
        Register command middleware.
        
        Args:
            middleware: Middleware to register
        """
        self._middleware.append(middleware)
        logger.info(f"Registered command middleware: {middleware.__class__.__name__}")
    
    async def execute(self, command: Command) -> CommandResult:
        """
        Execute a command.
        
        Args:
            command: Command to execute
            
        Returns:
            Command execution result
        """
        self._stats["commands_processed"] += 1
        
        try:
            # Apply middleware
            for middleware in self._middleware:
                command = await middleware.before_execute(command)
            
            # Find handler
            handler = None
            for h in self._handlers:
                if h.can_handle(command):
                    handler = h
                    break
            
            if not handler:
                raise ValueError(f"No handler found for command: {type(command).__name__}")
            
            # Execute command
            result = await handler.handle(command)
            
            # Apply middleware to result
            for middleware in reversed(self._middleware):
                result = await middleware.after_execute(command, result)
            
            if result.success:
                self._stats["commands_succeeded"] += 1
            else:
                self._stats["commands_failed"] += 1
            
            return result
            
        except Exception as e:
            logger.error(f"Command execution failed: {e}")
            self._stats["commands_failed"] += 1
            
            return CommandResult(
                success=False,
                command_id=command.command_id,
                error=str(e)
            )
    
    def get_stats(self) -> Dict[str, Any]:
        """Get command bus statistics."""
        return {
            **self._stats,
            "handlers_registered": len(self._handlers),
            "middleware_registered": len(self._middleware),
            "success_rate": (
                self._stats["commands_succeeded"] / self._stats["commands_processed"]
                if self._stats["commands_processed"] > 0 else 0
            )
        }


class CommandMiddleware(ABC):
    """Base class for command middleware."""
    
    @abstractmethod
    async def before_execute(self, command: Command) -> Command:
        """
        Process command before execution.
        
        Args:
            command: Command to process
            
        Returns:
            Processed command
        """
        pass
    
    @abstractmethod
    async def after_execute(
        self,
        command: Command,
        result: CommandResult
    ) -> CommandResult:
        """
        Process result after execution.
        
        Args:
            command: Executed command
            result: Execution result
            
        Returns:
            Processed result
        """
        pass


class LoggingMiddleware(CommandMiddleware):
    """Middleware for logging commands."""
    
    def __init__(self):
        self.logger = get_logger(__name__)
    
    async def before_execute(self, command: Command) -> Command:
        """Log command execution."""
        self.logger.info(
            f"Executing command {command.__class__.__name__} "
            f"(id: {command.command_id})"
        )
        return command
    
    async def after_execute(
        self,
        command: Command,
        result: CommandResult
    ) -> CommandResult:
        """Log command result."""
        if result.success:
            self.logger.info(
                f"Command {command.command_id} succeeded "
                f"(events: {result.events_published})"
            )
        else:
            self.logger.error(
                f"Command {command.command_id} failed: {result.error}"
            )
        return result


class ValidationMiddleware(CommandMiddleware):
    """Middleware for validating commands."""
    
    async def before_execute(self, command: Command) -> Command:
        """Validate command."""
        # Perform validation
        if hasattr(command, 'validate'):
            command.validate()
        return command
    
    async def after_execute(
        self,
        command: Command,
        result: CommandResult
    ) -> CommandResult:
        """No post-processing needed."""
        return result