Enhance test fixtures and user registration logic to ensure plan existence and correct role assignment

This commit is contained in:
JSC
2025-07-25 18:43:29 +02:00
parent 45ba28af52
commit 52ebc59293
5 changed files with 247 additions and 238 deletions

View File

@@ -3,330 +3,270 @@
from typing import Any
import pytest
import pytest_asyncio
from httpx import AsyncClient
from app.models.plan import Plan
from app.models.user import User
from app.utils.auth import JWTUtils
@pytest_asyncio.fixture
async def auth_cookies(test_user: User) -> dict[str, str]:
"""Create authentication cookies with JWT token."""
token_data = {
"sub": str(test_user.id),
"email": test_user.email,
"role": test_user.role,
}
access_token = JWTUtils.create_access_token(token_data)
return {"access_token": access_token}
@pytest_asyncio.fixture
async def admin_cookies(admin_user: User) -> dict[str, str]:
"""Create admin authentication cookies with JWT token."""
token_data = {
"sub": str(admin_user.id),
"email": admin_user.email,
"role": admin_user.role,
}
access_token = JWTUtils.create_access_token(token_data)
return {"access_token": access_token}
class TestAuthEndpoints:
"""Test authentication API endpoints."""
@pytest.mark.asyncio
async def test_register_success(
self,
test_client: AsyncClient,
test_user_data: dict[str, str],
test_plan: Plan
test_user_data: dict[str, Any],
ensure_plans: tuple[Plan, Plan],
) -> None:
"""Test successful user registration."""
response = await test_client.post(
"/api/v1/auth/register",
json=test_user_data
)
response = await test_client.post("/api/v1/auth/register", json=test_user_data)
assert response.status_code == 201
data = response.json()
# Check response structure
assert "user" in data
assert "token" in data
# Check user data
user = data["user"]
assert user["email"] == test_user_data["email"]
assert user["name"] == test_user_data["name"]
assert user["role"] == "user"
assert user["is_active"] is True
assert user["credits"] > 0
assert "plan" in user
# Check token data
token = data["token"]
assert "access_token" in token
assert token["token_type"] == "bearer"
assert token["expires_in"] > 0
# Check user data in response (no token in response body with cookies)
assert data["email"] == test_user_data["email"]
assert data["name"] == test_user_data["name"]
assert data["role"] == "admin" # First user gets admin role
assert data["is_active"] is True
assert data["credits"] > 0
assert "plan" in data
# Check cookies are set
assert "access_token" in response.cookies
assert "refresh_token" in response.cookies
@pytest.mark.asyncio
async def test_register_duplicate_email(
self,
test_client: AsyncClient,
test_user: User
self, test_client: AsyncClient, test_user: User
) -> None:
"""Test registration with duplicate email."""
user_data = {
"email": test_user.email,
"password": "password123",
"name": "Another User"
"name": "Another User",
}
response = await test_client.post(
"/api/v1/auth/register",
json=user_data
)
response = await test_client.post("/api/v1/auth/register", json=user_data)
assert response.status_code == 400
data = response.json()
assert "Email address is already registered" in data["detail"]
@pytest.mark.asyncio
async def test_register_invalid_email(
self,
test_client: AsyncClient
) -> None:
async def test_register_invalid_email(self, test_client: AsyncClient) -> None:
"""Test registration with invalid email."""
user_data = {
"email": "invalid-email",
"password": "password123",
"name": "Test User"
"name": "Test User",
}
response = await test_client.post(
"/api/v1/auth/register",
json=user_data
)
response = await test_client.post("/api/v1/auth/register", json=user_data)
assert response.status_code == 422 # Validation error
@pytest.mark.asyncio
async def test_register_short_password(
self,
test_client: AsyncClient
) -> None:
async def test_register_short_password(self, test_client: AsyncClient) -> None:
"""Test registration with short password."""
user_data = {
"email": "test@example.com",
"password": "short",
"name": "Test User"
"name": "Test User",
}
response = await test_client.post(
"/api/v1/auth/register",
json=user_data
)
response = await test_client.post("/api/v1/auth/register", json=user_data)
assert response.status_code == 422 # Validation error
@pytest.mark.asyncio
async def test_register_missing_fields(
self,
test_client: AsyncClient
) -> None:
async def test_register_missing_fields(self, test_client: AsyncClient) -> None:
"""Test registration with missing fields."""
user_data = {
"email": "test@example.com"
# Missing password and name
}
response = await test_client.post(
"/api/v1/auth/register",
json=user_data
)
response = await test_client.post("/api/v1/auth/register", json=user_data)
assert response.status_code == 422 # Validation error
@pytest.mark.asyncio
async def test_login_success(
self,
test_client: AsyncClient,
test_user: User,
test_login_data: dict[str, str]
self, test_client: AsyncClient, test_user: User, test_login_data: dict[str, str]
) -> None:
"""Test successful user login."""
response = await test_client.post(
"/api/v1/auth/login",
json=test_login_data
)
response = await test_client.post("/api/v1/auth/login", json=test_login_data)
assert response.status_code == 200
data = response.json()
# Check response structure
assert "user" in data
assert "token" in data
# Check user data
user = data["user"]
assert user["id"] == test_user.id
assert user["email"] == test_user.email
assert user["name"] == test_user.name
assert user["role"] == test_user.role
# Check token data
token = data["token"]
assert "access_token" in token
assert token["token_type"] == "bearer"
assert token["expires_in"] > 0
# Check user data in response (no token in response body with cookies)
assert data["email"] == test_login_data["email"]
assert "name" in data
assert "role" in data
assert data["is_active"] is True
# Check cookies are set
assert "access_token" in response.cookies
assert "refresh_token" in response.cookies
@pytest.mark.asyncio
async def test_login_invalid_email(
self,
test_client: AsyncClient
) -> None:
async def test_login_invalid_email(self, test_client: AsyncClient) -> None:
"""Test login with invalid email."""
login_data = {
"email": "nonexistent@example.com",
"password": "password123"
}
response = await test_client.post(
"/api/v1/auth/login",
json=login_data
)
login_data = {"email": "nonexistent@example.com", "password": "password123"}
response = await test_client.post("/api/v1/auth/login", json=login_data)
assert response.status_code == 401
data = response.json()
assert "Invalid email or password" in data["detail"]
@pytest.mark.asyncio
async def test_login_invalid_password(
self,
test_client: AsyncClient,
test_user: User
self, test_client: AsyncClient, test_user: User
) -> None:
"""Test login with invalid password."""
login_data = {
"email": test_user.email,
"password": "wrongpassword"
}
response = await test_client.post(
"/api/v1/auth/login",
json=login_data
)
login_data = {"email": test_user.email, "password": "wrongpassword"}
response = await test_client.post("/api/v1/auth/login", json=login_data)
assert response.status_code == 401
data = response.json()
assert "Invalid email or password" in data["detail"]
@pytest.mark.asyncio
async def test_login_malformed_request(
self,
test_client: AsyncClient
) -> None:
async def test_login_malformed_request(self, test_client: AsyncClient) -> None:
"""Test login with malformed request."""
login_data = {
"email": "invalid-email",
"password": "password123"
}
response = await test_client.post(
"/api/v1/auth/login",
json=login_data
)
login_data = {"email": "invalid-email", "password": "password123"}
response = await test_client.post("/api/v1/auth/login", json=login_data)
assert response.status_code == 422 # Validation error
@pytest.mark.asyncio
async def test_get_current_user_success(
self,
test_client: AsyncClient,
test_user: User,
auth_headers: dict[str, str]
self, test_client: AsyncClient, test_user: User, auth_cookies: dict[str, str]
) -> None:
"""Test getting current user info successfully."""
response = await test_client.get(
"/api/v1/auth/me",
headers=auth_headers
)
# Set cookies on client instance to avoid deprecation warning
test_client.cookies.update(auth_cookies)
response = await test_client.get("/api/v1/auth/me")
assert response.status_code == 200
data = response.json()
# Check user data
assert data["id"] == test_user.id
assert data["email"] == test_user.email
assert data["name"] == test_user.name
assert data["role"] == test_user.role
assert data["is_active"] == test_user.is_active
# Check user data structure
assert "id" in data
assert "email" in data
assert "name" in data
assert "role" in data
assert data["is_active"] is True
assert "plan" in data
@pytest.mark.asyncio
async def test_get_current_user_no_token(
self,
test_client: AsyncClient
) -> None:
async def test_get_current_user_no_token(self, test_client: AsyncClient) -> None:
"""Test getting current user without authentication token."""
response = await test_client.get("/api/v1/auth/me")
assert response.status_code == 403 # Forbidden (no token provided)
assert response.status_code == 422 # Validation error (no cookie provided)
@pytest.mark.asyncio
async def test_get_current_user_invalid_token(
self,
test_client: AsyncClient
self, test_client: AsyncClient
) -> None:
"""Test getting current user with invalid token."""
headers = {"Authorization": "Bearer invalid_token"}
response = await test_client.get(
"/api/v1/auth/me",
headers=headers
)
# Set invalid cookies on client instance
test_client.cookies.update({"access_token": "invalid_token"})
response = await test_client.get("/api/v1/auth/me")
assert response.status_code == 401
data = response.json()
assert "Could not validate credentials" in data["detail"]
@pytest.mark.asyncio
async def test_get_current_user_expired_token(
self,
test_client: AsyncClient,
test_user: User
self, test_client: AsyncClient, test_user: User
) -> None:
"""Test getting current user with expired token."""
from datetime import timedelta
from app.utils.auth import JWTUtils
# Create an expired token (expires immediately)
token_data = {
"sub": str(test_user.id),
"email": test_user.email,
"role": test_user.role,
"sub": "1", # Use a dummy user ID
"email": "test@example.com",
"role": "user",
}
expired_token = JWTUtils.create_access_token(
token_data,
expires_delta=timedelta(seconds=-1)
token_data, expires_delta=timedelta(seconds=-1)
)
headers = {"Authorization": f"Bearer {expired_token}"}
response = await test_client.get(
"/api/v1/auth/me",
headers=headers
)
# Set expired cookies on client instance
test_client.cookies.update({"access_token": expired_token})
response = await test_client.get("/api/v1/auth/me")
assert response.status_code == 401
data = response.json()
# The actual error message comes from the JWT library for expired tokens
assert "Token has expired" in data["detail"]
@pytest.mark.asyncio
async def test_logout_success(
self,
test_client: AsyncClient
) -> None:
async def test_logout_success(self, test_client: AsyncClient) -> None:
"""Test logout endpoint."""
# Logout should work even without cookies (just clears them)
test_client.cookies.update({"access_token": "", "refresh_token": ""})
response = await test_client.post("/api/v1/auth/logout")
assert response.status_code == 200
data = response.json()
assert "Successfully logged out" in data["message"]
@pytest.mark.asyncio
async def test_admin_access_with_user_role(
self,
test_client: AsyncClient,
auth_headers: dict[str, str]
self, test_client: AsyncClient, auth_cookies: dict[str, str]
) -> None:
"""Test that regular users cannot access admin endpoints."""
# This test would be for admin-only endpoints when they're created
# For now, we'll test the dependency behavior
import pytest
from fastapi import HTTPException
from app.core.dependencies import get_admin_user
from app.models.user import User
from fastapi import HTTPException
import pytest
# Create a mock user with regular role
regular_user = User(
id=1,
@@ -335,26 +275,24 @@ class TestAuthEndpoints:
role="user",
is_active=True,
plan_id=1,
credits=100
credits=100,
)
# Test that get_admin_user raises exception for regular user
with pytest.raises(HTTPException) as exc_info:
await get_admin_user(regular_user)
assert exc_info.value.status_code == 403
assert "Not enough permissions" in exc_info.value.detail
@pytest.mark.asyncio
async def test_admin_access_with_admin_role(
self,
test_client: AsyncClient,
admin_headers: dict[str, str]
self, test_client: AsyncClient, admin_cookies: dict[str, str]
) -> None:
"""Test that admin users can access admin endpoints."""
from app.core.dependencies import get_admin_user
from app.models.user import User
# Create a mock admin user
admin_user = User(
id=1,
@@ -363,9 +301,9 @@ class TestAuthEndpoints:
role="admin",
is_active=True,
plan_id=1,
credits=1000
credits=1000,
)
# Test that get_admin_user passes for admin user
result = await get_admin_user(admin_user)
assert result == admin_user
assert result == admin_user

