""" |
|
|
Module: infrastructure.queue.retry_policy |
|
|
Description: Retry policies and mechanisms for batch processing |
|
|
Author: Anderson H. Silva |
|
|
Date: 2025-01-25 |
|
|
License: Proprietary - All rights reserved |
|
|
""" |
|
|
|
|
|
from typing import Dict, Any, Optional, Callable, List |
|
|
from datetime import datetime, timedelta |
|
|
from dataclasses import dataclass |
|
|
from enum import Enum |
|
|
import random |
|
|
import asyncio |
|
|
|
|
|
from src.core import get_logger |
|
|
|
|
|
logger = get_logger(__name__) |
|
|
|
|
|
|
|
|
class RetryStrategy(str, Enum):
    """Retry strategy types."""
    FIXED_DELAY = "fixed_delay"
    EXPONENTIAL_BACKOFF = "exponential_backoff"
    LINEAR_BACKOFF = "linear_backoff"
    RANDOM_JITTER = "random_jitter"
    FIBONACCI = "fibonacci"

@dataclass
class RetryPolicy:
    """Retry policy configuration."""
    strategy: RetryStrategy = RetryStrategy.EXPONENTIAL_BACKOFF
    max_attempts: int = 3
    initial_delay: float = 1.0
    max_delay: float = 300.0
    multiplier: float = 2.0
    jitter: bool = True
    retry_on: Optional[List[type]] = None
    dont_retry_on: Optional[List[type]] = None
    on_retry: Optional[Callable] = None
    on_failure: Optional[Callable] = None

class RetryHandler:
    """Handles retry logic for failed operations."""

    def __init__(self, policy: RetryPolicy):
        """Initialize retry handler with policy."""
        self.policy = policy
        self._fibonacci_cache = {0: 0, 1: 1}

    def should_retry(
        self,
        exception: Exception,
        attempt: int
    ) -> bool:
        """
        Determine if operation should be retried.

        Args:
            exception: The exception that occurred
            attempt: Current attempt number (1-based)

        Returns:
            True if should retry
        """
        if attempt >= self.policy.max_attempts:
            logger.warning(
                "max_retry_attempts_exceeded",
                attempt=attempt,
                max_attempts=self.policy.max_attempts
            )
            return False

        exc_type = type(exception)

        if self.policy.dont_retry_on:
            if any(isinstance(exception, t) for t in self.policy.dont_retry_on):
                logger.info(
                    "retry_skipped_exception_blacklist",
                    exception_type=exc_type.__name__
                )
                return False

        if self.policy.retry_on:
            should_retry = any(isinstance(exception, t) for t in self.policy.retry_on)
            if not should_retry:
                logger.info(
                    "retry_skipped_exception_not_whitelisted",
                    exception_type=exc_type.__name__
                )
            return should_retry

        return True

    def calculate_delay(self, attempt: int) -> float:
        """
        Calculate delay before next retry.

        Args:
            attempt: Current attempt number (1-based)

        Returns:
            Delay in seconds
        """
        base_delay = self._calculate_base_delay(attempt)

        # Cap the strategy-specific delay at the configured ceiling.
        delay = min(base_delay, self.policy.max_delay)

        # Apply +/-25% jitter to avoid synchronized retries (thundering herd).
        if self.policy.jitter:
            jitter_range = delay * 0.25
            delay += random.uniform(-jitter_range, jitter_range)

        # Never wait less than 100 ms, even after negative jitter.
        delay = max(delay, 0.1)

        logger.debug(
            "retry_delay_calculated",
            attempt=attempt,
            delay=delay,
            strategy=self.policy.strategy.value
        )

        return delay

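    # Illustrative delay schedule (worked example, not runtime logic): with the
    # RetryPolicy defaults above (exponential backoff, initial_delay=1.0,
    # multiplier=2.0, max_delay=300.0, jitter enabled), calculate_delay yields
    # roughly: attempt 1 -> ~1s, attempt 2 -> ~2s, attempt 3 -> ~4s, ...
    # each value capped at 300s and then perturbed by up to +/-25%.
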
    def _calculate_base_delay(self, attempt: int) -> float:
        """Calculate base delay based on strategy."""
        if self.policy.strategy == RetryStrategy.FIXED_DELAY:
            return self.policy.initial_delay

        elif self.policy.strategy == RetryStrategy.EXPONENTIAL_BACKOFF:
            return self.policy.initial_delay * (self.policy.multiplier ** (attempt - 1))

        elif self.policy.strategy == RetryStrategy.LINEAR_BACKOFF:
            return self.policy.initial_delay * attempt

        elif self.policy.strategy == RetryStrategy.RANDOM_JITTER:
            return random.uniform(
                self.policy.initial_delay,
                min(self.policy.initial_delay * 10, self.policy.max_delay)
            )

        elif self.policy.strategy == RetryStrategy.FIBONACCI:
            return self.policy.initial_delay * self._fibonacci(attempt)

        else:
            return self.policy.initial_delay

    def _fibonacci(self, n: int) -> int:
        """Calculate fibonacci number with memoization."""
        if n in self._fibonacci_cache:
            return self._fibonacci_cache[n]

        self._fibonacci_cache[n] = self._fibonacci(n - 1) + self._fibonacci(n - 2)
        return self._fibonacci_cache[n]

    async def execute_with_retry(
        self,
        func: Callable,
        *args,
        **kwargs
    ) -> Any:
        """
        Execute function with retry logic.

        Args:
            func: Function to execute
            *args: Function arguments
            **kwargs: Function keyword arguments

        Returns:
            Function result

        Raises:
            Last exception if all retries fail
        """
        last_exception = None

        for attempt in range(1, self.policy.max_attempts + 1):
            try:
                # Support both sync and async callables.
                if asyncio.iscoroutinefunction(func):
                    result = await func(*args, **kwargs)
                else:
                    result = func(*args, **kwargs)

                if attempt > 1:
                    logger.info(
                        "retry_succeeded",
                        attempt=attempt,
                        function=func.__name__
                    )

                return result

            except Exception as e:
                last_exception = e

                # Give up immediately if the policy says this failure is final.
                if not self.should_retry(e, attempt):
                    if self.policy.on_failure:
                        await self._call_callback(
                            self.policy.on_failure,
                            e,
                            attempt
                        )
                    raise

                delay = self.calculate_delay(attempt)

                logger.warning(
                    "operation_failed_retrying",
                    attempt=attempt,
                    max_attempts=self.policy.max_attempts,
                    delay=delay,
                    error=str(e),
                    function=func.__name__
                )

                if self.policy.on_retry:
                    await self._call_callback(
                        self.policy.on_retry,
                        e,
                        attempt,
                        delay
                    )

                await asyncio.sleep(delay)

        # Defensive fallback: should_retry already forces a raise on the final
        # attempt, but re-raise here in case the loop ever exits without returning.
        if self.policy.on_failure:
            await self._call_callback(
                self.policy.on_failure,
                last_exception,
                self.policy.max_attempts
            )

        raise last_exception

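    # Note on callback signatures: on_retry and on_failure are both invoked
    # through _call_callback below as callback(exception, attempt, delay), with
    # delay=None for on_failure. Callbacks may be plain functions or coroutines.
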
    async def _call_callback(
        self,
        callback: Callable,
        exception: Exception,
        attempt: int,
        delay: Optional[float] = None
    ):
        """Call callback function safely."""
        try:
            if asyncio.iscoroutinefunction(callback):
                await callback(exception, attempt, delay)
            else:
                callback(exception, attempt, delay)
        except Exception as e:
            logger.error(
                "retry_callback_failed",
                callback=callback.__name__,
                error=str(e)
            )

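# Example usage (illustrative sketch, not executed at import time): retrying a
# hypothetical async operation with exception filtering and an on_retry hook.
# "fetch_batch" and "notify_retry" are placeholder names, not part of this module.
#
#     policy = RetryPolicy(
#         strategy=RetryStrategy.EXPONENTIAL_BACKOFF,
#         max_attempts=4,
#         retry_on=[TimeoutError, ConnectionError],
#         on_retry=notify_retry,  # called as notify_retry(exception, attempt, delay)
#     )
#     handler = RetryHandler(policy)
#     result = await handler.execute_with_retry(fetch_batch, batch_id=42)
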
class CircuitBreaker:
    """
    Circuit breaker pattern for preventing cascading failures.

    States:
        - CLOSED: Normal operation
        - OPEN: Failing, reject all requests
        - HALF_OPEN: Testing if service recovered
    """

    class State(str, Enum):
        CLOSED = "closed"
        OPEN = "open"
        HALF_OPEN = "half_open"

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 60.0,
        expected_exception: Optional[type] = None
    ):
        """
        Initialize circuit breaker.

        Args:
            failure_threshold: Number of failures before opening
            recovery_timeout: Seconds before attempting recovery
            expected_exception: Exception type that triggers the breaker
        """
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception

        self.state = self.State.CLOSED
        self.failure_count = 0
        self.last_failure_time: Optional[datetime] = None
        self.success_count = 0

    def call(self, func: Callable, *args, **kwargs) -> Any:
        """
        Call function through circuit breaker.

        Args:
            func: Function to call
            *args: Function arguments
            **kwargs: Function keyword arguments

        Returns:
            Function result

        Raises:
            Exception: If circuit is open or function fails
        """
        if self.state == self.State.OPEN:
            if self._should_attempt_reset():
                self.state = self.State.HALF_OPEN
                logger.info("circuit_breaker_half_open")
            else:
                raise Exception("Circuit breaker is OPEN")

        try:
            result = func(*args, **kwargs)
            self._on_success()
            return result

        except Exception as e:
            self._on_failure(e)
            raise

    async def call_async(
        self,
        func: Callable,
        *args,
        **kwargs
    ) -> Any:
        """Async version of call."""
        if self.state == self.State.OPEN:
            if self._should_attempt_reset():
                self.state = self.State.HALF_OPEN
                logger.info("circuit_breaker_half_open")
            else:
                raise Exception("Circuit breaker is OPEN")

        try:
            result = await func(*args, **kwargs)
            self._on_success()
            return result

        except Exception as e:
            self._on_failure(e)
            raise

    def _should_attempt_reset(self) -> bool:
        """Check if should attempt to reset circuit."""
        return (
            self.last_failure_time is not None and
            datetime.now() - self.last_failure_time > timedelta(seconds=self.recovery_timeout)
        )

    def _on_success(self):
        """Handle successful call."""
        self.failure_count = 0

        if self.state == self.State.HALF_OPEN:
            self.success_count += 1
            if self.success_count >= 3:
                self.state = self.State.CLOSED
                self.success_count = 0
                logger.info("circuit_breaker_closed")

    def _on_failure(self, exception: Exception):
        """Handle failed call."""
        # Only count exceptions the breaker is configured to care about.
        if self.expected_exception and not isinstance(exception, self.expected_exception):
            return

        self.failure_count += 1
        self.last_failure_time = datetime.now()

        if self.state == self.State.HALF_OPEN:
            self.state = self.State.OPEN
            logger.warning("circuit_breaker_opened_from_half_open")

        elif self.failure_count >= self.failure_threshold:
            self.state = self.State.OPEN
            logger.warning(
                "circuit_breaker_opened",
                failures=self.failure_count,
                threshold=self.failure_threshold
            )

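# Example usage (illustrative sketch, not executed at import time): guarding a
# hypothetical downstream call with a circuit breaker; "query_service" is a
# placeholder name, not part of this module.
#
#     breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=30.0)
#     try:
#         data = breaker.call(query_service, "batch-status")
#     except Exception:
#         # Either query_service failed or the breaker is currently OPEN.
#         ...
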
DEFAULT_RETRY_POLICY = RetryPolicy(
    strategy=RetryStrategy.EXPONENTIAL_BACKOFF,
    max_attempts=3,
    initial_delay=1.0,
    max_delay=60.0,
    multiplier=2.0,
    jitter=True
)

AGGRESSIVE_RETRY_POLICY = RetryPolicy(
    strategy=RetryStrategy.EXPONENTIAL_BACKOFF,
    max_attempts=5,
    initial_delay=0.5,
    max_delay=120.0,
    multiplier=1.5,
    jitter=True
)

GENTLE_RETRY_POLICY = RetryPolicy(
    strategy=RetryStrategy.LINEAR_BACKOFF,
    max_attempts=2,
    initial_delay=5.0,
    max_delay=30.0,
    jitter=False
)
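

if __name__ == "__main__":
    # Minimal manual smoke test (illustrative sketch, only runs when this
    # module is executed directly): a flaky coroutine that fails twice before
    # succeeding, executed through the default retry policy defined above.
    async def _demo() -> None:
        attempts = {"count": 0}

        async def flaky_operation() -> str:
            # Hypothetical operation: fails on the first two calls.
            attempts["count"] += 1
            if attempts["count"] < 3:
                raise ConnectionError("transient failure")
            return "ok"

        handler = RetryHandler(DEFAULT_RETRY_POLICY)
        result = await handler.execute_with_retry(flaky_operation)
        print(f"result={result} after {attempts['count']} attempts")

    asyncio.run(_demo())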