anderson-ufrj
refactor(performance): replace all json imports with json_utils
9730fbc
"""
Cursor-based pagination models for efficient data retrieval.
This module implements cursor pagination which is more efficient
than offset pagination for large datasets and real-time data.
"""
from typing import Generic, List, Optional, TypeVar, Dict, Any
from datetime import datetime
from pydantic import BaseModel, Field
import base64
from src.core import json_utils
from src.core import get_logger
# Module-level logger obtained from the project's get_logger helper.
logger = get_logger(__name__)
# Type variable for the item type carried by generic paginated responses.
T = TypeVar('T')
class CursorInfo(BaseModel):
    """Position marker that is serialised into an opaque pagination cursor.

    A cursor records the timestamp and id of one item together with the
    direction it was issued for.
    """

    # Timestamp of the item the cursor points at.
    timestamp: datetime
    # Identifier of the item the cursor points at.
    id: str
    # Direction the cursor was issued for; defaults to "next".
    direction: str = "next"

    def encode(self) -> str:
        """Encode the cursor as a URL-safe base64 string.

        Returns:
            URL-safe base64 text wrapping a compact JSON object with the
            keys ``t`` (ISO-8601 timestamp), ``i`` (id) and ``d``
            (direction).
        """
        data = {
            "t": self.timestamp.isoformat(),
            "i": self.id,
            "d": self.direction,
        }
        json_str = json_utils.dumps(data, separators=(',', ':'))
        return base64.urlsafe_b64encode(json_str.encode()).decode()

    @classmethod
    def decode(cls, cursor: str) -> "CursorInfo":
        """Decode a cursor previously produced by :meth:`encode`.

        Args:
            cursor: URL-safe base64 cursor text. Trailing ``=`` padding may
                be missing; it is restored before decoding.

        Returns:
            The reconstructed ``CursorInfo``. ``direction`` falls back to
            ``"next"`` when the payload has no ``d`` key.

        Raises:
            ValueError: If the cursor cannot be base64-decoded, parsed as
                JSON, or lacks the required ``t`` / ``i`` keys. The
                underlying error is attached as ``__cause__``.
        """
        try:
            # Padding may have been stripped in transit (common for URL-safe
            # base64); restore it so such cursors still decode. A correctly
            # padded cursor gets nothing appended.
            padded = cursor + "=" * (-len(cursor) % 4)
            json_str = base64.urlsafe_b64decode(padded).decode()
            data = json_utils.loads(json_str)
            return cls(
                timestamp=datetime.fromisoformat(data["t"]),
                id=data["i"],
                direction=data.get("d", "next"),
            )
        except Exception as e:
            # Deliberately broad: the exception types raised by the project's
            # json_utils are not visible here, and every failure mode means
            # the same thing to the caller - the cursor is unusable.
            logger.error(f"Invalid cursor: {e}")
            raise ValueError("Invalid cursor format") from e
class CursorPaginationRequest(BaseModel):
    """Parameters a caller supplies to select one page of results."""

    # Opaque cursor string; optional, defaults to None.
    cursor: Optional[str] = Field(
        default=None,
        description="Cursor for next/previous page",
    )
    # Page size, constrained to the range 1..100 (default 20).
    limit: int = Field(
        default=20,
        ge=1,
        le=100,
        description="Number of items per page",
    )
    # Must be exactly "next" or "prev" (default "next").
    direction: str = Field(
        default="next",
        pattern="^(next|prev)$",
        description="Pagination direction",
    )
class CursorPaginationResponse(BaseModel, Generic[T]):
    """One page of items plus the cursors and flags needed to continue."""

    # Items on this page.
    items: List[T]
    # Cursor for the following page, when one was issued.
    next_cursor: Optional[str] = Field(default=None)
    # Cursor for the preceding page, when one was issued.
    prev_cursor: Optional[str] = Field(default=None)
    # Whether further items exist beyond this page.
    has_more: bool = Field(default=False)
    # Total number of items, when known.
    total_items: Optional[int] = Field(default=None)
    # Free-form extra information; each instance gets its own fresh dict.
    metadata: Dict[str, Any] = Field(default_factory=dict)
