""" Authentication routes for Cidadão.AI API """ from datetime import datetime from typing import Optional from fastapi import APIRouter, Depends, HTTPException, status from fastapi.security import HTTPAuthorizationCredentials from pydantic import BaseModel, EmailStr from ..auth import auth_manager, get_current_user, require_admin, security, User router = APIRouter(prefix="/auth", tags=["authentication"]) # Request/Response Models class LoginRequest(BaseModel): email: EmailStr password: str class LoginResponse(BaseModel): access_token: str refresh_token: str token_type: str = "bearer" expires_in: int user: dict class RefreshRequest(BaseModel): refresh_token: str class RefreshResponse(BaseModel): access_token: str token_type: str = "bearer" expires_in: int class RegisterRequest(BaseModel): email: EmailStr password: str name: str role: Optional[str] = "analyst" class ChangePasswordRequest(BaseModel): old_password: str new_password: str class UserResponse(BaseModel): id: str email: str name: str role: str is_active: bool created_at: datetime last_login: Optional[datetime] = None @router.post("/login", response_model=LoginResponse) async def login(request: LoginRequest): """ Authenticate user and return JWT tokens """ user = auth_manager.authenticate_user(request.email, request.password) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid credentials", headers={"WWW-Authenticate": "Bearer"} ) access_token = auth_manager.create_access_token(user) refresh_token = auth_manager.create_refresh_token(user) return LoginResponse( access_token=access_token, refresh_token=refresh_token, expires_in=auth_manager.access_token_expire_minutes * 60, user={ "id": user.id, "email": user.email, "name": user.name, "role": user.role, "is_active": user.is_active } ) @router.post("/refresh", response_model=RefreshResponse) async def refresh_token(request: RefreshRequest): """ Refresh access token using refresh token """ try: new_access_token = auth_manager.refresh_access_token(request.refresh_token) return RefreshResponse( access_token=new_access_token, expires_in=auth_manager.access_token_expire_minutes * 60 ) except Exception as e: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid refresh token" ) @router.post("/register", response_model=UserResponse) async def register( request: RegisterRequest, current_user: User = Depends(get_current_user) ): """ Register new user (admin only) """ # Only admin can register new users require_admin(current_user) try: user = auth_manager.register_user( email=request.email, password=request.password, name=request.name, role=request.role ) return UserResponse( id=user.id, email=user.email, name=user.name, role=user.role, is_active=user.is_active, created_at=user.created_at, last_login=user.last_login ) except HTTPException: raise except Exception as e: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to register user: {str(e)}" ) @router.get("/me", response_model=UserResponse) async def get_current_user_info(current_user: User = Depends(get_current_user)): """ Get current user information """ return UserResponse( id=current_user.id, email=current_user.email, name=current_user.name, role=current_user.role, is_active=current_user.is_active, created_at=current_user.created_at, last_login=current_user.last_login ) @router.post("/change-password") async def change_password( request: ChangePasswordRequest, current_user: User = Depends(get_current_user) ): """ Change current user password """ try: success = auth_manager.change_password( user_id=current_user.id, old_password=request.old_password, new_password=request.new_password ) if success: return {"message": "Password changed successfully"} else: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Failed to change password" ) except HTTPException: raise except Exception as e: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to change password: {str(e)}" ) @router.post("/logout") async def logout(current_user: User = Depends(get_current_user)): """ Logout user (client should discard tokens) """ # In a production system, you might want to blacklist the token return {"message": "Logged out successfully"} @router.get("/users", response_model=list[UserResponse]) async def list_users(current_user: User = Depends(get_current_user)): """ List all users (admin only) """ require_admin(current_user) users = auth_manager.get_all_users() return [ UserResponse( id=user.id, email=user.email, name=user.name, role=user.role, is_active=user.is_active, created_at=user.created_at, last_login=user.last_login ) for user in users ] @router.post("/users/{user_id}/deactivate") async def deactivate_user( user_id: str, current_user: User = Depends(get_current_user) ): """ Deactivate user account (admin only) """ require_admin(current_user) # Prevent admin from deactivating themselves if user_id == current_user.id: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Cannot deactivate your own account" ) try: success = auth_manager.deactivate_user(user_id) if success: return {"message": "User deactivated successfully"} except HTTPException: raise except Exception as e: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to deactivate user: {str(e)}" ) @router.post("/verify") async def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)): """ Verify if token is valid """ try: user = auth_manager.get_current_user(credentials.credentials) return { "valid": True, "user": { "id": user.id, "email": user.email, "name": user.name, "role": user.role } } except HTTPException: return {"valid": False}