|
|
"""Email service for sending notifications via SMTP. |
|
|
|
|
|
This service provides async email sending capabilities with support for: |
|
|
- HTML and plain text emails |
|
|
- Attachments |
|
|
- Multiple recipients (to, cc, bcc) |
|
|
- Email templates using Jinja2 |
|
|
- Retry logic with exponential backoff |
|
|
- Connection pooling |
|
|
""" |
|
|
|
|
|
import asyncio |
|
|
from email.mime.multipart import MIMEMultipart |
|
|
from email.mime.text import MIMEText |
|
|
from email.mime.base import MIMEBase |
|
|
from email import encoders |
|
|
import os |
|
|
from typing import List, Optional, Dict, Any, Union |
|
|
from pathlib import Path |
|
|
import aiosmtplib |
|
|
from email_validator import validate_email, EmailNotValidError |
|
|
from pydantic import BaseModel, EmailStr, Field, validator |
|
|
from jinja2 import Environment, FileSystemLoader, select_autoescape |
|
|
import structlog |
|
|
from tenacity import retry, stop_after_attempt, wait_exponential |
|
|
|
|
|
from src.core.config import settings |
|
|
from src.core.logging import get_logger |
|
|
|
|
|
logger = get_logger(__name__) |
|
|
|
|
|
|
|
|
class EmailAttachment(BaseModel): |
|
|
"""Email attachment model.""" |
|
|
filename: str |
|
|
content: Union[bytes, str] |
|
|
content_type: str = "application/octet-stream" |
|
|
|
|
|
@validator("content") |
|
|
def validate_content(cls, v): |
|
|
"""Ensure content is bytes.""" |
|
|
if isinstance(v, str): |
|
|
return v.encode() |
|
|
return v |
|
|
|
|
|
|
|
|
class EmailMessage(BaseModel): |
|
|
"""Email message model with validation.""" |
|
|
to: List[EmailStr] |
|
|
subject: str |
|
|
body: Optional[str] = None |
|
|
html_body: Optional[str] = None |
|
|
cc: Optional[List[EmailStr]] = None |
|
|
bcc: Optional[List[EmailStr]] = None |
|
|
reply_to: Optional[EmailStr] = None |
|
|
attachments: Optional[List[EmailAttachment]] = None |
|
|
headers: Optional[Dict[str, str]] = None |
|
|
template: Optional[str] = None |
|
|
template_data: Optional[Dict[str, Any]] = None |
|
|
|
|
|
@validator("to", "cc", "bcc", pre=True) |
|
|
def validate_emails(cls, v): |
|
|
"""Validate email addresses.""" |
|
|
if v is None: |
|
|
return v |
|
|
if isinstance(v, str): |
|
|
v = [v] |
|
|
validated = [] |
|
|
for email in v: |
|
|
try: |
|
|
validated_email = validate_email(email) |
|
|
validated.append(validated_email.email) |
|
|
except EmailNotValidError as e: |
|
|
logger.warning(f"Invalid email address: {email} - {e}") |
|
|
continue |
|
|
return validated |
|
|
|
|
|
@validator("body", always=True) |
|
|
def validate_body(cls, v, values): |
|
|
"""Ensure at least one body type is provided.""" |
|
|
if not v and not values.get("html_body") and not values.get("template"): |
|
|
raise ValueError("Either body, html_body, or template must be provided") |
|
|
return v |
|
|
|
|
|
|
|
|
class SMTPConfig(BaseModel): |
|
|
"""SMTP configuration model.""" |
|
|
host: str = Field(default_factory=lambda: settings.smtp_host) |
|
|
port: int = Field(default_factory=lambda: settings.smtp_port) |
|
|
username: Optional[str] = Field(default_factory=lambda: settings.smtp_username) |
|
|
password: Optional[str] = Field(default_factory=lambda: settings.smtp_password.get_secret_value() if settings.smtp_password else None) |
|
|
use_tls: bool = Field(default_factory=lambda: settings.smtp_use_tls) |
|
|
use_ssl: bool = Field(default_factory=lambda: settings.smtp_use_ssl) |
|
|
timeout: int = Field(default=30) |
|
|
from_email: EmailStr = Field(default_factory=lambda: settings.smtp_from_email) |
|
|
from_name: str = Field(default_factory=lambda: settings.smtp_from_name) |
|
|
|
|
|
|
|
|
class EmailService: |
|
|
"""Service for sending emails via SMTP.""" |
|
|
|
|
|
def __init__(self, config: Optional[SMTPConfig] = None): |
|
|
"""Initialize email service. |
|
|
|
|
|
Args: |
|
|
config: SMTP configuration. If not provided, uses settings. |
|
|
""" |
|
|
self.config = config or SMTPConfig() |
|
|
self._template_env = self._setup_template_environment() |
|
|
self._connection_lock = asyncio.Lock() |
|
|
self._smtp_client: Optional[aiosmtplib.SMTP] = None |
|
|
|
|
|
def _setup_template_environment(self) -> Environment: |
|
|
"""Setup Jinja2 template environment.""" |
|
|
template_dir = Path(__file__).parent.parent / "templates" / "email" |
|
|
template_dir.mkdir(parents=True, exist_ok=True) |
|
|
|
|
|
return Environment( |
|
|
loader=FileSystemLoader(str(template_dir)), |
|
|
autoescape=select_autoescape(["html", "xml"]), |
|
|
enable_async=True |
|
|
) |
|
|
|
|
|
async def _get_smtp_client(self) -> aiosmtplib.SMTP: |
|
|
"""Get or create SMTP client with connection pooling.""" |
|
|
async with self._connection_lock: |
|
|
if self._smtp_client is None or not self._smtp_client.is_connected: |
|
|
self._smtp_client = aiosmtplib.SMTP( |
|
|
hostname=self.config.host, |
|
|
port=self.config.port, |
|
|
timeout=self.config.timeout, |
|
|
use_tls=self.config.use_ssl |
|
|
) |
|
|
|
|
|
await self._smtp_client.connect() |
|
|
|
|
|
if self.config.use_tls and not self.config.use_ssl: |
|
|
await self._smtp_client.starttls() |
|
|
|
|
|
if self.config.username and self.config.password: |
|
|
await self._smtp_client.login( |
|
|
self.config.username, |
|
|
self.config.password |
|
|
) |
|
|
|
|
|
return self._smtp_client |
|
|
|
|
|
async def _render_template( |
|
|
self, |
|
|
template_name: str, |
|
|
context: Dict[str, Any] |
|
|
) -> tuple[str, str]: |
|
|
"""Render email template. |
|
|
|
|
|
Args: |
|
|
template_name: Name of the template file |
|
|
context: Template context data |
|
|
|
|
|
Returns: |
|
|
Tuple of (html_body, text_body) |
|
|
""" |
|
|
|
|
|
default_context = { |
|
|
"app_name": "Cidadão.AI", |
|
|
"app_url": settings.app_url, |
|
|
"support_email": settings.support_email, |
|
|
} |
|
|
context = {**default_context, **context} |
|
|
|
|
|
|
|
|
html_template = self._template_env.get_template(f"{template_name}.html") |
|
|
html_body = await html_template.render_async(**context) |
|
|
|
|
|
|
|
|
try: |
|
|
text_template = self._template_env.get_template(f"{template_name}.txt") |
|
|
text_body = await text_template.render_async(**context) |
|
|
except Exception: |
|
|
|
|
|
import re |
|
|
text_body = re.sub(r"<[^>]+>", "", html_body) |
|
|
text_body = re.sub(r"\s+", " ", text_body).strip() |
|
|
|
|
|
return html_body, text_body |
|
|
|
|
|
def _create_message(self, email: EmailMessage) -> MIMEMultipart: |
|
|
"""Create MIME message from EmailMessage.""" |
|
|
msg = MIMEMultipart("alternative") |
|
|
|
|
|
|
|
|
msg["Subject"] = email.subject |
|
|
msg["From"] = f"{self.config.from_name} <{self.config.from_email}>" |
|
|
msg["To"] = ", ".join(email.to) |
|
|
|
|
|
if email.cc: |
|
|
msg["Cc"] = ", ".join(email.cc) |
|
|
|
|
|
if email.reply_to: |
|
|
msg["Reply-To"] = email.reply_to |
|
|
|
|
|
|
|
|
if email.headers: |
|
|
for key, value in email.headers.items(): |
|
|
msg[key] = value |
|
|
|
|
|
|
|
|
if email.body: |
|
|
msg.attach(MIMEText(email.body, "plain")) |
|
|
|
|
|
|
|
|
if email.html_body: |
|
|
msg.attach(MIMEText(email.html_body, "html")) |
|
|
|
|
|
|
|
|
if email.attachments: |
|
|
for attachment in email.attachments: |
|
|
part = MIMEBase("application", "octet-stream") |
|
|
part.set_payload(attachment.content) |
|
|
encoders.encode_base64(part) |
|
|
part.add_header( |
|
|
"Content-Disposition", |
|
|
f"attachment; filename={attachment.filename}" |
|
|
) |
|
|
msg.attach(part) |
|
|
|
|
|
return msg |
|
|
|
|
|
@retry( |
|
|
stop=stop_after_attempt(3), |
|
|
wait=wait_exponential(multiplier=1, min=4, max=10) |
|
|
) |
|
|
async def send_email(self, email: EmailMessage) -> bool: |
|
|
"""Send an email message. |
|
|
|
|
|
Args: |
|
|
email: Email message to send |
|
|
|
|
|
Returns: |
|
|
True if email was sent successfully |
|
|
""" |
|
|
try: |
|
|
|
|
|
if email.template: |
|
|
html_body, text_body = await self._render_template( |
|
|
email.template, |
|
|
email.template_data or {} |
|
|
) |
|
|
email.html_body = html_body |
|
|
email.body = text_body |
|
|
|
|
|
|
|
|
msg = self._create_message(email) |
|
|
|
|
|
|
|
|
smtp = await self._get_smtp_client() |
|
|
|
|
|
|
|
|
recipients = email.to.copy() |
|
|
if email.cc: |
|
|
recipients.extend(email.cc) |
|
|
if email.bcc: |
|
|
recipients.extend(email.bcc) |
|
|
|
|
|
|
|
|
await smtp.send_message(msg) |
|
|
|
|
|
logger.info( |
|
|
"Email sent successfully", |
|
|
subject=email.subject, |
|
|
recipients=len(recipients), |
|
|
has_attachments=bool(email.attachments) |
|
|
) |
|
|
|
|
|
return True |
|
|
|
|
|
except Exception as e: |
|
|
logger.error( |
|
|
"Failed to send email", |
|
|
subject=email.subject, |
|
|
error=str(e), |
|
|
exc_info=True |
|
|
) |
|
|
raise |
|
|
|
|
|
async def send_batch( |
|
|
self, |
|
|
emails: List[EmailMessage], |
|
|
max_concurrent: int = 5 |
|
|
) -> List[bool]: |
|
|
"""Send multiple emails concurrently. |
|
|
|
|
|
Args: |
|
|
emails: List of email messages |
|
|
max_concurrent: Maximum concurrent sends |
|
|
|
|
|
Returns: |
|
|
List of success status for each email |
|
|
""" |
|
|
semaphore = asyncio.Semaphore(max_concurrent) |
|
|
|
|
|
async def send_with_semaphore(email: EmailMessage) -> bool: |
|
|
async with semaphore: |
|
|
try: |
|
|
return await self.send_email(email) |
|
|
except Exception: |
|
|
return False |
|
|
|
|
|
tasks = [send_with_semaphore(email) for email in emails] |
|
|
results = await asyncio.gather(*tasks, return_exceptions=True) |
|
|
|
|
|
return [ |
|
|
result if isinstance(result, bool) else False |
|
|
for result in results |
|
|
] |
|
|
|
|
|
async def close(self): |
|
|
"""Close SMTP connection.""" |
|
|
if self._smtp_client and self._smtp_client.is_connected: |
|
|
await self._smtp_client.quit() |
|
|
self._smtp_client = None |
|
|
|
|
|
|
|
|
|
|
|
email_service = EmailService() |
|
|
|
|
|
|
|
|
|
|
|
async def send_email( |
|
|
to: Union[str, List[str]], |
|
|
subject: str, |
|
|
body: Optional[str] = None, |
|
|
html_body: Optional[str] = None, |
|
|
template: Optional[str] = None, |
|
|
template_data: Optional[Dict[str, Any]] = None, |
|
|
attachments: Optional[List[EmailAttachment]] = None, |
|
|
**kwargs |
|
|
) -> bool: |
|
|
"""Send an email using the default email service. |
|
|
|
|
|
Args: |
|
|
to: Recipient email address(es) |
|
|
subject: Email subject |
|
|
body: Plain text body |
|
|
html_body: HTML body |
|
|
template: Template name to render |
|
|
template_data: Data for template rendering |
|
|
attachments: List of attachments |
|
|
**kwargs: Additional email fields (cc, bcc, reply_to, headers) |
|
|
|
|
|
Returns: |
|
|
True if email was sent successfully |
|
|
""" |
|
|
if isinstance(to, str): |
|
|
to = [to] |
|
|
|
|
|
email = EmailMessage( |
|
|
to=to, |
|
|
subject=subject, |
|
|
body=body, |
|
|
html_body=html_body, |
|
|
template=template, |
|
|
template_data=template_data, |
|
|
attachments=attachments, |
|
|
**kwargs |
|
|
) |
|
|
|
|
|
return await email_service.send_email(email) |
|
|
|
|
|
|
|
|
async def send_template_email( |
|
|
to: Union[str, List[str]], |
|
|
subject: str, |
|
|
template: str, |
|
|
template_data: Optional[Dict[str, Any]] = None, |
|
|
**kwargs |
|
|
) -> bool: |
|
|
"""Send an email using a template. |
|
|
|
|
|
Args: |
|
|
to: Recipient email address(es) |
|
|
subject: Email subject |
|
|
template: Template name |
|
|
template_data: Template context data |
|
|
**kwargs: Additional email fields |
|
|
|
|
|
Returns: |
|
|
True if email was sent successfully |
|
|
""" |
|
|
return await send_email( |
|
|
to=to, |
|
|
subject=subject, |
|
|
template=template, |
|
|
template_data=template_data, |
|
|
**kwargs |
|
|
) |