anderson-ufrj commited on
Commit
43301b6
·
1 Parent(s): 6a77881

test(auth): add comprehensive JWT authentication tests

Browse files

- Test password hashing with bcrypt
- Test JWT token creation and validation
- Test access and refresh token lifecycle
- Test user authentication flow
- Add tests for current user retrieval
- Test AuthService class methods

Files changed (1) hide show
  1. tests/unit/test_auth.py +490 -0
tests/unit/test_auth.py ADDED
@@ -0,0 +1,490 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Unit tests for authentication system."""
2
+ import pytest
3
+ from datetime import datetime, timedelta
4
+ from unittest.mock import MagicMock, patch
5
+ from jose import jwt
6
+ from sqlalchemy.ext.asyncio import AsyncSession
7
+
8
+ from src.api.auth import (
9
+ create_access_token,
10
+ create_refresh_token,
11
+ verify_token,
12
+ get_current_user,
13
+ authenticate_user,
14
+ hash_password,
15
+ verify_password,
16
+ AuthService
17
+ )
18
+ from src.models.user import User
19
+ from src.core.config import get_settings
20
+ from src.core.exceptions import UnauthorizedError
21
+
22
+
23
+ class TestPasswordHashing:
24
+ """Test password hashing and verification."""
25
+
26
+ def test_hash_password(self):
27
+ """Test password hashing creates different hash each time."""
28
+ password = "secure_password_123"
29
+ hash1 = hash_password(password)
30
+ hash2 = hash_password(password)
31
+
32
+ assert hash1 != hash2 # Different salts
33
+ assert hash1.startswith("$2b$") # bcrypt format
34
+ assert len(hash1) > 50 # Reasonable hash length
35
+
36
+ def test_verify_password_correct(self):
37
+ """Test verifying correct password."""
38
+ password = "test_password_456"
39
+ hashed = hash_password(password)
40
+
41
+ assert verify_password(password, hashed) is True
42
+
43
+ def test_verify_password_incorrect(self):
44
+ """Test verifying incorrect password."""
45
+ password = "correct_password"
46
+ wrong_password = "wrong_password"
47
+ hashed = hash_password(password)
48
+
49
+ assert verify_password(wrong_password, hashed) is False
50
+
51
+
52
+ class TestTokenCreation:
53
+ """Test JWT token creation."""
54
+
55
+ @pytest.fixture
56
+ def settings(self):
57
+ """Mock settings."""
58
+ settings = MagicMock()
59
+ settings.JWT_SECRET_KEY = "test_secret_key_123"
60
+ settings.JWT_ALGORITHM = "HS256"
61
+ settings.ACCESS_TOKEN_EXPIRE_MINUTES = 30
62
+ settings.REFRESH_TOKEN_EXPIRE_DAYS = 7
63
+ return settings
64
+
65
+ def test_create_access_token(self, settings):
66
+ """Test access token creation."""
67
+ with patch("src.api.auth.get_settings", return_value=settings):
68
+ user_id = "user123"
69
+ email = "[email protected]"
70
+ role = "user"
71
+
72
+ token = create_access_token(user_id, email, role)
73
+
74
+ # Decode and verify token
75
+ payload = jwt.decode(
76
+ token,
77
+ settings.JWT_SECRET_KEY,
78
+ algorithms=[settings.JWT_ALGORITHM]
79
+ )
80
+
81
+ assert payload["sub"] == user_id
82
+ assert payload["email"] == email
83
+ assert payload["role"] == role
84
+ assert payload["type"] == "access"
85
+ assert "exp" in payload
86
+ assert "iat" in payload
87
+
88
+ def test_create_refresh_token(self, settings):
89
+ """Test refresh token creation."""
90
+ with patch("src.api.auth.get_settings", return_value=settings):
91
+ user_id = "user456"
92
+
93
+ token = create_refresh_token(user_id)
94
+
95
+ # Decode and verify token
96
+ payload = jwt.decode(
97
+ token,
98
+ settings.JWT_SECRET_KEY,
99
+ algorithms=[settings.JWT_ALGORITHM]
100
+ )
101
+
102
+ assert payload["sub"] == user_id
103
+ assert payload["type"] == "refresh"
104
+ assert "exp" in payload
105
+ assert "iat" in payload
106
+
107
+ def test_token_expiration(self, settings):
108
+ """Test token expiration times."""
109
+ with patch("src.api.auth.get_settings", return_value=settings):
110
+ # Create tokens
111
+ access_token = create_access_token("user1", "[email protected]", "user")
112
+ refresh_token = create_refresh_token("user1")
113
+
114
+ # Decode tokens
115
+ access_payload = jwt.decode(
116
+ access_token,
117
+ settings.JWT_SECRET_KEY,
118
+ algorithms=[settings.JWT_ALGORITHM]
119
+ )
120
+ refresh_payload = jwt.decode(
121
+ refresh_token,
122
+ settings.JWT_SECRET_KEY,
123
+ algorithms=[settings.JWT_ALGORITHM]
124
+ )
125
+
126
+ # Check expiration times
127
+ access_exp = datetime.fromtimestamp(access_payload["exp"])
128
+ refresh_exp = datetime.fromtimestamp(refresh_payload["exp"])
129
+ now = datetime.utcnow()
130
+
131
+ # Access token should expire in ~30 minutes
132
+ assert (access_exp - now) < timedelta(minutes=31)
133
+ assert (access_exp - now) > timedelta(minutes=29)
134
+
135
+ # Refresh token should expire in ~7 days
136
+ assert (refresh_exp - now) < timedelta(days=7, minutes=1)
137
+ assert (refresh_exp - now) > timedelta(days=6, hours=23)
138
+
139
+
140
+ class TestTokenVerification:
141
+ """Test JWT token verification."""
142
+
143
+ @pytest.fixture
144
+ def settings(self):
145
+ """Mock settings."""
146
+ settings = MagicMock()
147
+ settings.JWT_SECRET_KEY = "test_secret_key_789"
148
+ settings.JWT_ALGORITHM = "HS256"
149
+ return settings
150
+
151
+ @pytest.mark.asyncio
152
+ async def test_verify_valid_token(self, settings):
153
+ """Test verifying valid token."""
154
+ with patch("src.api.auth.get_settings", return_value=settings):
155
+ # Create valid token
156
+ user_id = "valid_user"
157
+ token = create_access_token(user_id, "[email protected]", "user")
158
+
159
+ # Verify token
160
+ payload = await verify_token(token)
161
+
162
+ assert payload["sub"] == user_id
163
+ assert payload["type"] == "access"
164
+
165
+ @pytest.mark.asyncio
166
+ async def test_verify_expired_token(self, settings):
167
+ """Test verifying expired token."""
168
+ with patch("src.api.auth.get_settings", return_value=settings):
169
+ # Create expired token
170
+ payload = {
171
+ "sub": "user123",
172
+ "type": "access",
173
+ "exp": datetime.utcnow() - timedelta(hours=1),
174
+ "iat": datetime.utcnow() - timedelta(hours=2)
175
+ }
176
+ expired_token = jwt.encode(
177
+ payload,
178
+ settings.JWT_SECRET_KEY,
179
+ algorithm=settings.JWT_ALGORITHM
180
+ )
181
+
182
+ # Should raise UnauthorizedError
183
+ with pytest.raises(UnauthorizedError) as exc_info:
184
+ await verify_token(expired_token)
185
+
186
+ assert "Token has expired" in str(exc_info.value)
187
+
188
+ @pytest.mark.asyncio
189
+ async def test_verify_invalid_token(self, settings):
190
+ """Test verifying invalid token."""
191
+ with patch("src.api.auth.get_settings", return_value=settings):
192
+ invalid_token = "invalid.jwt.token"
193
+
194
+ # Should raise UnauthorizedError
195
+ with pytest.raises(UnauthorizedError) as exc_info:
196
+ await verify_token(invalid_token)
197
+
198
+ assert "Invalid token" in str(exc_info.value)
199
+
200
+ @pytest.mark.asyncio
201
+ async def test_verify_wrong_secret(self, settings):
202
+ """Test verifying token with wrong secret."""
203
+ # Create token with one secret
204
+ with patch("src.api.auth.get_settings", return_value=settings):
205
+ token = create_access_token("user1", "[email protected]", "user")
206
+
207
+ # Try to verify with different secret
208
+ settings.JWT_SECRET_KEY = "different_secret"
209
+ with patch("src.api.auth.get_settings", return_value=settings):
210
+ with pytest.raises(UnauthorizedError) as exc_info:
211
+ await verify_token(token)
212
+
213
+ assert "Invalid token" in str(exc_info.value)
214
+
215
+
216
+ class TestUserAuthentication:
217
+ """Test user authentication flow."""
218
+
219
+ @pytest.mark.asyncio
220
+ async def test_authenticate_user_success(self, async_session):
221
+ """Test successful user authentication."""
222
+ # Create test user
223
+ password = "secure_password_123"
224
+ hashed = hash_password(password)
225
+
226
+ user = User(
227
+ email="[email protected]",
228
+ hashed_password=hashed,
229
+ is_active=True
230
+ )
231
+ async_session.add(user)
232
+ await async_session.commit()
233
+
234
+ # Mock the database query
235
+ mock_result = MagicMock()
236
+ mock_result.scalar_one_or_none.return_value = user
237
+ async_session.execute.return_value = mock_result
238
+
239
+ # Authenticate
240
+ authenticated_user = await authenticate_user(
241
+ async_session,
242
243
+ password
244
+ )
245
+
246
+ assert authenticated_user is not None
247
+ assert authenticated_user.email == "[email protected]"
248
+
249
+ @pytest.mark.asyncio
250
+ async def test_authenticate_user_wrong_password(self, async_session):
251
+ """Test authentication with wrong password."""
252
+ # Create test user
253
+ password = "correct_password"
254
+ hashed = hash_password(password)
255
+
256
+ user = User(
257
+ email="[email protected]",
258
+ hashed_password=hashed,
259
+ is_active=True
260
+ )
261
+
262
+ # Mock the database query
263
+ mock_result = MagicMock()
264
+ mock_result.scalar_one_or_none.return_value = user
265
+ async_session.execute.return_value = mock_result
266
+
267
+ # Try to authenticate with wrong password
268
+ authenticated_user = await authenticate_user(
269
+ async_session,
270
271
+ "wrong_password"
272
+ )
273
+
274
+ assert authenticated_user is None
275
+
276
+ @pytest.mark.asyncio
277
+ async def test_authenticate_user_not_found(self, async_session):
278
+ """Test authentication with non-existent user."""
279
+ # Mock the database query to return None
280
+ mock_result = MagicMock()
281
+ mock_result.scalar_one_or_none.return_value = None
282
+ async_session.execute.return_value = mock_result
283
+
284
+ # Try to authenticate
285
+ authenticated_user = await authenticate_user(
286
+ async_session,
287
288
+ "any_password"
289
+ )
290
+
291
+ assert authenticated_user is None
292
+
293
+ @pytest.mark.asyncio
294
+ async def test_authenticate_inactive_user(self, async_session):
295
+ """Test authentication with inactive user."""
296
+ # Create inactive user
297
+ password = "password123"
298
+ hashed = hash_password(password)
299
+
300
+ user = User(
301
+ email="[email protected]",
302
+ hashed_password=hashed,
303
+ is_active=False
304
+ )
305
+
306
+ # Mock the database query
307
+ mock_result = MagicMock()
308
+ mock_result.scalar_one_or_none.return_value = user
309
+ async_session.execute.return_value = mock_result
310
+
311
+ # Try to authenticate
312
+ authenticated_user = await authenticate_user(
313
+ async_session,
314
315
+ password
316
+ )
317
+
318
+ assert authenticated_user is None
319
+
320
+
321
+ class TestGetCurrentUser:
322
+ """Test getting current user from token."""
323
+
324
+ @pytest.fixture
325
+ def settings(self):
326
+ """Mock settings."""
327
+ settings = MagicMock()
328
+ settings.JWT_SECRET_KEY = "test_secret"
329
+ settings.JWT_ALGORITHM = "HS256"
330
+ return settings
331
+
332
+ @pytest.mark.asyncio
333
+ async def test_get_current_user_valid(self, async_session, settings):
334
+ """Test getting current user with valid token."""
335
+ with patch("src.api.auth.get_settings", return_value=settings):
336
+ # Create test user
337
+ user = User(
338
+ id="user123",
339
+ email="[email protected]",
340
+ is_active=True
341
+ )
342
+
343
+ # Create valid token
344
+ token = create_access_token(user.id, user.email, "user")
345
+
346
+ # Mock the database query
347
+ mock_result = MagicMock()
348
+ mock_result.scalar_one_or_none.return_value = user
349
+ async_session.execute.return_value = mock_result
350
+
351
+ # Get current user
352
+ current_user = await get_current_user(token, async_session)
353
+
354
+ assert current_user is not None
355
+ assert current_user.id == "user123"
356
+ assert current_user.email == "[email protected]"
357
+
358
+ @pytest.mark.asyncio
359
+ async def test_get_current_user_invalid_token(self, async_session, settings):
360
+ """Test getting current user with invalid token."""
361
+ with patch("src.api.auth.get_settings", return_value=settings):
362
+ invalid_token = "invalid.token"
363
+
364
+ # Should raise UnauthorizedError
365
+ with pytest.raises(UnauthorizedError):
366
+ await get_current_user(invalid_token, async_session)
367
+
368
+ @pytest.mark.asyncio
369
+ async def test_get_current_user_not_found(self, async_session, settings):
370
+ """Test getting current user when user not found in database."""
371
+ with patch("src.api.auth.get_settings", return_value=settings):
372
+ # Create token for non-existent user
373
+ token = create_access_token("ghost_user", "[email protected]", "user")
374
+
375
+ # Mock the database query to return None
376
+ mock_result = MagicMock()
377
+ mock_result.scalar_one_or_none.return_value = None
378
+ async_session.execute.return_value = mock_result
379
+
380
+ # Should raise UnauthorizedError
381
+ with pytest.raises(UnauthorizedError) as exc_info:
382
+ await get_current_user(token, async_session)
383
+
384
+ assert "User not found" in str(exc_info.value)
385
+
386
+ @pytest.mark.asyncio
387
+ async def test_get_current_user_inactive(self, async_session, settings):
388
+ """Test getting current user when user is inactive."""
389
+ with patch("src.api.auth.get_settings", return_value=settings):
390
+ # Create inactive user
391
+ user = User(
392
+ id="inactive123",
393
+ email="[email protected]",
394
+ is_active=False
395
+ )
396
+
397
+ # Create token
398
+ token = create_access_token(user.id, user.email, "user")
399
+
400
+ # Mock the database query
401
+ mock_result = MagicMock()
402
+ mock_result.scalar_one_or_none.return_value = user
403
+ async_session.execute.return_value = mock_result
404
+
405
+ # Should raise UnauthorizedError
406
+ with pytest.raises(UnauthorizedError) as exc_info:
407
+ await get_current_user(token, async_session)
408
+
409
+ assert "User is inactive" in str(exc_info.value)
410
+
411
+
412
+ class TestAuthService:
413
+ """Test AuthService class methods."""
414
+
415
+ @pytest.mark.asyncio
416
+ async def test_auth_service_login(self, async_session):
417
+ """Test AuthService login method."""
418
+ # Create test user
419
+ password = "test_password"
420
+ hashed = hash_password(password)
421
+
422
+ user = User(
423
+ id="service_user",
424
+ email="[email protected]",
425
+ hashed_password=hashed,
426
+ is_active=True,
427
+ role="admin"
428
+ )
429
+
430
+ # Mock database
431
+ mock_result = MagicMock()
432
+ mock_result.scalar_one_or_none.return_value = user
433
+ async_session.execute.return_value = mock_result
434
+
435
+ # Test login
436
+ auth_service = AuthService(async_session)
437
+ tokens = await auth_service.login("[email protected]", password)
438
+
439
+ assert "access_token" in tokens
440
+ assert "refresh_token" in tokens
441
+ assert "token_type" in tokens
442
+ assert tokens["token_type"] == "bearer"
443
+
444
+ @pytest.mark.asyncio
445
+ async def test_auth_service_login_failed(self, async_session):
446
+ """Test AuthService login with invalid credentials."""
447
+ # Mock database to return None
448
+ mock_result = MagicMock()
449
+ mock_result.scalar_one_or_none.return_value = None
450
+ async_session.execute.return_value = mock_result
451
+
452
+ # Test login
453
+ auth_service = AuthService(async_session)
454
+
455
+ with pytest.raises(UnauthorizedError) as exc_info:
456
+ await auth_service.login("[email protected]", "wrong_password")
457
+
458
+ assert "Invalid email or password" in str(exc_info.value)
459
+
460
+ @pytest.mark.asyncio
461
+ async def test_auth_service_refresh_token(self, async_session):
462
+ """Test AuthService refresh token method."""
463
+ settings = MagicMock()
464
+ settings.JWT_SECRET_KEY = "test_secret"
465
+ settings.JWT_ALGORITHM = "HS256"
466
+
467
+ with patch("src.api.auth.get_settings", return_value=settings):
468
+ # Create test user
469
+ user = User(
470
+ id="refresh_user",
471
+ email="[email protected]",
472
+ is_active=True,
473
+ role="user"
474
+ )
475
+
476
+ # Create refresh token
477
+ refresh_token = create_refresh_token(user.id)
478
+
479
+ # Mock database
480
+ mock_result = MagicMock()
481
+ mock_result.scalar_one_or_none.return_value = user
482
+ async_session.execute.return_value = mock_result
483
+
484
+ # Test refresh
485
+ auth_service = AuthService(async_session)
486
+ new_tokens = await auth_service.refresh_token(refresh_token)
487
+
488
+ assert "access_token" in new_tokens
489
+ assert "refresh_token" in new_tokens
490
+ assert new_tokens["access_token"] != refresh_token