|
|
"""Webhook service for sending notifications to external endpoints. |
|
|
|
|
|
This service provides async webhook delivery with: |
|
|
- Retry logic with exponential backoff |
|
|
- Request signing for security |
|
|
- Batch webhook sending |
|
|
- Event filtering |
|
|
- Delivery status tracking |
|
|
""" |
|
|
|
|
|
import asyncio
import hashlib
import hmac
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Dict, List, Optional, Union

import httpx
import structlog
from pydantic import BaseModel, Field, HttpUrl, validator
from tenacity import (
    RetryError,
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    wait_exponential,
)

from src.core import json_utils
from src.core.config import settings
from src.core.logging import get_logger
|
|
|
|
|
# Module-level logger; calls below pass structured key/value fields to it.
logger = get_logger(__name__)
|
|
|
|
|
|
|
|
class WebhookEvent(str, Enum):
    """Closed set of event names that can be delivered to a webhook.

    Each member's value is the dotted ``"<domain>.<action>"`` string that is
    placed in the request body and the ``X-Cidadao-Event`` header. Because
    the enum mixes in ``str``, members compare equal to their plain string
    value.
    """

    # --- investigation lifecycle ---
    INVESTIGATION_CREATED = "investigation.created"
    INVESTIGATION_COMPLETED = "investigation.completed"
    INVESTIGATION_FAILED = "investigation.failed"

    # --- anomalies ---
    ANOMALY_DETECTED = "anomaly.detected"
    ANOMALY_RESOLVED = "anomaly.resolved"

    # --- agent lifecycle ---
    AGENT_STARTED = "agent.started"
    AGENT_COMPLETED = "agent.completed"
    AGENT_FAILED = "agent.failed"

    # --- reports and exports ---
    REPORT_GENERATED = "report.generated"
    EXPORT_COMPLETED = "export.completed"

    # --- users ---
    USER_REGISTERED = "user.registered"
    USER_LOGIN = "user.login"

    # --- system ---
    SYSTEM_ALERT = "system.alert"
    SYSTEM_ERROR = "system.error"
|
|
|
|
|
|
|
|
class WebhookPayload(BaseModel):
    """Webhook payload model.

    Describes one event occurrence as it will be serialized into the
    request body sent to a webhook endpoint.
    """

    # Which event this payload announces.
    event: WebhookEvent
    # When the event happened; defaults to the current time in UTC.
    timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    # Event-specific content.
    data: Dict[str, Any]
    # Optional extra information; omitted from the request body when empty.
    metadata: Optional[Dict[str, Any]] = None

    # Runs AFTER pydantic has parsed the value (no ``pre=True``), so the
    # check also covers timestamps supplied as naive ISO strings, which a
    # pre-validator would skip because they are not yet ``datetime`` objects.
    @validator("timestamp")
    def ensure_timezone(cls, v):
        """Ensure timestamp has timezone.

        Args:
            v: The parsed timestamp value.

        Returns:
            ``v`` unchanged when it already carries tzinfo; otherwise the
            same wall-clock time tagged as UTC.
        """
        if isinstance(v, datetime) and v.tzinfo is None:
            return v.replace(tzinfo=timezone.utc)
        return v
|
|
|
|
|
|
|
|
class WebhookConfig(BaseModel):
    """Configuration of a single webhook subscription."""

    # Destination endpoint.
    url: HttpUrl
    # Shared secret; when set, outgoing requests carry an HMAC signature.
    secret: Optional[str] = None
    # Events this webhook subscribes to; None means "every event".
    events: Optional[List[WebhookEvent]] = None
    # Extra headers merged into each request.
    headers: Optional[Dict[str, str]] = None
    # Per-request timeout passed to the HTTP client (validated to 1..300).
    timeout: int = Field(default=30, ge=1, le=300)
    # Validated to 0..10.
    max_retries: int = Field(default=3, ge=0, le=10)
    # Inactive webhooks never receive events.
    active: bool = Field(default=True)

    def should_send_event(self, event: WebhookEvent) -> bool:
        """Tell whether ``event`` should be delivered to this webhook.

        Args:
            event: The event about to be dispatched.

        Returns:
            False when the webhook is inactive. Otherwise True when no
            event filter is configured or ``event`` is in the filter.
        """
        subscribed = self.events is None or event in self.events
        return self.active and subscribed
