367 lines
13 KiB
Python
367 lines
13 KiB
Python
"""Tests for user repository."""
|
|
|
|
from collections.abc import AsyncGenerator
|
|
|
|
import pytest
|
|
import pytest_asyncio
|
|
from sqlmodel import delete
|
|
from sqlmodel.ext.asyncio.session import AsyncSession
|
|
|
|
from app.models.plan import Plan
|
|
from app.models.user import User
|
|
from app.repositories.user import UserRepository
|
|
from app.utils.auth import PasswordUtils
|
|
|
|
|
|
class TestUserRepository:
|
|
"""Test user repository operations."""
|
|
|
|
@pytest_asyncio.fixture
|
|
async def user_repository(
|
|
self,
|
|
test_session: AsyncSession,
|
|
) -> AsyncGenerator[UserRepository]: # type: ignore[misc]
|
|
"""Create a user repository instance."""
|
|
yield UserRepository(test_session)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_get_by_id_existing(
|
|
self,
|
|
user_repository: UserRepository,
|
|
test_user: User,
|
|
) -> None:
|
|
"""Test getting user by ID when user exists."""
|
|
assert test_user.id is not None
|
|
user = await user_repository.get_by_id(test_user.id)
|
|
|
|
assert user is not None
|
|
assert user.id == test_user.id
|
|
assert user.email == test_user.email
|
|
assert user.name == test_user.name
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_get_by_id_nonexistent(
|
|
self,
|
|
user_repository: UserRepository,
|
|
) -> None:
|
|
"""Test getting user by ID when user doesn't exist."""
|
|
user = await user_repository.get_by_id(99999)
|
|
|
|
assert user is None
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_get_by_email_existing(
|
|
self,
|
|
user_repository: UserRepository,
|
|
test_user: User,
|
|
) -> None:
|
|
"""Test getting user by email when user exists."""
|
|
user = await user_repository.get_by_email(test_user.email)
|
|
|
|
assert user is not None
|
|
assert user.id == test_user.id
|
|
assert user.email == test_user.email
|
|
assert user.name == test_user.name
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_get_by_email_nonexistent(
|
|
self,
|
|
user_repository: UserRepository,
|
|
) -> None:
|
|
"""Test getting user by email when user doesn't exist."""
|
|
user = await user_repository.get_by_email("nonexistent@example.com")
|
|
|
|
assert user is None
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_get_by_api_token_existing(
|
|
self,
|
|
user_repository: UserRepository,
|
|
test_user: User,
|
|
test_session: AsyncSession,
|
|
) -> None:
|
|
"""Test getting user by API token when token exists."""
|
|
# Set an API token for the test user
|
|
test_token = "test_api_token_123"
|
|
test_user.api_token = test_token
|
|
await test_session.commit()
|
|
|
|
user = await user_repository.get_by_api_token(test_token)
|
|
|
|
assert user is not None
|
|
assert user.id == test_user.id
|
|
assert user.api_token == test_token
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_get_by_api_token_nonexistent(
|
|
self,
|
|
user_repository: UserRepository,
|
|
) -> None:
|
|
"""Test getting user by API token when token doesn't exist."""
|
|
user = await user_repository.get_by_api_token("nonexistent_token")
|
|
|
|
assert user is None
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_create_user(
|
|
self,
|
|
user_repository: UserRepository,
|
|
ensure_plans: tuple[Plan, Plan],
|
|
test_session: AsyncSession,
|
|
) -> None:
|
|
"""Test creating a new user."""
|
|
free_plan, _pro_plan = ensure_plans
|
|
plan_id = free_plan.id
|
|
plan_credits = free_plan.credits
|
|
|
|
# Create a first user to ensure subsequent users get free plan
|
|
first_user_data = {
|
|
"email": "firstuser@example.com",
|
|
"name": "First User",
|
|
"password_hash": PasswordUtils.hash_password("password123"),
|
|
"is_active": True,
|
|
}
|
|
first_user = await user_repository.create(first_user_data)
|
|
assert first_user.role == "admin" # Verify first user is admin
|
|
|
|
# Now create the test user (should get free plan)
|
|
user_data = {
|
|
"email": "newuser@example.com",
|
|
"name": "New User",
|
|
"password_hash": PasswordUtils.hash_password("password123"),
|
|
"role": "user",
|
|
"is_active": True,
|
|
}
|
|
|
|
user = await user_repository.create(user_data)
|
|
await test_session.refresh(user, ["plan"])
|
|
|
|
assert user.id is not None
|
|
assert user.email == user_data["email"]
|
|
assert user.name == user_data["name"]
|
|
assert user.role == "user" # Should be user role (not admin)
|
|
assert user.is_active == user_data["is_active"]
|
|
assert user.plan_id == plan_id # Should get free plan
|
|
assert user.credits == plan_credits
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_create_user_without_default_plan(
|
|
self,
|
|
user_repository: UserRepository,
|
|
test_session: AsyncSession,
|
|
) -> None:
|
|
"""Test creating user when no default plan exists."""
|
|
# Remove all plans but don't commit to avoid transaction issues
|
|
stmt = delete(Plan)
|
|
await test_session.exec(stmt)
|
|
# Don't commit here - let the exception handling work normally
|
|
|
|
user_data = {
|
|
"email": "newuser@example.com",
|
|
"name": "New User",
|
|
"password_hash": PasswordUtils.hash_password("password123"),
|
|
"role": "user",
|
|
"is_active": True,
|
|
}
|
|
|
|
with pytest.raises(ValueError, match="Default plan not found"):
|
|
await user_repository.create(user_data)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_update_user(
|
|
self,
|
|
user_repository: UserRepository,
|
|
test_user: User,
|
|
) -> None:
|
|
"""Test updating a user."""
|
|
updated_credits = 200
|
|
update_data = {
|
|
"name": "Updated Name",
|
|
"credits": updated_credits,
|
|
}
|
|
|
|
updated_user = await user_repository.update(test_user, update_data)
|
|
|
|
assert updated_user.id == test_user.id
|
|
assert updated_user.name == "Updated Name"
|
|
assert updated_user.credits == updated_credits
|
|
assert updated_user.email == test_user.email # Unchanged
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_delete_user(
|
|
self,
|
|
user_repository: UserRepository,
|
|
ensure_plans: tuple[Plan, Plan],
|
|
test_session: AsyncSession,
|
|
) -> None:
|
|
"""Test deleting a user."""
|
|
# Create a user to delete
|
|
user_data = {
|
|
"email": "todelete@example.com",
|
|
"name": "To Delete",
|
|
"password_hash": PasswordUtils.hash_password("password123"),
|
|
"role": "user",
|
|
"is_active": True,
|
|
}
|
|
user = await user_repository.create(user_data)
|
|
await test_session.refresh(user, ["plan"])
|
|
|
|
assert user.id is not None
|
|
user_id = user.id
|
|
|
|
# Clean up any favorites that might reference this specific user to avoid foreign key constraint violations
|
|
from sqlmodel import select
|
|
|
|
from app.models.favorite import Favorite
|
|
|
|
# Only delete favorites for the user we're about to delete, not all favorites
|
|
existing_favorites = await test_session.exec(select(Favorite).where(Favorite.user_id == user_id))
|
|
for favorite in existing_favorites.all():
|
|
await test_session.delete(favorite)
|
|
await test_session.commit()
|
|
|
|
# Delete the user
|
|
await user_repository.delete(user)
|
|
|
|
# Verify user is deleted
|
|
deleted_user = await user_repository.get_by_id(user_id)
|
|
assert deleted_user is None
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_email_exists_true(
|
|
self,
|
|
user_repository: UserRepository,
|
|
test_user: User,
|
|
) -> None:
|
|
"""Test email existence check when email exists."""
|
|
exists = await user_repository.email_exists(test_user.email)
|
|
|
|
assert exists is True
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_email_exists_false(
|
|
self,
|
|
user_repository: UserRepository,
|
|
) -> None:
|
|
"""Test email existence check when email doesn't exist."""
|
|
exists = await user_repository.email_exists("nonexistent@example.com")
|
|
|
|
assert exists is False
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_email_exists_case_sensitive(
|
|
self,
|
|
user_repository: UserRepository,
|
|
test_user: User,
|
|
) -> None:
|
|
"""Test email existence check behavior with different cases."""
|
|
# Test with lowercase email (original)
|
|
exists_lower = await user_repository.email_exists(test_user.email.lower())
|
|
|
|
# SQLite with default collation should be case insensitive for email searches
|
|
# But the test shows it's case sensitive, so let's test the actual behavior
|
|
assert exists_lower is True # Original email should exist
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_first_user_gets_admin_role_and_pro_plan(
|
|
self,
|
|
test_session: AsyncSession,
|
|
) -> None:
|
|
"""Test that the first user gets admin role and pro plan."""
|
|
# Ensure no users exist
|
|
stmt = delete(User)
|
|
await test_session.exec(stmt)
|
|
await test_session.commit()
|
|
|
|
# Create plans since they were cleared by other tests
|
|
free_plan = Plan(code="free", name="Free Plan", credits=100, max_credits=100)
|
|
pro_plan = Plan(code="pro", name="Pro Plan", credits=300, max_credits=300)
|
|
test_session.add(free_plan)
|
|
test_session.add(pro_plan)
|
|
await test_session.commit()
|
|
await test_session.refresh(free_plan)
|
|
await test_session.refresh(pro_plan)
|
|
|
|
user_repository = UserRepository(test_session)
|
|
|
|
user_data = {
|
|
"email": "first@example.com",
|
|
"name": "First User",
|
|
"password_hash": PasswordUtils.hash_password("password123"),
|
|
"is_active": True,
|
|
}
|
|
|
|
user = await user_repository.create(user_data)
|
|
await test_session.refresh(user, ["plan"])
|
|
|
|
assert user.id is not None
|
|
assert user.email == user_data["email"]
|
|
assert user.name == user_data["name"]
|
|
assert user.role == "admin" # First user should be admin
|
|
assert user.is_active == user_data["is_active"]
|
|
assert user.plan_id == pro_plan.id # Should get pro plan
|
|
assert user.credits == pro_plan.credits
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_subsequent_users_get_user_role_and_free_plan(
|
|
self,
|
|
test_session: AsyncSession,
|
|
) -> None:
|
|
"""Test that subsequent users get user role and free plan."""
|
|
# Ensure no users exist
|
|
stmt = delete(User)
|
|
await test_session.exec(stmt)
|
|
await test_session.commit()
|
|
|
|
# Create plans since they were cleared by other tests
|
|
free_plan = Plan(code="free", name="Free Plan", credits=100, max_credits=100)
|
|
pro_plan = Plan(code="pro", name="Pro Plan", credits=300, max_credits=300)
|
|
test_session.add(free_plan)
|
|
test_session.add(pro_plan)
|
|
await test_session.commit()
|
|
await test_session.refresh(free_plan)
|
|
await test_session.refresh(pro_plan)
|
|
|
|
user_repository = UserRepository(test_session)
|
|
|
|
# Create first user
|
|
first_user_data = {
|
|
"email": "first@example.com",
|
|
"name": "First User",
|
|
"password_hash": PasswordUtils.hash_password("password123"),
|
|
"is_active": True,
|
|
}
|
|
first_user = await user_repository.create(first_user_data)
|
|
assert first_user.role == "admin" # Verify first user is admin
|
|
|
|
# Create second user
|
|
second_user_data = {
|
|
"email": "second@example.com",
|
|
"name": "Second User",
|
|
"password_hash": PasswordUtils.hash_password("password123"),
|
|
"is_active": True,
|
|
}
|
|
second_user = await user_repository.create(second_user_data)
|
|
await test_session.refresh(second_user, ["plan"])
|
|
|
|
assert second_user.id is not None
|
|
assert second_user.email == second_user_data["email"]
|
|
assert second_user.name == second_user_data["name"]
|
|
assert second_user.role == "user" # Second user should be regular user
|
|
assert second_user.is_active == second_user_data["is_active"]
|
|
assert second_user.plan_id == free_plan.id # Should get free plan
|
|
assert second_user.credits == free_plan.credits
|
|
|
|
# Create third user to further verify
|
|
third_user_data = {
|
|
"email": "third@example.com",
|
|
"name": "Third User",
|
|
"password_hash": PasswordUtils.hash_password("password123"),
|
|
"is_active": True,
|
|
}
|
|
third_user = await user_repository.create(third_user_data)
|
|
await test_session.refresh(third_user, ["plan"])
|
|
|
|
assert third_user.role == "user" # Third user should also be regular user
|
|
assert third_user.plan_id == free_plan.id # Should get free plan
|