anderson-ufrj commited on
Commit
67a1f44
·
1 Parent(s): 889e789

feat(database): implement Supabase REST API service

Browse files

Create HTTP/HTTPS-based Supabase service compatible with restricted
network environments. This service uses Supabase's REST API instead of
direct PostgreSQL connections, enabling deployment on platforms like
HuggingFace Spaces that block database ports. Implements full CRUD
operations with lazy configuration loading.

Files changed (1) hide show
  1. src/services/supabase_service_rest.py +407 -0
src/services/supabase_service_rest.py ADDED
@@ -0,0 +1,407 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Supabase integration service using REST API (works on HuggingFace Spaces).
3
+
4
+ This service uses Supabase's REST API via HTTP/HTTPS instead of direct
5
+ PostgreSQL connections, making it compatible with restricted environments
6
+ like HuggingFace Spaces that block direct database connections.
7
+ """
8
+
9
+ import os
10
+ from typing import Optional, List, Dict, Any
11
+ from datetime import datetime
12
+
13
+ from supabase import create_client, Client
14
+ from pydantic import BaseModel, Field
15
+
16
+ from src.core import get_logger, settings
17
+ from src.core.exceptions import CidadaoAIError
18
+
19
+ logger = get_logger(__name__)
20
+
21
+
22
+ class SupabaseConfig(BaseModel):
23
+ """Supabase connection configuration."""
24
+
25
+ url: str = Field(..., description="Supabase project URL")
26
+ key: str = Field(..., description="Supabase service role key (for backend)")
27
+ anon_key: Optional[str] = Field(None, description="Supabase anon key (for frontend)")
28
+
29
+ @classmethod
30
+ def from_env(cls) -> "SupabaseConfig":
31
+ """Load configuration from environment variables."""
32
+ supabase_url = os.getenv("SUPABASE_URL")
33
+ service_key = os.getenv("SUPABASE_SERVICE_ROLE_KEY")
34
+
35
+ if not supabase_url or not service_key:
36
+ raise ValueError(
37
+ "SUPABASE_URL and SUPABASE_SERVICE_ROLE_KEY environment variables required. "
38
+ "Get them from: Supabase Dashboard > Settings > API"
39
+ )
40
+
41
+ return cls(
42
+ url=supabase_url,
43
+ key=service_key,
44
+ anon_key=os.getenv("SUPABASE_ANON_KEY"),
45
+ )
46
+
47
+
48
+ class SupabaseServiceRest:
49
+ """
50
+ Service for interacting with Supabase via REST API.
51
+
52
+ Uses HTTP/HTTPS which works in restricted environments like HuggingFace Spaces
53
+ where direct PostgreSQL connections are blocked.
54
+ """
55
+
56
+ def __init__(self, config: Optional[SupabaseConfig] = None):
57
+ """
58
+ Initialize Supabase service.
59
+
60
+ Args:
61
+ config: Supabase configuration (loads from env if None)
62
+ """
63
+ self._config = config
64
+ self._client: Optional[Client] = None
65
+ self._initialized = False
66
+
67
+ @property
68
+ def config(self) -> SupabaseConfig:
69
+ """Lazy load configuration."""
70
+ if self._config is None:
71
+ self._config = SupabaseConfig.from_env()
72
+ return self._config
73
+
74
+ async def initialize(self) -> None:
75
+ """Initialize Supabase client."""
76
+ if self._initialized:
77
+ logger.warning("Supabase REST service already initialized")
78
+ return
79
+
80
+ try:
81
+ logger.info("Initializing Supabase REST client")
82
+
83
+ self._client = create_client(
84
+ supabase_url=self.config.url,
85
+ supabase_key=self.config.key,
86
+ )
87
+
88
+ # Test connection with a simple query
89
+ result = self._client.table("investigations").select("id").limit(1).execute()
90
+
91
+ self._initialized = True
92
+ logger.info("Supabase REST service initialized successfully")
93
+
94
+ except Exception as e:
95
+ logger.error(f"Failed to initialize Supabase REST service: {e}", exc_info=True)
96
+ raise CidadaoAIError(f"Supabase REST initialization failed: {e}")
97
+
98
+ def _ensure_client(self) -> Client:
99
+ """Ensure client is initialized."""
100
+ if not self._initialized or not self._client:
101
+ # Synchronous initialization for backwards compatibility
102
+ import asyncio
103
+ try:
104
+ loop = asyncio.get_event_loop()
105
+ except RuntimeError:
106
+ loop = asyncio.new_event_loop()
107
+ asyncio.set_event_loop(loop)
108
+
109
+ if not self._initialized:
110
+ loop.run_until_complete(self.initialize())
111
+
112
+ return self._client
113
+
114
+ async def create_investigation(
115
+ self,
116
+ user_id: str,
117
+ query: str,
118
+ data_source: str,
119
+ filters: Optional[Dict[str, Any]] = None,
120
+ anomaly_types: Optional[List[str]] = None,
121
+ session_id: Optional[str] = None,
122
+ ) -> Dict[str, Any]:
123
+ """
124
+ Create a new investigation in Supabase.
125
+
126
+ Args:
127
+ user_id: User ID
128
+ query: Investigation query
129
+ data_source: Data source to investigate
130
+ filters: Query filters
131
+ anomaly_types: Types of anomalies to detect
132
+ session_id: Optional session ID
133
+
134
+ Returns:
135
+ Created investigation as dict
136
+ """
137
+ client = self._ensure_client()
138
+
139
+ data = {
140
+ "user_id": user_id,
141
+ "session_id": session_id,
142
+ "query": query,
143
+ "data_source": data_source,
144
+ "status": "pending",
145
+ "filters": filters or {},
146
+ "anomaly_types": anomaly_types or [],
147
+ "progress": 0.0,
148
+ "created_at": datetime.utcnow().isoformat(),
149
+ "updated_at": datetime.utcnow().isoformat(),
150
+ }
151
+
152
+ result = client.table("investigations").insert(data).execute()
153
+
154
+ if not result.data or len(result.data) == 0:
155
+ raise CidadaoAIError("Failed to create investigation")
156
+
157
+ investigation = result.data[0]
158
+ logger.info(f"Created investigation {investigation['id']} via REST API")
159
+ return investigation
160
+
161
+ async def get_investigation(self, investigation_id: str) -> Optional[Dict[str, Any]]:
162
+ """
163
+ Get investigation by ID.
164
+
165
+ Args:
166
+ investigation_id: Investigation UUID
167
+
168
+ Returns:
169
+ Investigation dict or None
170
+ """
171
+ client = self._ensure_client()
172
+
173
+ result = client.table("investigations").select("*").eq("id", investigation_id).execute()
174
+
175
+ if not result.data or len(result.data) == 0:
176
+ return None
177
+
178
+ return result.data[0]
179
+
180
+ async def update_investigation(
181
+ self,
182
+ investigation_id: str,
183
+ **updates
184
+ ) -> Dict[str, Any]:
185
+ """
186
+ Update investigation fields.
187
+
188
+ Args:
189
+ investigation_id: Investigation UUID
190
+ **updates: Fields to update
191
+
192
+ Returns:
193
+ Updated investigation dict
194
+ """
195
+ client = self._ensure_client()
196
+
197
+ # Always update updated_at
198
+ updates["updated_at"] = datetime.utcnow().isoformat()
199
+
200
+ result = (
201
+ client.table("investigations")
202
+ .update(updates)
203
+ .eq("id", investigation_id)
204
+ .execute()
205
+ )
206
+
207
+ if not result.data or len(result.data) == 0:
208
+ raise ValueError(f"Investigation {investigation_id} not found")
209
+
210
+ logger.debug(f"Updated investigation {investigation_id} via REST API")
211
+ return result.data[0]
212
+
213
+ async def update_progress(
214
+ self,
215
+ investigation_id: str,
216
+ progress: float,
217
+ current_phase: str,
218
+ records_processed: Optional[int] = None,
219
+ anomalies_found: Optional[int] = None,
220
+ ) -> Dict[str, Any]:
221
+ """
222
+ Update investigation progress.
223
+
224
+ Args:
225
+ investigation_id: Investigation UUID
226
+ progress: Progress percentage (0.0 to 1.0)
227
+ current_phase: Current processing phase
228
+ records_processed: Number of records processed
229
+ anomalies_found: Number of anomalies detected
230
+
231
+ Returns:
232
+ Updated investigation dict
233
+ """
234
+ updates = {
235
+ "progress": progress,
236
+ "current_phase": current_phase,
237
+ }
238
+
239
+ if records_processed is not None:
240
+ updates["total_records_analyzed"] = records_processed
241
+
242
+ if anomalies_found is not None:
243
+ updates["anomalies_found"] = anomalies_found
244
+
245
+ return await self.update_investigation(investigation_id, **updates)
246
+
247
+ async def complete_investigation(
248
+ self,
249
+ investigation_id: str,
250
+ results: List[Dict[str, Any]],
251
+ summary: str,
252
+ confidence_score: float,
253
+ total_records: int,
254
+ anomalies_found: int,
255
+ ) -> Dict[str, Any]:
256
+ """
257
+ Mark investigation as completed with results.
258
+
259
+ Args:
260
+ investigation_id: Investigation UUID
261
+ results: List of anomaly results
262
+ summary: Investigation summary
263
+ confidence_score: Overall confidence
264
+ total_records: Total records analyzed
265
+ anomalies_found: Total anomalies found
266
+
267
+ Returns:
268
+ Updated investigation dict
269
+ """
270
+ return await self.update_investigation(
271
+ investigation_id,
272
+ status="completed",
273
+ progress=1.0,
274
+ current_phase="completed",
275
+ results=results,
276
+ summary=summary,
277
+ confidence_score=confidence_score,
278
+ total_records_analyzed=total_records,
279
+ anomalies_found=anomalies_found,
280
+ completed_at=datetime.utcnow().isoformat(),
281
+ )
282
+
283
+ async def fail_investigation(
284
+ self,
285
+ investigation_id: str,
286
+ error_message: str,
287
+ ) -> Dict[str, Any]:
288
+ """
289
+ Mark investigation as failed.
290
+
291
+ Args:
292
+ investigation_id: Investigation UUID
293
+ error_message: Error description
294
+
295
+ Returns:
296
+ Updated investigation dict
297
+ """
298
+ return await self.update_investigation(
299
+ investigation_id,
300
+ status="failed",
301
+ current_phase="failed",
302
+ error_message=error_message,
303
+ completed_at=datetime.utcnow().isoformat(),
304
+ )
305
+
306
+ async def list_user_investigations(
307
+ self,
308
+ user_id: str,
309
+ limit: int = 20,
310
+ offset: int = 0,
311
+ status: Optional[str] = None,
312
+ ) -> List[Dict[str, Any]]:
313
+ """
314
+ List investigations for a user.
315
+
316
+ Args:
317
+ user_id: User ID
318
+ limit: Maximum results
319
+ offset: Pagination offset
320
+ status: Filter by status
321
+
322
+ Returns:
323
+ List of investigation dicts
324
+ """
325
+ client = self._ensure_client()
326
+
327
+ query = client.table("investigations").select("*").eq("user_id", user_id)
328
+
329
+ if status:
330
+ query = query.eq("status", status)
331
+
332
+ query = query.order("created_at", desc=True).range(offset, offset + limit - 1)
333
+
334
+ result = query.execute()
335
+
336
+ return result.data if result.data else []
337
+
338
+ async def delete_investigation(
339
+ self,
340
+ investigation_id: str,
341
+ user_id: str,
342
+ ) -> bool:
343
+ """
344
+ Delete an investigation (soft delete by marking as cancelled).
345
+
346
+ Args:
347
+ investigation_id: Investigation UUID
348
+ user_id: User ID (for authorization)
349
+
350
+ Returns:
351
+ True if deleted, False if not found
352
+ """
353
+ client = self._ensure_client()
354
+
355
+ result = (
356
+ client.table("investigations")
357
+ .update({
358
+ "status": "cancelled",
359
+ "completed_at": datetime.utcnow().isoformat(),
360
+ })
361
+ .eq("id", investigation_id)
362
+ .eq("user_id", user_id)
363
+ .execute()
364
+ )
365
+
366
+ if result.data and len(result.data) > 0:
367
+ logger.info(f"Cancelled investigation {investigation_id} via REST API")
368
+ return True
369
+
370
+ return False
371
+
372
+ async def health_check(self) -> Dict[str, Any]:
373
+ """
374
+ Check Supabase connection health.
375
+
376
+ Returns:
377
+ Health status dict
378
+ """
379
+ try:
380
+ client = self._ensure_client()
381
+
382
+ # Simple query to test connection
383
+ result = client.table("investigations").select("id").limit(1).execute()
384
+
385
+ return {
386
+ "status": "healthy",
387
+ "connected": True,
388
+ "api_version": "rest",
389
+ }
390
+ except Exception as e:
391
+ logger.error(f"Supabase REST health check failed: {e}")
392
+ return {
393
+ "status": "unhealthy",
394
+ "connected": False,
395
+ "error": str(e),
396
+ }
397
+
398
+
399
+ # Global service instance
400
+ supabase_service_rest = SupabaseServiceRest()
401
+
402
+
403
+ async def get_supabase_service_rest() -> SupabaseServiceRest:
404
+ """Get the global Supabase REST service instance."""
405
+ if not supabase_service_rest._initialized:
406
+ await supabase_service_rest.initialize()
407
+ return supabase_service_rest