File size: 4,629 Bytes
25f22bf e3d8d4f 25f22bf e3d8d4f 25f22bf e3d8d4f 25f22bf e3d8d4f 25f22bf e3d8d4f 25f22bf e3d8d4f 25f22bf e3d8d4f 25f22bf e3d8d4f 25f22bf e3d8d4f 25f22bf e3d8d4f 25f22bf e3d8d4f 25f22bf e3d8d4f 25f22bf e3d8d4f 25f22bf e3d8d4f 25f22bf e3d8d4f 25f22bf e3d8d4f 25f22bf e3d8d4f 25f22bf e3d8d4f 25f22bf e3d8d4f 25f22bf e3d8d4f 25f22bf e3d8d4f 25f22bf e3d8d4f 25f22bf e3d8d4f 25f22bf e3d8d4f 25f22bf e3d8d4f |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 |
from supabase import create_client, Client
import os
import logging
from typing import Optional, Dict, Any
def init_supabase(url: str, key: str) -> Client:
    """
    Initialize Supabase client.

    Args:
        url (str): Supabase project URL
        key (str): Supabase API key

    Returns:
        Client: Supabase client instance

    Raises:
        ValueError: If URL or key is missing, empty, or whitespace-only
        Exception: If client initialization fails (logged, then re-raised
            unchanged)
    """
    # Reject blank credentials up front. A whitespace-only value (for example
    # from a badly quoted environment variable) is as unusable as an empty one.
    for value in (url, key):
        if not value or (isinstance(value, str) and not value.strip()):
            raise ValueError("Supabase URL and key must be provided")

    try:
        client = create_client(url, key)
    except Exception as e:
        logging.error(f"Failed to initialize Supabase client: {str(e)}")
        # Bare raise propagates the active exception with its traceback intact.
        raise
    else:
        logging.info("Supabase client initialized successfully")
        return client
def get_user_by_email(supabase: Client, email: str) -> Optional[Dict[Any, Any]]:
    """
    Look up a user's row in the "profiles" table by email.

    The lookup goes through the "profiles" table rather than Supabase Auth,
    because signing in through Auth requires a password and so cannot be used
    for a plain existence check.

    Args:
        supabase (Client): Supabase client instance
        email (str): User email

    Returns:
        dict: The first matching profile row, or None if the email is empty,
            no row matches, or the query fails (failures are logged)
    """
    # An empty email cannot identify a user; skip the round trip entirely.
    if not email:
        return None

    try:
        # Only the first match is returned, so ask the server for one row.
        response = (
            supabase.table("profiles")
            .select("*")
            .eq("email", email)
            .limit(1)
            .execute()
        )
        if response.data:
            return response.data[0]
        return None
    except Exception as e:
        # Best-effort lookup: any failure is logged and reported as "not found".
        logging.error(f"Error getting user by email {email}: {str(e)}")
        return None
def create_user(supabase: Client, email: str, password: str) -> Dict[Any, Any]:
    """
    Register a new account through Supabase Auth sign-up.

    Args:
        supabase (Client): Supabase client instance
        email (str): Email address for the new account
        password (str): Password for the new account

    Returns:
        dict: The value returned by the sign-up call, passed through unchanged

    Raises:
        Exception: Any error raised during sign-up, re-raised after logging
    """
    credentials = {"email": email, "password": password}
    try:
        signup_result = supabase.auth.sign_up(credentials)
        logging.info(f"Successfully created user with email: {email}")
        return signup_result
    except Exception as err:
        logging.error(f"Failed to create user with email {email}: {str(err)}")
        raise err
def authenticate_user(supabase: Client, email: str, password: str) -> Dict[Any, Any]:
    """
    Sign a user in with an email/password pair via Supabase Auth.

    Args:
        supabase (Client): Supabase client instance
        email (str): Email address of the account
        password (str): Password of the account

    Returns:
        dict: The value returned by the sign-in call, passed through unchanged

    Raises:
        Exception: Any error raised during sign-in, re-raised after logging
    """
    credentials = {"email": email, "password": password}
    try:
        signin_result = supabase.auth.sign_in_with_password(credentials)
        logging.info(f"Successfully authenticated user: {email}")
        return signin_result
    except Exception as err:
        logging.error(f"Authentication error for user {email}: {str(err)}")
        raise err
def check_database_connection(supabase: Client) -> bool:
    """
    Check if the database connection is working by performing a simple query.

    Requests a single "id" from the "profiles" table. The returned rows are
    ignored; the only thing that matters is whether the query completes
    without raising.

    Args:
        supabase (Client): Supabase client instance

    Returns:
        bool: True if the query completed, False if it raised (the failure
            is logged)
    """
    try:
        # The result is deliberately discarded; this is a probe, not a read.
        supabase.table("profiles").select("id").limit(1).execute()
    except Exception as e:
        logging.error(f"Database connection check: FAILED - {str(e)}")
        return False
    logging.info("Database connection check: SUCCESS")
    return True
def get_user_session(supabase: Client) -> Optional[Dict[Any, Any]]:
    """
    Get basic details of the currently authenticated user, if any.

    Args:
        supabase (Client): Supabase client instance

    Returns:
        dict: The current user's 'user_id', 'email', 'created_at' and
            'email_confirmed_at', or None if there is no current user or
            the lookup fails (failures are logged)
    """
    try:
        response = supabase.auth.get_user()
        # auth.get_user() may return None instead of a response object when
        # there is no active session. Treat that as "no user" rather than
        # letting it surface as an AttributeError and a misleading error log.
        if response is None or not response.user:
            return None
        user = response.user
        return {
            'user_id': user.id,
            'email': user.email,
            'created_at': user.created_at,
            'email_confirmed_at': user.email_confirmed_at,
        }
    except Exception as e:
        logging.error(f"Error getting user session: {str(e)}")
        return None