# File: sdb2-backend/tests/repositories/test_user_oauth.py
# 269 lines, 9.1 KiB, Python

"""Tests for user OAuth repository."""
from collections.abc import AsyncGenerator
import pytest
import pytest_asyncio
from sqlmodel.ext.asyncio.session import AsyncSession
from app.models.user import User
from app.models.user_oauth import UserOauth
from app.repositories.user_oauth import UserOauthRepository
class TestUserOauthRepository:
    """Test user OAuth repository operations.

    Covers the lookups (by provider/provider user ID and by user
    ID/provider) plus create, update and delete on
    ``UserOauthRepository``. The ``test_session`` and ``test_user``
    fixtures are not defined in this class and must be provided elsewhere.
    """

    @pytest_asyncio.fixture
    async def user_oauth_repository(
        self,
        test_session: AsyncSession,
    ) -> AsyncGenerator[UserOauthRepository, None]:
        """Create a user OAuth repository instance bound to the test session."""
        yield UserOauthRepository(test_session)

    @pytest_asyncio.fixture
    async def test_user_id(
        self,
        test_user: User,
    ) -> int:
        """Get test user ID to avoid lazy loading issues.

        Tests use this plain ``int`` instead of reading ``test_user.id``
        themselves.
        """
        assert test_user.id is not None
        return test_user.id

    @pytest_asyncio.fixture
    async def test_oauth(
        self,
        test_session: AsyncSession,
        test_user_id: int,
    ) -> AsyncGenerator[UserOauth, None]:
        """Create a test OAuth record.

        Adds a ``google`` record for the test user to the session, commits
        it, refreshes it and yields the refreshed instance.
        """
        oauth_data = {
            "user_id": test_user_id,
            "provider": "google",
            "provider_user_id": "google_123456",
            "email": "test@gmail.com",
            "name": "Test User Google",
            "picture": None,
        }
        oauth = UserOauth(**oauth_data)
        test_session.add(oauth)
        await test_session.commit()
        await test_session.refresh(oauth)
        yield oauth

    @pytest.mark.asyncio
    async def test_get_by_provider_user_id_existing(
        self,
        user_oauth_repository: UserOauthRepository,
        test_oauth: UserOauth,
    ) -> None:
        """Test getting OAuth by provider user ID when it exists."""
        oauth = await user_oauth_repository.get_by_provider_user_id(
            "google", "google_123456"
        )

        assert oauth is not None
        assert oauth.id == test_oauth.id
        assert oauth.provider == "google"
        assert oauth.provider_user_id == "google_123456"
        assert oauth.user_id == test_oauth.user_id

    @pytest.mark.asyncio
    async def test_get_by_provider_user_id_nonexistent(
        self,
        user_oauth_repository: UserOauthRepository,
    ) -> None:
        """Test getting OAuth by provider user ID when it doesn't exist."""
        oauth = await user_oauth_repository.get_by_provider_user_id(
            "google", "nonexistent_id"
        )

        assert oauth is None

    @pytest.mark.asyncio
    async def test_get_by_user_id_and_provider_existing(
        self,
        user_oauth_repository: UserOauthRepository,
        test_oauth: UserOauth,
        test_user_id: int,
    ) -> None:
        """Test getting OAuth by user ID and provider when it exists."""
        oauth = await user_oauth_repository.get_by_user_id_and_provider(
            test_user_id, "google"
        )

        assert oauth is not None
        assert oauth.id == test_oauth.id
        assert oauth.provider == "google"
        assert oauth.user_id == test_user_id

    @pytest.mark.asyncio
    async def test_get_by_user_id_and_provider_nonexistent(
        self,
        user_oauth_repository: UserOauthRepository,
        test_user_id: int,
    ) -> None:
        """Test getting OAuth by user ID and provider when it doesn't exist."""
        oauth = await user_oauth_repository.get_by_user_id_and_provider(
            test_user_id, "github"
        )

        assert oauth is None

    @pytest.mark.asyncio
    async def test_create_oauth(
        self,
        user_oauth_repository: UserOauthRepository,
        test_user_id: int,
    ) -> None:
        """Test creating a new OAuth record."""
        oauth_data = {
            "user_id": test_user_id,
            "provider": "github",
            "provider_user_id": "github_789",
            "email": "test@github.com",
            "name": "Test User GitHub",
            "picture": None,
        }

        oauth = await user_oauth_repository.create(oauth_data)

        assert oauth.id is not None
        assert oauth.user_id == test_user_id
        assert oauth.provider == "github"
        assert oauth.provider_user_id == "github_789"
        assert oauth.email == "test@github.com"
        assert oauth.name == "Test User GitHub"

    @pytest.mark.asyncio
    async def test_update_oauth(
        self,
        user_oauth_repository: UserOauthRepository,
        test_oauth: UserOauth,
    ) -> None:
        """Test updating an OAuth record.

        Only ``email``, ``name`` and ``picture`` are sent in the update;
        the provider fields are asserted to stay as they were.
        """
        update_data = {
            "email": "updated@gmail.com",
            "name": "Updated User Name",
            "picture": "https://example.com/photo.jpg",
        }

        updated_oauth = await user_oauth_repository.update(test_oauth, update_data)

        assert updated_oauth.id == test_oauth.id
        assert updated_oauth.email == "updated@gmail.com"
        assert updated_oauth.name == "Updated User Name"
        assert updated_oauth.picture == "https://example.com/photo.jpg"
        assert updated_oauth.provider == test_oauth.provider  # Unchanged
        assert updated_oauth.provider_user_id == test_oauth.provider_user_id  # Unchanged

    @pytest.mark.asyncio
    async def test_delete_oauth(
        self,
        user_oauth_repository: UserOauthRepository,
        test_user_id: int,
    ) -> None:
        """Test deleting an OAuth record.

        Creates a dedicated ``twitter`` record, deletes it through the
        repository and checks that the provider lookup no longer finds it.
        """
        # Create an OAuth record to delete
        oauth_data = {
            "user_id": test_user_id,
            "provider": "twitter",
            "provider_user_id": "twitter_456",
            "email": "test@twitter.com",
            "name": "Test User Twitter",
            "picture": None,
        }
        oauth = await user_oauth_repository.create(oauth_data)

        # Delete the OAuth record
        await user_oauth_repository.delete(oauth)

        # Verify it's deleted by trying to find it
        deleted_oauth = await user_oauth_repository.get_by_provider_user_id(
            "twitter", "twitter_456"
        )
        assert deleted_oauth is None

    @pytest.mark.asyncio
    async def test_create_duplicate_provider_user_id(
        self,
        user_oauth_repository: UserOauthRepository,
        test_oauth: UserOauth,
        test_user_id: int,
    ) -> None:
        """Test creating OAuth with duplicate provider user ID should fail.

        ``test_oauth`` is requested only so that the conflicting row
        already exists when ``create`` is called.
        """
        # Try to create another OAuth with the same provider and provider_user_id
        duplicate_oauth_data = {
            "user_id": test_user_id,
            "provider": "google",
            "provider_user_id": "google_123456",  # Same as test_oauth
            "email": "another@gmail.com",
            "name": "Another User",
            "picture": None,
        }

        # This should fail due to unique constraint
        # NOTE(review): presumably SQLAlchemy's IntegrityError; narrow this
        # once the exact exception raised by the repository is confirmed.
        with pytest.raises(Exception):  # SQLAlchemy IntegrityError or similar
            await user_oauth_repository.create(duplicate_oauth_data)

    @pytest.mark.asyncio
    async def test_multiple_providers_same_user(
        self,
        user_oauth_repository: UserOauthRepository,
        test_user_id: int,
    ) -> None:
        """Test that a user can have multiple OAuth providers.

        Every assertion runs against records queried back through the
        repository, not against the instances returned by ``create``.
        """
        # Create Google OAuth
        google_oauth_data = {
            "user_id": test_user_id,
            "provider": "google",
            "provider_user_id": "google_user_1",
            "email": "user@gmail.com",
            "name": "Test User Google",
            "picture": None,
        }
        await user_oauth_repository.create(google_oauth_data)

        # Create GitHub OAuth for the same user
        github_oauth_data = {
            "user_id": test_user_id,
            "provider": "github",
            "provider_user_id": "github_user_1",
            "email": "user@github.com",
            "name": "Test User GitHub",
            "picture": None,
        }
        await user_oauth_repository.create(github_oauth_data)

        # Verify both exist by querying back from database
        found_google = await user_oauth_repository.get_by_user_id_and_provider(
            test_user_id, "google"
        )
        found_github = await user_oauth_repository.get_by_user_id_and_provider(
            test_user_id, "github"
        )

        assert found_google is not None
        assert found_github is not None
        assert found_google.provider == "google"
        assert found_github.provider == "github"
        assert found_google.user_id == test_user_id
        assert found_github.user_id == test_user_id
        assert found_google.provider_user_id == "google_user_1"
        assert found_github.provider_user_id == "github_user_1"

        # Verify we can also find them by provider_user_id
        found_google_by_provider = await user_oauth_repository.get_by_provider_user_id(
            "google", "google_user_1"
        )
        found_github_by_provider = await user_oauth_repository.get_by_provider_user_id(
            "github", "github_user_1"
        )

        assert found_google_by_provider is not None
        assert found_github_by_provider is not None
        assert found_google_by_provider.user_id == test_user_id
        assert found_github_by_provider.user_id == test_user_id