|
|
|
|
|
|
|
|
class WebhookDelivery(BaseModel):
    """Outcome record for one webhook delivery."""

    # --- what was sent, and where ---
    webhook_url: str
    event: WebhookEvent
    timestamp: datetime

    # --- what came back ---
    # HTTP status of the response, if one was received.
    status_code: Optional[int] = None
    # Response text, if one was received.
    response_body: Optional[str] = None
    # Error description when the delivery raised instead of responding.
    error: Optional[str] = None

    # --- bookkeeping ---
    attempts: int = 0
    success: bool = False
    # Request duration in milliseconds, when measured.
    duration_ms: Optional[float] = None
|
|
|
|
|
|
|
|
class WebhookService:
    """Service for managing and sending webhooks.

    Keeps the registered ``WebhookConfig`` entries and a bounded delivery
    history in memory, and sends requests through one shared
    ``httpx.AsyncClient``. Call ``close()`` to release the client.
    """

    def __init__(self):
        """Initialize webhook service."""
        self._webhooks: List[WebhookConfig] = []
        self._client = httpx.AsyncClient(
            timeout=httpx.Timeout(30.0),
            follow_redirects=False,
            limits=httpx.Limits(max_connections=100, max_keepalive_connections=20),
        )
        self._delivery_history: List[WebhookDelivery] = []
        # History is trimmed to the newest ``_max_history`` records.
        self._max_history = 1000

    def add_webhook(self, webhook: WebhookConfig) -> None:
        """Add a webhook configuration.

        Args:
            webhook: Configuration to register. No de-duplication is done.
        """
        self._webhooks.append(webhook)
        logger.info(
            "Webhook added",
            url=str(webhook.url),
            events=webhook.events,
            active=webhook.active,
        )

    def remove_webhook(self, url: str) -> bool:
        """Remove a webhook by URL.

        Args:
            url: Compared against ``str(webhook.url)`` of each registration.

        Returns:
            True when at least one registration was removed.
        """
        initial_count = len(self._webhooks)
        self._webhooks = [w for w in self._webhooks if str(w.url) != url]
        removed = len(self._webhooks) < initial_count

        if removed:
            logger.info("Webhook removed", url=url)

        return removed

    def list_webhooks(self) -> List[WebhookConfig]:
        """List all configured webhooks (as a shallow copy of the registry)."""
        return self._webhooks.copy()

    def _generate_signature(self, payload: bytes, secret: str) -> str:
        """Generate HMAC signature for webhook payload.

        Args:
            payload: Exact request body bytes.
            secret: Shared secret used as the HMAC key.

        Returns:
            ``"sha256=<hex digest>"`` of the HMAC-SHA256 over ``payload``.
        """
        signature = hmac.new(
            secret.encode(),
            payload,
            hashlib.sha256,
        ).hexdigest()
        return f"sha256={signature}"

    def _prepare_request(
        self,
        webhook: WebhookConfig,
        payload: WebhookPayload,
    ) -> tuple[Dict[str, str], bytes]:
        """Prepare webhook request headers and body.

        Args:
            webhook: Target configuration (secret and custom headers).
            payload: Event payload to serialize.

        Returns:
            ``(headers, body)`` where ``body`` is the encoded JSON document.
        """
        body_data = {
            "event": payload.event,
            "timestamp": payload.timestamp.isoformat(),
            "data": payload.data,
        }
        if payload.metadata:
            body_data["metadata"] = payload.metadata

        body = json_utils.dumps(body_data).encode()

        headers = {
            "Content-Type": "application/json",
            "User-Agent": "Cidadao.AI/1.0",
            "X-Cidadao-Event": payload.event,
            "X-Cidadao-Timestamp": payload.timestamp.isoformat(),
        }

        # The signature covers exactly the bytes that are sent.
        if webhook.secret:
            headers["X-Cidadao-Signature"] = self._generate_signature(body, webhook.secret)

        # Custom headers are applied last, so they can override the defaults.
        if webhook.headers:
            headers.update(webhook.headers)

        return headers, body

    # NOTE(review): the retry policy is fixed at 3 attempts here;
    # ``webhook.max_retries`` is not consulted. Confirm whether it should be.
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10),
        retry=retry_if_exception_type((httpx.TimeoutException, httpx.ConnectError)),
    )
    async def _send_webhook(
        self,
        webhook: WebhookConfig,
        payload: WebhookPayload,
    ) -> WebhookDelivery:
        """Send a single webhook with retry logic.

        Timeouts and connection errors are retried by the decorator; any
        other exception propagates immediately. A non-2xx response is not an
        exception: it is returned as a delivery with ``success=False``.

        Args:
            webhook: Target configuration.
            payload: Event payload to deliver.

        Returns:
            The delivery record for the attempt that received a response.

        Raises:
            Exception: Whatever the request raised; retryable errors surface
                wrapped by tenacity once the attempts are exhausted.
        """
        delivery = WebhookDelivery(
            webhook_url=str(webhook.url),
            event=payload.event,
            timestamp=datetime.now(timezone.utc),
        )

        try:
            headers, body = self._prepare_request(webhook, payload)

            loop = asyncio.get_running_loop()
            start_time = loop.time()
            response = await self._client.post(
                str(webhook.url),
                headers=headers,
                content=body,
                timeout=webhook.timeout,
            )
            end_time = loop.time()

            delivery.status_code = response.status_code
            # Keep only the first 1000 characters of the response.
            delivery.response_body = response.text[:1000]
            delivery.success = 200 <= response.status_code < 300
            delivery.duration_ms = (end_time - start_time) * 1000
            # NOTE(review): always 1, even when earlier attempts were retried.
            delivery.attempts = 1

            if not delivery.success:
                logger.warning(
                    "Webhook delivery failed",
                    url=str(webhook.url),
                    status_code=response.status_code,
                    response=delivery.response_body,
                )

        except Exception as e:
            # Log and re-raise so the retry decorator (and the caller) see
            # the failure; callers build the failure record themselves.
            logger.error(
                "Webhook delivery error",
                url=str(webhook.url),
                error=str(e),
                exc_info=True,
            )
            raise

        return delivery

    @staticmethod
    def _failure_record(
        webhook: WebhookConfig,
        event: WebhookEvent,
        exc: BaseException,
    ) -> WebhookDelivery:
        """Build the delivery record for a send that ended in an exception."""
        cause: BaseException = exc
        attempts = 1
        if isinstance(exc, RetryError):
            # Retries were exhausted: report the real error and attempt
            # count instead of tenacity's wrapper.
            attempts = exc.last_attempt.attempt_number
            cause = exc.last_attempt.exception() or exc

        return WebhookDelivery(
            webhook_url=str(webhook.url),
            event=event,
            timestamp=datetime.now(timezone.utc),
            # Some exceptions have an empty message; fall back to the type.
            error=str(cause) or type(cause).__name__,
            attempts=attempts,
            success=False,
        )

    async def send_event(
        self,
        event: WebhookEvent,
        data: Dict[str, Any],
        metadata: Optional[Dict[str, Any]] = None,
    ) -> List[WebhookDelivery]:
        """Send an event to all configured webhooks.

        Sends run concurrently. A send that raises does not abort the
        others; it is turned into a failed delivery record carrying the
        webhook's URL.

        Args:
            event: Event type
            data: Event data
            metadata: Optional metadata

        Returns:
            List of delivery results, one per matching webhook, in
            registration order. Empty when no webhook matches.
        """
        payload = WebhookPayload(
            event=event,
            data=data,
            metadata=metadata,
        )

        webhooks_to_send = [
            webhook for webhook in self._webhooks
            if webhook.should_send_event(event)
        ]

        if not webhooks_to_send:
            logger.debug("No webhooks configured for event", event=event)
            return []

        tasks = [
            self._send_webhook(webhook, payload)
            for webhook in webhooks_to_send
        ]

        outcomes = await asyncio.gather(*tasks, return_exceptions=True)

        # gather() returns results in task order, so each outcome can be
        # paired with its webhook. BaseException is checked because a
        # cancelled send comes back as CancelledError, not an Exception.
        results: List[WebhookDelivery] = [
            self._failure_record(webhook, event, outcome)
            if isinstance(outcome, BaseException)
            else outcome
            for webhook, outcome in zip(webhooks_to_send, outcomes)
        ]

        self._store_deliveries(results)

        successful = sum(1 for d in results if d.success)
        logger.info(
            "Webhooks sent",
            event=event,
            total=len(results),
            successful=successful,
            failed=len(results) - successful,
        )

        return results

    def _store_deliveries(self, deliveries: List[WebhookDelivery]) -> None:
        """Store delivery results in history, keeping the newest records."""
        self._delivery_history.extend(deliveries)

        if len(self._delivery_history) > self._max_history:
            self._delivery_history = self._delivery_history[-self._max_history:]

    def get_delivery_history(
        self,
        event: Optional[WebhookEvent] = None,
        url: Optional[str] = None,
        success: Optional[bool] = None,
        limit: int = 100,
    ) -> List[WebhookDelivery]:
        """Get webhook delivery history with filtering.

        Args:
            event: Keep only deliveries of this event, when given.
            url: Keep only deliveries to this URL, when given.
            success: Keep only deliveries with this outcome, when not None.
            limit: Maximum number of records; values below 0 count as 0.

        Returns:
            Matching records, newest first.
        """
        history = self._delivery_history.copy()

        if event:
            history = [d for d in history if d.event == event]
        if url:
            history = [d for d in history if d.webhook_url == url]
        if success is not None:
            history = [d for d in history if d.success == success]

        history.sort(key=lambda d: d.timestamp, reverse=True)
        # A negative limit would otherwise slice from the end.
        return history[:max(limit, 0)]

    async def test_webhook(self, webhook: WebhookConfig) -> WebhookDelivery:
        """Test a webhook configuration.

        Sends a ``SYSTEM_ALERT`` test payload directly to ``webhook``. The
        webhook need not be registered, and the result is not stored in the
        delivery history.

        Raises:
            Exception: Propagated from the send when the request fails.
        """
        test_payload = WebhookPayload(
            event=WebhookEvent.SYSTEM_ALERT,
            data={
                "type": "test",
                "message": "This is a test webhook from Cidadão.AI",
                "timestamp": datetime.now(timezone.utc).isoformat(),
            },
            metadata={"test": True},
        )

        return await self._send_webhook(webhook, test_payload)

    async def close(self):
        """Close the webhook service by closing its HTTP client."""
        await self._client.aclose()