View File

@@ -130,7 +130,47 @@ async def test_pro_plan(test_session: AsyncSession) -> Plan:
@pytest_asyncio.fixture
async def test_user(test_session: AsyncSession, test_plan: Plan) -> User:
async def ensure_plans(test_session: AsyncSession) -> tuple[Plan, Plan]:
"""Ensure both free and pro plans exist."""
# Check for free plan
free_result = await test_session.exec(select(Plan).where(Plan.code == "free"))
free_plan = free_result.first()
if not free_plan:
free_plan = Plan(
code="free",
name="Free Plan",
description="Test free plan",
credits=100,
max_credits=100,
)
test_session.add(free_plan)
# Check for pro plan
pro_result = await test_session.exec(select(Plan).where(Plan.code == "pro"))
pro_plan = pro_result.first()
if not pro_plan:
pro_plan = Plan(
code="pro",
name="Pro Plan",
description="Test pro plan",
credits=300,
max_credits=300,
)
test_session.add(pro_plan)
await test_session.commit()
await test_session.refresh(free_plan)
await test_session.refresh(pro_plan)
return free_plan, pro_plan
@pytest_asyncio.fixture
async def test_user(
test_session: AsyncSession, ensure_plans: tuple[Plan, Plan]
) -> User:
"""Create a test user."""
user = User(
email="test@example.com",
@@ -138,7 +178,7 @@ async def test_user(test_session: AsyncSession, test_plan: Plan) -> User:
password_hash=PasswordUtils.hash_password("testpassword123"),
role="user",
is_active=True,
plan_id=test_plan.id,
plan_id=ensure_plans[0].id, # Use free plan
credits=100,
)
test_session.add(user)
@@ -148,7 +188,9 @@ async def test_user(test_session: AsyncSession, test_plan: Plan) -> User:
@pytest_asyncio.fixture
async def admin_user(test_session: AsyncSession, test_plan: Plan) -> User:
async def admin_user(
test_session: AsyncSession, ensure_plans: tuple[Plan, Plan]
) -> User:
"""Create a test admin user."""
user = User(
email="admin@example.com",
@@ -156,7 +198,7 @@ async def admin_user(test_session: AsyncSession, test_plan: Plan) -> User:
password_hash=PasswordUtils.hash_password("adminpassword123"),
role="admin",
is_active=True,
plan_id=test_plan.id,
plan_id=ensure_plans[1].id, # Use pro plan for admin
credits=1000,
)
test_session.add(user)