class PaginationHelper:
    """Helper class for cursor-based pagination over in-memory lists."""

    @staticmethod
    def create_cursor(
        item: Dict[str, Any],
        direction: str = "next",
        key_field: str = "timestamp",
        id_field: str = "id",
    ) -> str:
        """Create an encoded cursor pointing at ``item``.

        Args:
            item: Item the cursor should point at.
            direction: Direction stored in the cursor.
            key_field: Key of ``item`` holding the ordering timestamp.
            id_field: Key of ``item`` holding the unique identifier.

        Returns:
            The encoded cursor string.
        """
        # datetime.utcnow() yields a naive UTC datetime and is only a
        # fallback for items lacking ``key_field``.
        # NOTE(review): kept naive as in the original so fallback cursors
        # stay comparable with naive item timestamps - confirm whether
        # callers use naive or timezone-aware timestamps.
        cursor_info = CursorInfo(
            timestamp=item.get(key_field, datetime.utcnow()),
            id=str(item.get(id_field, "")),
            direction=direction,
        )
        return cursor_info.encode()

    @staticmethod
    def parse_cursor(cursor: Optional[str]) -> Optional[CursorInfo]:
        """Parse a cursor string into ``CursorInfo``.

        Args:
            cursor: Encoded cursor, or ``None`` / empty string.

        Returns:
            ``None`` when no cursor was supplied, otherwise the decoded
            ``CursorInfo``.

        Raises:
            ValueError: Propagated from ``CursorInfo.decode`` for a
                malformed cursor.
        """
        if not cursor:
            return None
        return CursorInfo.decode(cursor)

    @staticmethod
    def paginate_list(
        items: List[Dict[str, Any]],
        request: CursorPaginationRequest,
        key_field: str = "timestamp",
        id_field: str = "id",
    ) -> CursorPaginationResponse[Dict[str, Any]]:
        """Paginate a list of items using cursor pagination.

        Args:
            items: Items to paginate. They are expected to be sorted in
                ascending order of ``(item[key_field], str(item[id_field]))``
                because that is the ordering the cursor comparison uses.
            request: Pagination request parameters.
            key_field: Field used for cursor comparison; its values are
                compared against the cursor's datetime.
            id_field: Unique identifier field, compared as a string to
                break ties between equal ``key_field`` values.

        Returns:
            Paginated response with cursors. Items are always returned in
            ascending order. For ``direction == "next"`` the page is the
            first ``limit`` items after the cursor; for ``"prev"`` it is
            the last ``limit`` items before the cursor (or the last
            ``limit`` items overall when no cursor is given). ``has_more``
            reports whether further items exist in the requested direction.

        Raises:
            ValueError: If ``request.cursor`` is malformed.
        """
        cursor_info = PaginationHelper.parse_cursor(request.cursor)
        going_back = request.direction == "prev"
        limit = request.limit

        def sort_key(item: Dict[str, Any]):
            # Same ordering as the cursor: key first, stringified id second.
            return (item.get(key_field), str(item.get(id_field)))

        # Narrow the list to the items strictly on the requested side of
        # the cursor; without a cursor every item is a candidate.
        if cursor_info is None:
            filtered_items = items
        else:
            anchor = (cursor_info.timestamp, cursor_info.id)
            if going_back:
                filtered_items = [
                    item for item in items if sort_key(item) < anchor
                ]
            else:
                filtered_items = [
                    item for item in items if sort_key(item) > anchor
                ]

        has_more = len(filtered_items) > limit
        if going_back:
            # Candidates are in ascending order, so the items adjacent to
            # the cursor sit at the END of the list; slicing from the front
            # would jump to the oldest items and skip everything between.
            page_items = filtered_items[-limit:] if limit > 0 else []
        else:
            page_items = filtered_items[:limit]

        next_cursor = None
        prev_cursor = None
        if page_items:
            # Next cursor points at the last item of the page.
            if has_more or cursor_info is not None:
                next_cursor = PaginationHelper.create_cursor(
                    page_items[-1], "next", key_field, id_field
                )
            # Previous cursor points at the first item of the page.
            if cursor_info is not None or going_back:
                prev_cursor = PaginationHelper.create_cursor(
                    page_items[0], "prev", key_field, id_field
                )

        return CursorPaginationResponse(
            items=page_items,
            next_cursor=next_cursor,
            prev_cursor=prev_cursor,
            has_more=has_more,
            total_items=len(items),
            metadata={
                "page_size": len(page_items),
                "direction": request.direction,
            },
        )
class ChatMessagePagination:
    """Specialized pagination for chat messages."""

    @staticmethod
    def paginate_messages(
        messages: List[Dict[str, Any]],
        cursor: Optional[str] = None,
        limit: int = 50,
        direction: str = "prev",  # Default to loading older messages
    ) -> CursorPaginationResponse[Dict[str, Any]]:
        """Paginate chat messages with a cursor.

        Chat typically loads older messages, so the default direction is
        "prev".

        Args:
            messages: Message dicts; each is read through its
                ``"timestamp"``, ``"id"`` and ``"read"`` keys.
            cursor: Encoded cursor from a previous response, if any.
            limit: Maximum number of messages per page; validated by
                ``CursorPaginationRequest``.
            direction: ``"next"`` or ``"prev"``.

        Returns:
            The paginated response, with ``oldest_message``,
            ``newest_message`` and ``unread_count`` added to its metadata.
            ``unread_count`` is computed over all ``messages``, not just
            the returned page; a message without a ``"read"`` key counts
            as read.
        """
        request = CursorPaginationRequest(
            cursor=cursor,
            limit=limit,
            direction=direction,
        )

        # Sort by (timestamp, str(id)): the cursor comparison in
        # PaginationHelper.paginate_list breaks timestamp ties on the
        # stringified id, so the list must use the same total order or
        # messages sharing a timestamp can be skipped or repeated across
        # pages. Messages without a timestamp sort first.
        sorted_messages = sorted(
            messages,
            key=lambda m: (m.get("timestamp", datetime.min), str(m.get("id"))),
        )

        response = PaginationHelper.paginate_list(
            sorted_messages,
            request,
            key_field="timestamp",
            id_field="id",
        )

        # Add chat-specific metadata.
        oldest = sorted_messages[0].get("timestamp") if sorted_messages else None
        newest = sorted_messages[-1].get("timestamp") if sorted_messages else None
        response.metadata.update({
            "oldest_message": oldest,
            "newest_message": newest,
            "unread_count": sum(1 for m in messages if not m.get("read", True)),
        })
        return response