- Created a new test package for services and added tests for AuthService. - Implemented tests for user registration, login, and token creation. - Added a new test package for utilities and included tests for password and JWT utilities. - Updated `uv.lock` to include new dependencies: bcrypt, email-validator, pyjwt, and pytest-asyncio.
371 lines
11 KiB
Python
371 lines
11 KiB
Python
"""Tests for authentication endpoints."""
|
|
|
|
from typing import Any
|
|
|
|
import pytest
|
|
from httpx import AsyncClient
|
|
|
|
from app.models.plan import Plan
|
|
from app.models.user import User
|
|
|
|
|
|
class TestAuthEndpoints:
    """Test authentication API endpoints.

    Covers the register, login, current-user and logout routes under
    ``/api/v1/auth``, plus direct calls to the ``get_admin_user``
    dependency. Every test is an async test driven by the ``test_client``
    fixture.
    """

    # Endpoint paths exercised by this suite, kept in one place so a route
    # change only needs to be made once.
    REGISTER_URL = "/api/v1/auth/register"
    LOGIN_URL = "/api/v1/auth/login"
    ME_URL = "/api/v1/auth/me"
    LOGOUT_URL = "/api/v1/auth/logout"

    @staticmethod
    def _assert_bearer_token(token: dict[str, Any]) -> None:
        """Assert that *token* is a bearer token payload with a positive lifetime.

        Shared by the register and login tests, which check the same
        three properties of the ``token`` object in the response.
        """
        assert "access_token" in token
        assert token["token_type"] == "bearer"
        assert token["expires_in"] > 0

    # ------------------------------------------------------------------
    # Registration
    # ------------------------------------------------------------------

    @pytest.mark.asyncio
    async def test_register_success(
        self,
        test_client: AsyncClient,
        test_user_data: dict[str, str],
        test_plan: Plan,
    ) -> None:
        """Test successful user registration.

        Posts ``test_user_data`` to the register endpoint and expects a
        201 response whose body contains both a ``user`` and a ``token``.
        """
        # NOTE(review): ``test_plan`` is not referenced in the body; it is
        # presumably requested so a plan exists before registering -
        # confirm against the fixture definition.
        response = await test_client.post(
            self.REGISTER_URL,
            json=test_user_data,
        )

        assert response.status_code == 201
        data = response.json()

        # Check response structure
        assert "user" in data
        assert "token" in data

        # Check user data
        user = data["user"]
        assert user["email"] == test_user_data["email"]
        assert user["name"] == test_user_data["name"]
        assert user["role"] == "user"
        assert user["is_active"] is True
        assert user["credits"] > 0
        assert "plan" in user

        # Check token data
        self._assert_bearer_token(data["token"])

    @pytest.mark.asyncio
    async def test_register_duplicate_email(
        self,
        test_client: AsyncClient,
        test_user: User,
    ) -> None:
        """Test registration with duplicate email.

        Re-uses the email of the ``test_user`` fixture and expects a 400
        response whose detail reports the address as already registered.
        """
        user_data = {
            "email": test_user.email,
            "password": "password123",
            "name": "Another User",
        }

        response = await test_client.post(
            self.REGISTER_URL,
            json=user_data,
        )

        assert response.status_code == 400
        data = response.json()
        assert "Email address is already registered" in data["detail"]

    @pytest.mark.asyncio
    async def test_register_invalid_email(
        self,
        test_client: AsyncClient,
    ) -> None:
        """Test registration with invalid email."""
        user_data = {
            "email": "invalid-email",
            "password": "password123",
            "name": "Test User",
        }

        response = await test_client.post(
            self.REGISTER_URL,
            json=user_data,
        )

        assert response.status_code == 422  # Validation error

    @pytest.mark.asyncio
    async def test_register_short_password(
        self,
        test_client: AsyncClient,
    ) -> None:
        """Test registration with short password."""
        user_data = {
            "email": "test@example.com",
            "password": "short",
            "name": "Test User",
        }

        response = await test_client.post(
            self.REGISTER_URL,
            json=user_data,
        )

        assert response.status_code == 422  # Validation error

    @pytest.mark.asyncio
    async def test_register_missing_fields(
        self,
        test_client: AsyncClient,
    ) -> None:
        """Test registration with missing fields."""
        user_data = {
            "email": "test@example.com",
            # "password" and "name" are deliberately omitted.
        }

        response = await test_client.post(
            self.REGISTER_URL,
            json=user_data,
        )

        assert response.status_code == 422  # Validation error

    # ------------------------------------------------------------------
    # Login
    # ------------------------------------------------------------------

    @pytest.mark.asyncio
    async def test_login_success(
        self,
        test_client: AsyncClient,
        test_user: User,
        test_login_data: dict[str, str],
    ) -> None:
        """Test successful user login.

        Posts ``test_login_data`` to the login endpoint and expects a 200
        response whose ``user`` matches the ``test_user`` fixture and
        whose ``token`` is a bearer token.
        """
        response = await test_client.post(
            self.LOGIN_URL,
            json=test_login_data,
        )

        assert response.status_code == 200
        data = response.json()

        # Check response structure
        assert "user" in data
        assert "token" in data

        # Check user data
        user = data["user"]
        assert user["id"] == test_user.id
        assert user["email"] == test_user.email
        assert user["name"] == test_user.name
        assert user["role"] == test_user.role

        # Check token data
        self._assert_bearer_token(data["token"])

    @pytest.mark.asyncio
    async def test_login_invalid_email(
        self,
        test_client: AsyncClient,
    ) -> None:
        """Test login with invalid email.

        Uses a well-formed address and expects a 401 response with the
        "Invalid email or password" detail.
        """
        # NOTE(review): assumes no user with this address exists in the
        # test database - confirm against the fixtures.
        login_data = {
            "email": "nonexistent@example.com",
            "password": "password123",
        }

        response = await test_client.post(
            self.LOGIN_URL,
            json=login_data,
        )

        assert response.status_code == 401
        data = response.json()
        assert "Invalid email or password" in data["detail"]

    @pytest.mark.asyncio
    async def test_login_invalid_password(
        self,
        test_client: AsyncClient,
        test_user: User,
    ) -> None:
        """Test login with invalid password.

        Expects the same 401 detail text as the invalid-email login test.
        """
        login_data = {
            "email": test_user.email,
            "password": "wrongpassword",
        }

        response = await test_client.post(
            self.LOGIN_URL,
            json=login_data,
        )

        assert response.status_code == 401
        data = response.json()
        assert "Invalid email or password" in data["detail"]

    @pytest.mark.asyncio
    async def test_login_malformed_request(
        self,
        test_client: AsyncClient,
    ) -> None:
        """Test login with malformed request."""
        login_data = {
            "email": "invalid-email",
            "password": "password123",
        }

        response = await test_client.post(
            self.LOGIN_URL,
            json=login_data,
        )

        assert response.status_code == 422  # Validation error

    # ------------------------------------------------------------------
    # Current user
    # ------------------------------------------------------------------

    @pytest.mark.asyncio
    async def test_get_current_user_success(
        self,
        test_client: AsyncClient,
        test_user: User,
        auth_headers: dict[str, str],
    ) -> None:
        """Test getting current user info successfully.

        Sends ``auth_headers`` to the current-user endpoint and expects a
        200 response whose fields match the ``test_user`` fixture.
        """
        response = await test_client.get(
            self.ME_URL,
            headers=auth_headers,
        )

        assert response.status_code == 200
        data = response.json()

        # Check user data
        assert data["id"] == test_user.id
        assert data["email"] == test_user.email
        assert data["name"] == test_user.name
        assert data["role"] == test_user.role
        assert data["is_active"] == test_user.is_active
        assert "plan" in data

    @pytest.mark.asyncio
    async def test_get_current_user_no_token(
        self,
        test_client: AsyncClient,
    ) -> None:
        """Test getting current user without authentication token."""
        response = await test_client.get(self.ME_URL)

        assert response.status_code == 403  # Forbidden (no token provided)

    @pytest.mark.asyncio
    async def test_get_current_user_invalid_token(
        self,
        test_client: AsyncClient,
    ) -> None:
        """Test getting current user with invalid token."""
        headers = {"Authorization": "Bearer invalid_token"}

        response = await test_client.get(
            self.ME_URL,
            headers=headers,
        )

        assert response.status_code == 401
        data = response.json()
        assert "Could not validate credentials" in data["detail"]

    @pytest.mark.asyncio
    async def test_get_current_user_expired_token(
        self,
        test_client: AsyncClient,
        test_user: User,
    ) -> None:
        """Test getting current user with expired token.

        Builds a token for ``test_user`` with a negative lifetime and
        expects a 401 response with the "Token has expired" detail.
        """
        from datetime import timedelta
        from app.utils.auth import JWTUtils

        token_data = {
            "sub": str(test_user.id),
            "email": test_user.email,
            "role": test_user.role,
        }
        # A negative lifetime should place the expiry in the past -
        # assumes create_access_token adds expires_delta to the current
        # time; confirm against JWTUtils.
        expired_token = JWTUtils.create_access_token(
            token_data,
            expires_delta=timedelta(seconds=-1),
        )

        headers = {"Authorization": f"Bearer {expired_token}"}

        response = await test_client.get(
            self.ME_URL,
            headers=headers,
        )

        assert response.status_code == 401
        data = response.json()
        # Expired tokens are expected to report their own message rather
        # than the generic "Could not validate credentials" detail.
        assert "Token has expired" in data["detail"]

    # ------------------------------------------------------------------
    # Logout
    # ------------------------------------------------------------------

    @pytest.mark.asyncio
    async def test_logout_success(
        self,
        test_client: AsyncClient,
    ) -> None:
        """Test logout endpoint.

        The request is sent without an Authorization header and is still
        expected to return 200 with a success message.
        """
        response = await test_client.post(self.LOGOUT_URL)

        assert response.status_code == 200
        data = response.json()
        assert "Successfully logged out" in data["message"]

    # ------------------------------------------------------------------
    # Admin dependency
    # ------------------------------------------------------------------

    @pytest.mark.asyncio
    async def test_admin_access_with_user_role(
        self,
        test_client: AsyncClient,
        auth_headers: dict[str, str],
    ) -> None:
        """Test that regular users cannot access admin endpoints.

        No admin-only route is exercised here; the ``get_admin_user``
        dependency is called directly with an in-memory ``User``.
        """
        # This test would be for admin-only endpoints when they're created.
        # For now, we test the dependency behavior.
        # NOTE(review): ``test_client`` and ``auth_headers`` are not used
        # in the body; they are kept so fixture setup is unchanged.
        from app.core.dependencies import get_admin_user
        from fastapi import HTTPException

        # Build an unsaved user with the regular role
        regular_user = User(
            id=1,
            email="user@example.com",
            name="Regular User",
            role="user",
            is_active=True,
            plan_id=1,
            credits=100,
        )

        # get_admin_user must reject a regular user
        with pytest.raises(HTTPException) as exc_info:
            await get_admin_user(regular_user)

        assert exc_info.value.status_code == 403
        assert "Not enough permissions" in exc_info.value.detail

    @pytest.mark.asyncio
    async def test_admin_access_with_admin_role(
        self,
        test_client: AsyncClient,
        admin_headers: dict[str, str],
    ) -> None:
        """Test that admin users can access admin endpoints.

        Calls the ``get_admin_user`` dependency directly and expects it to
        return a user equal to the admin it was given.
        """
        # NOTE(review): ``test_client`` and ``admin_headers`` are not used
        # in the body; they are kept so fixture setup is unchanged.
        from app.core.dependencies import get_admin_user

        # Build an unsaved user with the admin role
        admin_user = User(
            id=1,
            email="admin@example.com",
            name="Admin User",
            role="admin",
            is_active=True,
            plan_id=1,
            credits=1000,
        )

        # get_admin_user must pass an admin user through
        result = await get_admin_user(admin_user)
        assert result == admin_user