View File

@@ -106,12 +106,25 @@ class TestUserRepository:
async def test_create_user(
self,
user_repository: UserRepository,
test_plan: Plan,
ensure_plans: tuple[Plan, Plan],
test_session: AsyncSession,
) -> None:
"""Test creating a new user."""
plan_id = test_plan.id
plan_credits = test_plan.credits
free_plan, pro_plan = ensure_plans
plan_id = free_plan.id
plan_credits = free_plan.credits
# Create a first user to ensure subsequent users get free plan
first_user_data = {
"email": "firstuser@example.com",
"name": "First User",
"password_hash": PasswordUtils.hash_password("password123"),
"is_active": True,
}
first_user = await user_repository.create(first_user_data)
assert first_user.role == "admin" # Verify first user is admin
# Now create the test user (should get free plan)
user_data = {
"email": "newuser@example.com",
"name": "New User",
@@ -121,13 +134,14 @@ class TestUserRepository:
}
user = await user_repository.create(user_data)
await test_session.refresh(user, ["plan"])
assert user.id is not None
assert user.email == user_data["email"]
assert user.name == user_data["name"]
assert user.role == user_data["role"]
assert user.role == "user" # Should be user role (not admin)
assert user.is_active == user_data["is_active"]
assert user.plan_id == plan_id
assert user.plan_id == plan_id # Should get free plan
assert user.credits == plan_credits
@pytest.mark.asyncio
@@ -137,11 +151,10 @@ class TestUserRepository:
test_session: AsyncSession,
) -> None:
"""Test creating user when no default plan exists."""
# Remove all plans
# Remove all plans but don't commit to avoid transaction issues
stmt = delete(Plan)
# Use exec for delete statements
await test_session.exec(stmt)
await test_session.commit()
# Don't commit here - let the exception handling work normally
user_data = {
"email": "newuser@example.com",
@@ -178,7 +191,8 @@ class TestUserRepository:
async def test_delete_user(
self,
user_repository: UserRepository,
test_plan: Plan, # noqa: ARG002
ensure_plans: tuple[Plan, Plan], # noqa: ARG002
test_session: AsyncSession,
) -> None:
"""Test deleting a user."""
# Create a user to delete
@@ -190,6 +204,8 @@ class TestUserRepository:
"is_active": True,
}
user = await user_repository.create(user_data)
await test_session.refresh(user, ["plan"])
assert user.id is not None
user_id = user.id
@@ -265,6 +281,7 @@ class TestUserRepository:
}
user = await user_repository.create(user_data)
await test_session.refresh(user, ["plan"])
assert user.id is not None
assert user.email == user_data["email"]
@@ -314,6 +331,7 @@ class TestUserRepository:
"is_active": True,
}
second_user = await user_repository.create(second_user_data)
await test_session.refresh(second_user, ["plan"])
assert second_user.id is not None
assert second_user.email == second_user_data["email"]
@@ -331,6 +349,7 @@ class TestUserRepository:
"is_active": True,
}
third_user = await user_repository.create(third_user_data)
await test_session.refresh(third_user, ["plan"])
assert third_user.role == "user" # Third user should also be regular user
assert third_user.plan_id == free_plan.id # Should get free plan

