cidadao.ai-backend / src /services /webhook_service.py
anderson-ufrj
feat(notifications): implement comprehensive notification system
389e212
"""Webhook service for sending notifications to external endpoints.
This service provides async webhook delivery with:
- Retry logic with exponential backoff
- Request signing for security
- Batch webhook sending
- Event filtering
- Delivery status tracking
"""
import asyncio
import hashlib
import hmac
from datetime import datetime, timezone
from typing import List, Dict, Any, Optional, Union
from enum import Enum
import httpx
from pydantic import BaseModel, HttpUrl, Field, validator
import structlog
from tenacity import (
retry,
stop_after_attempt,
wait_exponential,
retry_if_exception_type
)
from src.core.config import settings
from src.core.logging import get_logger
from src.core import json_utils
logger = get_logger(__name__)
class WebhookEvent(str, Enum):
"""Webhook event types."""
INVESTIGATION_CREATED = "investigation.created"
INVESTIGATION_COMPLETED = "investigation.completed"
INVESTIGATION_FAILED = "investigation.failed"
ANOMALY_DETECTED = "anomaly.detected"
ANOMALY_RESOLVED = "anomaly.resolved"
AGENT_STARTED = "agent.started"
AGENT_COMPLETED = "agent.completed"
AGENT_FAILED = "agent.failed"
REPORT_GENERATED = "report.generated"
EXPORT_COMPLETED = "export.completed"
USER_REGISTERED = "user.registered"
USER_LOGIN = "user.login"
SYSTEM_ALERT = "system.alert"
SYSTEM_ERROR = "system.error"
class WebhookPayload(BaseModel):
"""Webhook payload model."""
event: WebhookEvent
timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
data: Dict[str, Any]
metadata: Optional[Dict[str, Any]] = None
@validator("timestamp", pre=True)
def ensure_timezone(cls, v):
"""Ensure timestamp has timezone."""
if isinstance(v, datetime) and v.tzinfo is None:
return v.replace(tzinfo=timezone.utc)
return v
class WebhookConfig(BaseModel):
"""Webhook configuration model."""
url: HttpUrl
secret: Optional[str] = None
events: Optional[List[WebhookEvent]] = None # None means all events
headers: Optional[Dict[str, str]] = None
timeout: int = Field(default=30, ge=1, le=300)
max_retries: int = Field(default=3, ge=0, le=10)
active: bool = Field(default=True)
def should_send_event(self, event: WebhookEvent) -> bool:
"""Check if webhook should receive this event."""
if not self.active:
return False
if self.events is None:
return True
return event in self.events
class WebhookDelivery(BaseModel):
"""Webhook delivery result."""
webhook_url: str
event: WebhookEvent
timestamp: datetime
status_code: Optional[int] = None
response_body: Optional[str] = None
error: Optional[str] = None
attempts: int = 0
success: bool = False
duration_ms: Optional[float] = None
class WebhookService:
"""Service for managing and sending webhooks."""
def __init__(self):
"""Initialize webhook service."""
self._webhooks: List[WebhookConfig] = []
self._client = httpx.AsyncClient(
timeout=httpx.Timeout(30.0),
follow_redirects=False,
limits=httpx.Limits(max_connections=100, max_keepalive_connections=20)
)
self._delivery_history: List[WebhookDelivery] = []
self._max_history = 1000
def add_webhook(self, webhook: WebhookConfig) -> None:
"""Add a webhook configuration."""
self._webhooks.append(webhook)
logger.info(
"Webhook added",
url=str(webhook.url),
events=webhook.events,
active=webhook.active
)
def remove_webhook(self, url: str) -> bool:
"""Remove a webhook by URL."""
initial_count = len(self._webhooks)
self._webhooks = [w for w in self._webhooks if str(w.url) != url]
removed = len(self._webhooks) < initial_count
if removed:
logger.info("Webhook removed", url=url)
return removed
def list_webhooks(self) -> List[WebhookConfig]:
"""List all configured webhooks."""
return self._webhooks.copy()
def _generate_signature(self, payload: bytes, secret: str) -> str:
"""Generate HMAC signature for webhook payload."""
signature = hmac.new(
secret.encode(),
payload,
hashlib.sha256
).hexdigest()
return f"sha256={signature}"
def _prepare_request(
self,
webhook: WebhookConfig,
payload: WebhookPayload
) -> tuple[Dict[str, str], bytes]:
"""Prepare webhook request headers and body."""
# Serialize payload
body_data = {
"event": payload.event,
"timestamp": payload.timestamp.isoformat(),
"data": payload.data,
}
if payload.metadata:
body_data["metadata"] = payload.metadata
body = json_utils.dumps(body_data).encode()
# Prepare headers
headers = {
"Content-Type": "application/json",
"User-Agent": "Cidadao.AI/1.0",
"X-Cidadao-Event": payload.event,
"X-Cidadao-Timestamp": payload.timestamp.isoformat(),
}
# Add signature if secret is configured
if webhook.secret:
headers["X-Cidadao-Signature"] = self._generate_signature(body, webhook.secret)
# Add custom headers
if webhook.headers:
headers.update(webhook.headers)
return headers, body
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=10),
retry=retry_if_exception_type((httpx.TimeoutException, httpx.ConnectError))
)
async def _send_webhook(
self,
webhook: WebhookConfig,
payload: WebhookPayload
) -> WebhookDelivery:
"""Send a single webhook with retry logic."""
delivery = WebhookDelivery(
webhook_url=str(webhook.url),
event=payload.event,
timestamp=datetime.now(timezone.utc)
)
try:
# Prepare request
headers, body = self._prepare_request(webhook, payload)
# Send request
start_time = asyncio.get_event_loop().time()
response = await self._client.post(
str(webhook.url),
headers=headers,
content=body,
timeout=webhook.timeout
)
end_time = asyncio.get_event_loop().time()
# Update delivery info
delivery.status_code = response.status_code
delivery.response_body = response.text[:1000] # Limit response size
delivery.success = 200 <= response.status_code < 300
delivery.duration_ms = (end_time - start_time) * 1000
delivery.attempts = 1 # Will be updated by retry decorator
if not delivery.success:
logger.warning(
"Webhook delivery failed",
url=str(webhook.url),
status_code=response.status_code,
response=delivery.response_body
)
except Exception as e:
delivery.error = str(e)
delivery.success = False
logger.error(
"Webhook delivery error",
url=str(webhook.url),
error=str(e),
exc_info=True
)
raise
return delivery
async def send_event(
self,
event: WebhookEvent,
data: Dict[str, Any],
metadata: Optional[Dict[str, Any]] = None
) -> List[WebhookDelivery]:
"""Send an event to all configured webhooks.
Args:
event: Event type
data: Event data
metadata: Optional metadata
Returns:
List of delivery results
"""
payload = WebhookPayload(
event=event,
data=data,
metadata=metadata
)
# Filter webhooks for this event
webhooks_to_send = [
webhook for webhook in self._webhooks
if webhook.should_send_event(event)
]
if not webhooks_to_send:
logger.debug("No webhooks configured for event", event=event)
return []
# Send webhooks concurrently
tasks = [
self._send_webhook(webhook, payload)
for webhook in webhooks_to_send
]
deliveries = await asyncio.gather(*tasks, return_exceptions=True)
# Process results
results = []
for delivery in deliveries:
if isinstance(delivery, Exception):
# Create failed delivery record
delivery = WebhookDelivery(
webhook_url="unknown",
event=event,
timestamp=datetime.now(timezone.utc),
error=str(delivery),
success=False
)
results.append(delivery)
# Store in history
self._store_deliveries(results)
# Log summary
successful = sum(1 for d in results if d.success)
logger.info(
"Webhooks sent",
event=event,
total=len(results),
successful=successful,
failed=len(results) - successful
)
return results
def _store_deliveries(self, deliveries: List[WebhookDelivery]) -> None:
"""Store delivery results in history."""
self._delivery_history.extend(deliveries)
# Trim history if too large
if len(self._delivery_history) > self._max_history:
self._delivery_history = self._delivery_history[-self._max_history:]
def get_delivery_history(
self,
event: Optional[WebhookEvent] = None,
url: Optional[str] = None,
success: Optional[bool] = None,
limit: int = 100
) -> List[WebhookDelivery]:
"""Get webhook delivery history with filtering."""
history = self._delivery_history.copy()
# Apply filters
if event:
history = [d for d in history if d.event == event]
if url:
history = [d for d in history if d.webhook_url == url]
if success is not None:
history = [d for d in history if d.success == success]
# Sort by timestamp (newest first) and limit
history.sort(key=lambda d: d.timestamp, reverse=True)
return history[:limit]
async def test_webhook(self, webhook: WebhookConfig) -> WebhookDelivery:
"""Test a webhook configuration."""
test_payload = WebhookPayload(
event=WebhookEvent.SYSTEM_ALERT,
data={
"type": "test",
"message": "This is a test webhook from Cidadão.AI",
"timestamp": datetime.now(timezone.utc).isoformat()
},
metadata={"test": True}
)
return await self._send_webhook(webhook, test_payload)
async def close(self):
"""Close the webhook service."""
await self._client.aclose()
# Singleton instance
webhook_service = WebhookService()
# Convenience functions
async def send_webhook_event(
event: WebhookEvent,
data: Dict[str, Any],
metadata: Optional[Dict[str, Any]] = None
) -> List[WebhookDelivery]:
"""Send a webhook event using the default service."""
return await webhook_service.send_event(event, data, metadata)
async def register_webhook(
url: str,
events: Optional[List[WebhookEvent]] = None,
secret: Optional[str] = None,
**kwargs
) -> None:
"""Register a new webhook."""
config = WebhookConfig(
url=url,
events=events,
secret=secret,
**kwargs
)
webhook_service.add_webhook(config)