Files
sdb2-backend/tests/repositories/test_user.py
JSC bccfcafe0e
Some checks failed
Backend CI / lint (push) Failing after 10s
Backend CI / test (push) Failing after 1m37s
feat: Update CORS origins to allow Chrome extensions and improve logging in migration tool
2025-09-19 16:41:11 +02:00

367 lines
13 KiB
Python

"""Tests for user repository."""
from collections.abc import AsyncGenerator

import pytest
import pytest_asyncio
from sqlmodel import delete, select
from sqlmodel.ext.asyncio.session import AsyncSession

from app.models.favorite import Favorite
from app.models.plan import Plan
from app.models.user import User
from app.repositories.user import UserRepository
from app.utils.auth import PasswordUtils
class TestUserRepository:
    """Test user repository operations.

    Covers lookups by id, email and API token, user creation (including the
    role/plan assigned to the first and to subsequent users), update,
    deletion and the email-existence check.
    """

    @staticmethod
    def _user_payload(
        email: str,
        name: str,
        **extra: object,
    ) -> dict[str, object]:
        """Build the field dict handed to ``UserRepository.create``.

        Args:
            email: Email address for the new user.
            name: Display name for the new user.
            **extra: Additional or overriding fields (for example ``role``).

        Returns:
            A dict with ``email``, ``name``, a hash of ``"password123"`` as
            ``password_hash``, ``is_active=True`` and everything in ``extra``.
        """
        payload: dict[str, object] = {
            "email": email,
            "name": name,
            "password_hash": PasswordUtils.hash_password("password123"),
            "is_active": True,
        }
        payload.update(extra)
        return payload

    @staticmethod
    async def _create_plans(test_session: AsyncSession) -> tuple[Plan, Plan]:
        """Insert a free and a pro plan and return them as ``(free, pro)``.

        Both plans are committed and refreshed before being returned.
        """
        free_plan = Plan(code="free", name="Free Plan", credits=100, max_credits=100)
        pro_plan = Plan(code="pro", name="Pro Plan", credits=300, max_credits=300)
        test_session.add(free_plan)
        test_session.add(pro_plan)
        await test_session.commit()
        await test_session.refresh(free_plan)
        await test_session.refresh(pro_plan)
        return free_plan, pro_plan

    @pytest_asyncio.fixture
    async def user_repository(
        self,
        test_session: AsyncSession,
    ) -> AsyncGenerator[UserRepository, None]:  # type: ignore[misc]
        """Create a user repository instance bound to the test session."""
        yield UserRepository(test_session)

    @pytest.mark.asyncio
    async def test_get_by_id_existing(
        self,
        user_repository: UserRepository,
        test_user: User,
    ) -> None:
        """Test getting user by ID when user exists."""
        # Narrows the optional primary key for the call below.
        assert test_user.id is not None

        user = await user_repository.get_by_id(test_user.id)

        assert user is not None
        assert user.id == test_user.id
        assert user.email == test_user.email
        assert user.name == test_user.name

    @pytest.mark.asyncio
    async def test_get_by_id_nonexistent(
        self,
        user_repository: UserRepository,
    ) -> None:
        """Test getting user by ID when user doesn't exist."""
        user = await user_repository.get_by_id(99999)

        assert user is None

    @pytest.mark.asyncio
    async def test_get_by_email_existing(
        self,
        user_repository: UserRepository,
        test_user: User,
    ) -> None:
        """Test getting user by email when user exists."""
        user = await user_repository.get_by_email(test_user.email)

        assert user is not None
        assert user.id == test_user.id
        assert user.email == test_user.email
        assert user.name == test_user.name

    @pytest.mark.asyncio
    async def test_get_by_email_nonexistent(
        self,
        user_repository: UserRepository,
    ) -> None:
        """Test getting user by email when user doesn't exist."""
        user = await user_repository.get_by_email("nonexistent@example.com")

        assert user is None

    @pytest.mark.asyncio
    async def test_get_by_api_token_existing(
        self,
        user_repository: UserRepository,
        test_user: User,
        test_session: AsyncSession,
    ) -> None:
        """Test getting user by API token when token exists."""
        test_token = "test_api_token_123"
        # Snapshot the id before committing so the assertion below does not
        # have to read it back from the instance after the commit.
        expected_id = test_user.id

        # Set an API token for the test user
        test_user.api_token = test_token
        await test_session.commit()

        user = await user_repository.get_by_api_token(test_token)

        assert user is not None
        assert user.id == expected_id
        assert user.api_token == test_token

    @pytest.mark.asyncio
    async def test_get_by_api_token_nonexistent(
        self,
        user_repository: UserRepository,
    ) -> None:
        """Test getting user by API token when token doesn't exist."""
        user = await user_repository.get_by_api_token("nonexistent_token")

        assert user is None

    @pytest.mark.asyncio
    async def test_create_user(
        self,
        user_repository: UserRepository,
        ensure_plans: tuple[Plan, Plan],
        test_session: AsyncSession,
    ) -> None:
        """Test creating a new user."""
        free_plan, _pro_plan = ensure_plans
        # Snapshot the expected values before any user is created.
        plan_id = free_plan.id
        plan_credits = free_plan.credits

        # Create a first user to ensure subsequent users get free plan
        first_user = await user_repository.create(
            self._user_payload("firstuser@example.com", "First User"),
        )
        assert first_user.role == "admin"  # Verify first user is admin

        # Now create the test user (should get free plan)
        user_data = self._user_payload(
            "newuser@example.com",
            "New User",
            role="user",
        )
        user = await user_repository.create(user_data)
        await test_session.refresh(user, ["plan"])

        assert user.id is not None
        assert user.email == user_data["email"]
        assert user.name == user_data["name"]
        assert user.role == "user"  # Should be user role (not admin)
        assert user.is_active == user_data["is_active"]
        assert user.plan_id == plan_id  # Should get free plan
        assert user.credits == plan_credits

    @pytest.mark.asyncio
    async def test_create_user_without_default_plan(
        self,
        user_repository: UserRepository,
        test_session: AsyncSession,
    ) -> None:
        """Test creating user when no default plan exists."""
        # Remove all plans but don't commit to avoid transaction issues;
        # the deletion only needs to be visible inside this session.
        await test_session.exec(delete(Plan))

        user_data = self._user_payload(
            "newuser@example.com",
            "New User",
            role="user",
        )

        with pytest.raises(ValueError, match="Default plan not found"):
            await user_repository.create(user_data)

    @pytest.mark.asyncio
    async def test_update_user(
        self,
        user_repository: UserRepository,
        test_user: User,
    ) -> None:
        """Test updating a user."""
        updated_name = "Updated Name"
        updated_credits = 200
        update_data = {
            "name": updated_name,
            "credits": updated_credits,
        }

        updated_user = await user_repository.update(test_user, update_data)

        assert updated_user.id == test_user.id
        assert updated_user.name == updated_name
        assert updated_user.credits == updated_credits
        assert updated_user.email == test_user.email  # Unchanged

    @pytest.mark.asyncio
    async def test_delete_user(
        self,
        user_repository: UserRepository,
        ensure_plans: tuple[Plan, Plan],
        test_session: AsyncSession,
    ) -> None:
        """Test deleting a user."""
        # NOTE(review): ``ensure_plans`` is requested but its value is unused;
        # presumably it is needed so create() can find a plan - confirm.

        # Create a user to delete
        user = await user_repository.create(
            self._user_payload("todelete@example.com", "To Delete", role="user"),
        )
        await test_session.refresh(user, ["plan"])
        assert user.id is not None
        user_id = user.id

        # Clean up any favorites that reference this specific user to avoid
        # foreign key constraint violations. Only this user's favorites are
        # removed, not all favorites.
        existing_favorites = await test_session.exec(
            select(Favorite).where(Favorite.user_id == user_id),
        )
        for favorite in existing_favorites.all():
            await test_session.delete(favorite)
        await test_session.commit()

        # Delete the user
        await user_repository.delete(user)

        # Verify user is deleted
        deleted_user = await user_repository.get_by_id(user_id)
        assert deleted_user is None

    @pytest.mark.asyncio
    async def test_email_exists_true(
        self,
        user_repository: UserRepository,
        test_user: User,
    ) -> None:
        """Test email existence check when email exists."""
        exists = await user_repository.email_exists(test_user.email)

        assert exists is True

    @pytest.mark.asyncio
    async def test_email_exists_false(
        self,
        user_repository: UserRepository,
    ) -> None:
        """Test email existence check when email doesn't exist."""
        exists = await user_repository.email_exists("nonexistent@example.com")

        assert exists is False

    @pytest.mark.asyncio
    async def test_email_exists_case_sensitive(
        self,
        user_repository: UserRepository,
        test_user: User,
    ) -> None:
        """Test email existence check with the lowercased stored email.

        Despite the test name, only the lowercased form of the fixture's
        email is looked up; no upper- or mixed-case lookup is asserted.
        """
        # NOTE(review): presumably the fixture email is already lowercase, so
        # this is the original address - confirm against the fixture.
        exists_lower = await user_repository.email_exists(test_user.email.lower())

        # NOTE(review): earlier notes on this test disagreed about whether the
        # backend compares emails case-insensitively, so other casings are
        # deliberately left unasserted until that is settled.
        assert exists_lower is True  # Original email should exist

    @pytest.mark.asyncio
    async def test_first_user_gets_admin_role_and_pro_plan(
        self,
        test_session: AsyncSession,
    ) -> None:
        """Test that the first user gets admin role and pro plan."""
        # Ensure no users exist
        await test_session.exec(delete(User))
        await test_session.commit()

        # Create plans here rather than relying on ones left by other tests.
        _free_plan, pro_plan = await self._create_plans(test_session)
        # Snapshot the expected values up front, as test_create_user does, so
        # the assertions do not read plan attributes after create() has used
        # the session.
        pro_plan_id = pro_plan.id
        pro_plan_credits = pro_plan.credits

        user_repository = UserRepository(test_session)
        user_data = self._user_payload("first@example.com", "First User")

        user = await user_repository.create(user_data)
        await test_session.refresh(user, ["plan"])

        assert user.id is not None
        assert user.email == user_data["email"]
        assert user.name == user_data["name"]
        assert user.role == "admin"  # First user should be admin
        assert user.is_active == user_data["is_active"]
        assert user.plan_id == pro_plan_id  # Should get pro plan
        assert user.credits == pro_plan_credits

    @pytest.mark.asyncio
    async def test_subsequent_users_get_user_role_and_free_plan(
        self,
        test_session: AsyncSession,
    ) -> None:
        """Test that subsequent users get user role and free plan."""
        # Ensure no users exist
        await test_session.exec(delete(User))
        await test_session.commit()

        # Create plans here rather than relying on ones left by other tests.
        free_plan, _pro_plan = await self._create_plans(test_session)
        # Snapshot the expected values up front, as test_create_user does, so
        # the assertions do not read plan attributes after create() has used
        # the session.
        free_plan_id = free_plan.id
        free_plan_credits = free_plan.credits

        user_repository = UserRepository(test_session)

        # Create first user
        first_user = await user_repository.create(
            self._user_payload("first@example.com", "First User"),
        )
        assert first_user.role == "admin"  # Verify first user is admin

        # Create second user
        second_user_data = self._user_payload("second@example.com", "Second User")
        second_user = await user_repository.create(second_user_data)
        await test_session.refresh(second_user, ["plan"])

        assert second_user.id is not None
        assert second_user.email == second_user_data["email"]
        assert second_user.name == second_user_data["name"]
        assert second_user.role == "user"  # Second user should be regular user
        assert second_user.is_active == second_user_data["is_active"]
        assert second_user.plan_id == free_plan_id  # Should get free plan
        assert second_user.credits == free_plan_credits

        # Create third user to further verify
        third_user = await user_repository.create(
            self._user_payload("third@example.com", "Third User"),
        )
        await test_session.refresh(third_user, ["plan"])

        assert third_user.role == "user"  # Third user should also be regular user
        assert third_user.plan_id == free_plan_id  # Should get free plan