View File

@@ -22,7 +22,10 @@ class TestAuthService:
@pytest.mark.asyncio
async def test_register_success(
self, auth_service: AuthService, test_plan: Plan, test_user_data: dict[str, str]
self,
auth_service: AuthService,
ensure_plans: tuple[Plan, Plan],
test_user_data: dict[str, str],
) -> None:
"""Test successful user registration."""
request = UserRegisterRequest(**test_user_data)
@@ -32,10 +35,12 @@ class TestAuthService:
# Check user data
assert response.user.email == test_user_data["email"]
assert response.user.name == test_user_data["name"]
assert response.user.role == "user"
assert response.user.role == "admin" # First user gets admin role
assert response.user.is_active is True
assert response.user.credits == test_plan.credits
assert response.user.plan["code"] == test_plan.code
# First user gets pro plan
free_plan, pro_plan = ensure_plans
assert response.user.credits == pro_plan.credits
assert response.user.plan["code"] == pro_plan.code
# Check token
assert response.token.access_token is not None
@@ -213,7 +218,7 @@ class TestAuthService:
# Ensure plan relationship is loaded
await test_session.refresh(test_user, ["plan"])
user_response = await auth_service._create_user_response(test_user)
user_response = await auth_service.create_user_response(test_user)
assert user_response.id == test_user.id
assert user_response.email == test_user.email