|
|
|
|
|
|
|
|
|
|
|
# Default service instance shared by the module-level helper functions below.
webhook_service = WebhookService()
|
|
|
|
|
|
|
|
|
|
|
async def send_webhook_event(
    event: WebhookEvent,
    data: Dict[str, Any],
    metadata: Optional[Dict[str, Any]] = None,
) -> List[WebhookDelivery]:
    """Dispatch ``event`` through the module-level ``webhook_service``.

    Args:
        event: Event type to send.
        data: Event data.
        metadata: Optional metadata.

    Returns:
        The delivery results returned by ``webhook_service.send_event``.
    """
    deliveries = await webhook_service.send_event(event, data, metadata)
    return deliveries
|
|
|
|
|
|
|
|
async def register_webhook(
    url: str,
    events: Optional[List[WebhookEvent]] = None,
    secret: Optional[str] = None,
    **kwargs
) -> None:
    """Build a ``WebhookConfig`` and add it to the module-level service.

    Args:
        url: Destination endpoint.
        events: Events to subscribe to; ``None`` is passed through as-is.
        secret: Optional signing secret.
        **kwargs: Forwarded unchanged to ``WebhookConfig``.
    """
    webhook_service.add_webhook(
        WebhookConfig(url=url, events=events, secret=secret, **kwargs)
    )