- Updated tests for listing users to validate pagination and response format. - Changed mock return values to include total count and pagination details. - Refactored user creation mocks for clarity and consistency. - Enhanced assertions to check for presence of pagination fields in responses. - Adjusted test cases for user retrieval and updates to ensure proper handling of user data. - Improved readability by restructuring mock definitions and assertions across various test files.
725 lines
25 KiB
Python
725 lines
25 KiB
Python
"""Tests for authentication endpoints."""
|
|
|
|
from datetime import UTC
|
|
from typing import Any
|
|
from unittest.mock import patch
|
|
|
|
import pytest
|
|
import pytest_asyncio
|
|
from httpx import AsyncClient
|
|
|
|
from app.models.plan import Plan
|
|
from app.models.user import User
|
|
from app.services.auth import OAuthUserInfo
|
|
from app.utils.auth import JWTUtils
|
|
|
|
|
|
@pytest_asyncio.fixture
async def auth_cookies(test_user: User) -> dict[str, str]:
    """Return a cookie mapping carrying a JWT access token for ``test_user``.

    The token claims are the user's id (stringified), email and role.
    """
    claims = dict(
        sub=str(test_user.id),
        email=test_user.email,
        role=test_user.role,
    )
    return {"access_token": JWTUtils.create_access_token(claims)}
|
|
|
|
|
|
@pytest_asyncio.fixture
async def admin_cookies(admin_user: User) -> dict[str, str]:
    """Return a cookie mapping carrying a JWT access token for ``admin_user``.

    The token claims are the admin's id (stringified), email and role.
    """
    claims = dict(
        sub=str(admin_user.id),
        email=admin_user.email,
        role=admin_user.role,
    )
    return {"access_token": JWTUtils.create_access_token(claims)}
|
|
|
|
|
|
class TestAuthEndpoints:
    """Test authentication API endpoints.

    Covers registration, login, current-user lookup, logout, the admin
    dependency, OAuth authorize/callback/exchange, token refresh, profile
    update, password change and the per-user provider listing.

    Cookies are always set on the client instance rather than passed per
    request, because httpx deprecates the per-request ``cookies=`` argument.
    """

    @staticmethod
    def _set_cookie_names(response: Any) -> list[str]:
        """Return the cookie names found in the response's Set-Cookie headers."""
        return [
            header.partition("=")[0]
            for header in response.headers.get_list("set-cookie")
        ]

    @pytest.mark.asyncio
    async def test_register_success(
        self,
        test_client: AsyncClient,
        test_user_data: dict[str, Any],
        ensure_plans: tuple[Plan, Plan],
    ) -> None:
        """Test successful user registration."""
        response = await test_client.post("/api/v1/auth/register", json=test_user_data)

        assert response.status_code == 201
        data = response.json()

        # Check user data in response (no token in response body with cookies)
        assert data["email"] == test_user_data["email"]
        assert data["name"] == test_user_data["name"]
        assert data["role"] == "admin"  # First user gets admin role
        assert data["is_active"] is True
        assert data["credits"] > 0
        assert "plan" in data

        # Check cookies are set - HTTPX AsyncClient preserves Set-Cookie headers
        cookie_names = self._set_cookie_names(response)
        assert "access_token" in cookie_names
        assert "refresh_token" in cookie_names

    @pytest.mark.asyncio
    async def test_register_duplicate_email(
        self,
        test_client: AsyncClient,
        test_user: User,
    ) -> None:
        """Test registration with duplicate email."""
        user_data = {
            "email": test_user.email,
            "password": "password123",
            "name": "Another User",
        }

        response = await test_client.post("/api/v1/auth/register", json=user_data)

        assert response.status_code == 400
        data = response.json()
        assert "Email address is already registered" in data["detail"]

    @pytest.mark.asyncio
    async def test_register_invalid_email(self, test_client: AsyncClient) -> None:
        """Test registration with invalid email."""
        user_data = {
            "email": "invalid-email",
            "password": "password123",
            "name": "Test User",
        }

        response = await test_client.post("/api/v1/auth/register", json=user_data)

        assert response.status_code == 422  # Validation error

    @pytest.mark.asyncio
    async def test_register_short_password(self, test_client: AsyncClient) -> None:
        """Test registration with short password."""
        user_data = {
            "email": "test@example.com",
            "password": "short",
            "name": "Test User",
        }

        response = await test_client.post("/api/v1/auth/register", json=user_data)

        assert response.status_code == 422  # Validation error

    @pytest.mark.asyncio
    async def test_register_missing_fields(self, test_client: AsyncClient) -> None:
        """Test registration with missing fields."""
        user_data = {
            "email": "test@example.com",
            # Missing password and name
        }

        response = await test_client.post("/api/v1/auth/register", json=user_data)

        assert response.status_code == 422  # Validation error

    @pytest.mark.asyncio
    async def test_login_success(
        self,
        test_client: AsyncClient,
        test_user: User,
        test_login_data: dict[str, str],
    ) -> None:
        """Test successful user login."""
        response = await test_client.post("/api/v1/auth/login", json=test_login_data)

        assert response.status_code == 200
        data = response.json()

        # Check user data in response (no token in response body with cookies)
        assert data["email"] == test_login_data["email"]
        assert "name" in data
        assert "role" in data
        assert data["is_active"] is True

        # Check cookies are set - HTTPX AsyncClient preserves Set-Cookie headers
        cookie_names = self._set_cookie_names(response)
        assert "access_token" in cookie_names
        assert "refresh_token" in cookie_names

    @pytest.mark.asyncio
    async def test_login_invalid_email(self, test_client: AsyncClient) -> None:
        """Test login with invalid email."""
        login_data = {"email": "nonexistent@example.com", "password": "password123"}

        response = await test_client.post("/api/v1/auth/login", json=login_data)

        assert response.status_code == 401
        data = response.json()
        assert "Invalid email or password" in data["detail"]

    @pytest.mark.asyncio
    async def test_login_invalid_password(
        self,
        test_client: AsyncClient,
        test_user: User,
    ) -> None:
        """Test login with invalid password."""
        login_data = {"email": test_user.email, "password": "wrongpassword"}

        response = await test_client.post("/api/v1/auth/login", json=login_data)

        assert response.status_code == 401
        data = response.json()
        assert "Invalid email or password" in data["detail"]

    @pytest.mark.asyncio
    async def test_login_malformed_request(self, test_client: AsyncClient) -> None:
        """Test login with malformed request."""
        login_data = {"email": "invalid-email", "password": "password123"}

        response = await test_client.post("/api/v1/auth/login", json=login_data)

        assert response.status_code == 422  # Validation error

    @pytest.mark.asyncio
    async def test_get_current_user_success(
        self,
        test_client: AsyncClient,
        test_user: User,
        auth_cookies: dict[str, str],
    ) -> None:
        """Test getting current user info successfully."""
        # Set cookies on client instance to avoid deprecation warning
        test_client.cookies.update(auth_cookies)
        response = await test_client.get("/api/v1/auth/me")

        assert response.status_code == 200
        data = response.json()

        # Check user data structure
        assert "id" in data
        assert "email" in data
        assert "name" in data
        assert "role" in data
        assert data["is_active"] is True
        assert "plan" in data

    @pytest.mark.asyncio
    async def test_get_current_user_no_token(self, test_client: AsyncClient) -> None:
        """Test getting current user without authentication token."""
        response = await test_client.get("/api/v1/auth/me")

        assert response.status_code == 401  # Unauthorized (no cookie provided)

    @pytest.mark.asyncio
    async def test_get_current_user_invalid_token(
        self,
        test_client: AsyncClient,
    ) -> None:
        """Test getting current user with invalid token."""
        # Set invalid cookies on client instance
        test_client.cookies.update({"access_token": "invalid_token"})
        response = await test_client.get("/api/v1/auth/me")

        assert response.status_code == 401
        data = response.json()
        assert "Could not validate credentials" in data["detail"]

    @pytest.mark.asyncio
    async def test_get_current_user_expired_token(
        self,
        test_client: AsyncClient,
        test_user: User,
    ) -> None:
        """Test getting current user with expired token."""
        from datetime import timedelta

        # Create an already-expired token by using a negative lifetime.
        token_data = {
            "sub": "1",  # Use a dummy user ID
            "email": "test@example.com",
            "role": "user",
        }
        expired_token = JWTUtils.create_access_token(
            token_data,
            expires_delta=timedelta(seconds=-1),
        )

        # Set expired cookies on client instance
        test_client.cookies.update({"access_token": expired_token})
        response = await test_client.get("/api/v1/auth/me")

        assert response.status_code == 401
        data = response.json()
        # The actual error message comes from the JWT library for expired tokens
        assert "Token has expired" in data["detail"]

    @pytest.mark.asyncio
    async def test_logout_success(self, test_client: AsyncClient) -> None:
        """Test logout endpoint."""
        # Logout should work without a valid session: only empty-valued
        # cookies are sent with the request.
        test_client.cookies.update({"access_token": "", "refresh_token": ""})
        response = await test_client.post("/api/v1/auth/logout")

        assert response.status_code == 200
        data = response.json()
        assert "Successfully logged out" in data["message"]

    @pytest.mark.asyncio
    async def test_admin_access_with_user_role(
        self,
        test_client: AsyncClient,
        auth_cookies: dict[str, str],
    ) -> None:
        """Test that regular users cannot access admin endpoints."""
        # This test would be for admin-only endpoints when they're created
        # For now, we'll test the dependency behavior
        # NOTE(review): test_client and auth_cookies are requested but not used
        # in the body — presumably kept for fixture setup; confirm.
        from fastapi import HTTPException

        from app.core.dependencies import get_admin_user

        # Create a mock user with regular role
        regular_user = User(
            id=1,
            email="user@example.com",
            name="Regular User",
            role="user",
            is_active=True,
            plan_id=1,
            credits=100,
        )

        # Test that get_admin_user raises exception for regular user
        with pytest.raises(HTTPException) as exc_info:
            await get_admin_user(regular_user)

        assert exc_info.value.status_code == 403
        assert "Not enough permissions" in exc_info.value.detail

    @pytest.mark.asyncio
    async def test_admin_access_with_admin_role(
        self,
        test_client: AsyncClient,
        admin_cookies: dict[str, str],
    ) -> None:
        """Test that admin users can access admin endpoints."""
        # NOTE(review): test_client and admin_cookies are requested but not used
        # in the body — presumably kept for fixture setup; confirm.
        from app.core.dependencies import get_admin_user

        # Create a mock admin user
        admin_user = User(
            id=1,
            email="admin@example.com",
            name="Admin User",
            role="admin",
            is_active=True,
            plan_id=1,
            credits=1000,
        )

        # Test that get_admin_user passes for admin user
        result = await get_admin_user(admin_user)
        assert result == admin_user

    @pytest.mark.asyncio
    async def test_get_oauth_providers(self, test_client: AsyncClient) -> None:
        """Test getting list of OAuth providers."""
        response = await test_client.get("/api/v1/auth/providers")

        assert response.status_code == 200
        data = response.json()
        assert "providers" in data
        assert "google" in data["providers"]
        assert "github" in data["providers"]

    @pytest.mark.asyncio
    async def test_oauth_authorize_google(self, test_client: AsyncClient) -> None:
        """Test OAuth authorization URL generation for Google."""
        # Pin the state value so the response can be asserted exactly.
        with patch("app.services.oauth.OAuthService.generate_state") as mock_state:
            mock_state.return_value = "test_state_123"

            response = await test_client.get("/api/v1/auth/google/authorize")

        assert response.status_code == 200
        data = response.json()
        assert "authorization_url" in data
        assert "state" in data
        assert data["state"] == "test_state_123"
        assert "accounts.google.com" in data["authorization_url"]

    @pytest.mark.asyncio
    async def test_oauth_authorize_github(self, test_client: AsyncClient) -> None:
        """Test OAuth authorization URL generation for GitHub."""
        # Pin the state value so the response can be asserted exactly.
        with patch("app.services.oauth.OAuthService.generate_state") as mock_state:
            mock_state.return_value = "test_state_456"

            response = await test_client.get("/api/v1/auth/github/authorize")

        assert response.status_code == 200
        data = response.json()
        assert "authorization_url" in data
        assert "state" in data
        assert data["state"] == "test_state_456"
        assert "github.com" in data["authorization_url"]

    @pytest.mark.asyncio
    async def test_oauth_authorize_invalid_provider(
        self,
        test_client: AsyncClient,
    ) -> None:
        """Test OAuth authorization with invalid provider."""
        response = await test_client.get("/api/v1/auth/invalid/authorize")

        assert response.status_code == 400
        data = response.json()
        assert "Unsupported OAuth provider" in data["detail"]

    @pytest.mark.asyncio
    async def test_oauth_callback_new_user(
        self,
        test_client: AsyncClient,
        ensure_plans: tuple[Plan, Plan],
    ) -> None:
        """Test OAuth callback for new user creation."""
        # Mock OAuth user info
        mock_user_info = OAuthUserInfo(
            provider="google",
            provider_user_id="google_123",
            email="newuser@gmail.com",
            name="New User",
            picture="https://example.com/avatar.jpg",
        )

        # Mock the entire handle_callback method to avoid actual OAuth API calls
        with patch("app.services.oauth.OAuthService.handle_callback") as mock_callback:
            mock_callback.return_value = mock_user_info

            response = await test_client.get(
                "/api/v1/auth/google/callback",
                params={"code": "auth_code_123", "state": "test_state"},
                follow_redirects=False,
            )

        # OAuth callback should successfully process and redirect to frontend with temp code
        assert response.status_code == 302
        location = response.headers["location"]
        assert location.startswith("http://localhost:8001/auth/callback?code=")

        # The fact that we get a 302 redirect means the OAuth login was successful
        # Detailed cookie testing can be done in integration tests

    @pytest.mark.asyncio
    async def test_oauth_callback_existing_user_link(
        self,
        test_client: AsyncClient,
        test_user: User,
        ensure_plans: tuple[Plan, Plan],
    ) -> None:
        """Test OAuth callback for linking to existing user."""
        # Mock OAuth user info with same email as test user
        mock_user_info = OAuthUserInfo(
            provider="github",
            provider_user_id="github_456",
            email=test_user.email,  # Same email as existing user
            name="Test User",
            picture="https://github.com/avatar.jpg",
        )

        # Mock the entire handle_callback method to avoid actual OAuth API calls
        with patch("app.services.oauth.OAuthService.handle_callback") as mock_callback:
            mock_callback.return_value = mock_user_info

            response = await test_client.get(
                "/api/v1/auth/github/callback",
                params={"code": "auth_code_456", "state": "test_state"},
                follow_redirects=False,
            )

        # OAuth callback should successfully process and redirect to frontend with temp code
        assert response.status_code == 302
        location = response.headers["location"]
        assert location.startswith("http://localhost:8001/auth/callback?code=")

        # The fact that we get a 302 redirect means the OAuth login was successful
        # Detailed cookie testing can be done in integration tests

    @pytest.mark.asyncio
    async def test_oauth_callback_missing_code(self, test_client: AsyncClient) -> None:
        """Test OAuth callback with missing authorization code."""
        response = await test_client.get(
            "/api/v1/auth/google/callback",
            params={"state": "test_state"},  # Missing code parameter
        )

        assert response.status_code == 422  # Validation error

    @pytest.mark.asyncio
    async def test_oauth_callback_invalid_provider(
        self,
        test_client: AsyncClient,
    ) -> None:
        """Test OAuth callback with invalid provider."""
        response = await test_client.get(
            "/api/v1/auth/invalid/callback",
            params={"code": "auth_code_123", "state": "test_state"},
        )

        assert response.status_code == 400
        data = response.json()
        assert "Unsupported OAuth provider" in data["detail"]

    @pytest.mark.asyncio
    async def test_refresh_token_success(
        self,
        test_client: AsyncClient,
        test_user: User,
        test_plan: Plan,
    ) -> None:
        """Test refresh token success."""
        with patch(
            "app.services.auth.AuthService.refresh_access_token",
        ) as mock_refresh:
            # Ad-hoc object exposing the two attributes set below.
            mock_refresh.return_value = type(
                "TokenResponse",
                (),
                {
                    "access_token": "new_access_token",
                    "expires_in": 3600,
                },
            )()

            # Client-level cookie: per-request cookies= is deprecated in httpx.
            test_client.cookies.update({"refresh_token": "valid_refresh_token"})
            response = await test_client.post("/api/v1/auth/refresh")

        assert response.status_code == 200
        data = response.json()
        assert data["message"] == "Token refreshed successfully"
        mock_refresh.assert_called_once_with("valid_refresh_token")

    @pytest.mark.asyncio
    async def test_refresh_token_no_token(self, test_client: AsyncClient) -> None:
        """Test refresh token without providing refresh token."""
        response = await test_client.post("/api/v1/auth/refresh")

        assert response.status_code == 401
        data = response.json()
        assert "No refresh token provided" in data["detail"]

    @pytest.mark.asyncio
    async def test_refresh_token_service_error(self, test_client: AsyncClient) -> None:
        """Test refresh token with service error."""
        with patch(
            "app.services.auth.AuthService.refresh_access_token",
        ) as mock_refresh:
            mock_refresh.side_effect = Exception("Database error")

            # Client-level cookie: per-request cookies= is deprecated in httpx.
            test_client.cookies.update({"refresh_token": "valid_refresh_token"})
            response = await test_client.post("/api/v1/auth/refresh")

        assert response.status_code == 500
        data = response.json()
        assert "Token refresh failed" in data["detail"]

    @pytest.mark.asyncio
    async def test_exchange_oauth_token_invalid_code(
        self,
        test_client: AsyncClient,
    ) -> None:
        """Test OAuth token exchange with invalid code."""
        response = await test_client.post(
            "/api/v1/auth/exchange-oauth-token",
            json={"code": "invalid_code"},
        )

        assert response.status_code == 400
        data = response.json()
        assert "Invalid or expired OAuth code" in data["detail"]

    @pytest.mark.asyncio
    async def test_update_profile_success(
        self,
        test_client: AsyncClient,
        test_user: User,
        auth_cookies: dict[str, str],
    ) -> None:
        """Test update profile success."""
        with (
            patch("app.services.auth.AuthService.update_user_profile") as mock_update,
            patch(
                "app.services.auth.AuthService.user_to_response",
            ) as mock_user_to_response,
        ):
            updated_user = User(
                id=test_user.id,
                email=test_user.email,
                name="Updated Name",
                role=test_user.role,
                credits=test_user.credits,
                plan_id=test_user.plan_id,
                is_active=test_user.is_active,
            )
            mock_update.return_value = updated_user

            # Mock the user_to_response to return UserResponse format
            from app.schemas.auth import UserResponse

            mock_user_to_response.return_value = UserResponse(
                id=test_user.id,
                email=test_user.email,
                name="Updated Name",
                picture=None,
                role=test_user.role,
                credits=test_user.credits,
                is_active=test_user.is_active,
                plan={
                    "id": test_user.plan_id,
                    "name": "Test Plan",
                    "max_credits": 100,
                    "features": [],
                },
                created_at=test_user.created_at,
                updated_at=test_user.updated_at,
            )

            # Client-level cookies: per-request cookies= is deprecated in httpx.
            test_client.cookies.update(auth_cookies)
            response = await test_client.patch(
                "/api/v1/auth/me",
                json={"name": "Updated Name"},
            )

        assert response.status_code == 200
        data = response.json()
        assert data["name"] == "Updated Name"

    @pytest.mark.asyncio
    async def test_update_profile_unauthenticated(
        self,
        test_client: AsyncClient,
    ) -> None:
        """Test update profile without authentication."""
        response = await test_client.patch(
            "/api/v1/auth/me",
            json={"name": "Updated Name"},
        )

        assert response.status_code == 401

    @pytest.mark.asyncio
    async def test_change_password_success(
        self,
        test_client: AsyncClient,
        test_user: User,
        auth_cookies: dict[str, str],
    ) -> None:
        """Test change password success."""
        with patch("app.services.auth.AuthService.change_user_password") as mock_change:
            mock_change.return_value = None

            # Client-level cookies: per-request cookies= is deprecated in httpx.
            test_client.cookies.update(auth_cookies)
            response = await test_client.post(
                "/api/v1/auth/change-password",
                json={
                    "current_password": "old_password",
                    "new_password": "new_password",
                },
            )

        assert response.status_code == 200
        data = response.json()
        assert data["message"] == "Password changed successfully"

    @pytest.mark.asyncio
    async def test_change_password_unauthenticated(
        self,
        test_client: AsyncClient,
    ) -> None:
        """Test change password without authentication."""
        response = await test_client.post(
            "/api/v1/auth/change-password",
            json={
                "current_password": "old_password",
                "new_password": "new_password",
            },
        )

        assert response.status_code == 401

    @pytest.mark.asyncio
    async def test_get_user_providers_success(
        self,
        test_client: AsyncClient,
        test_user: User,
        auth_cookies: dict[str, str],
    ) -> None:
        """Test get user OAuth providers success."""
        with patch(
            "app.services.auth.AuthService.get_user_oauth_providers",
        ) as mock_providers:
            from datetime import datetime

            from app.models.user_oauth import UserOauth

            # One timestamp shared by both mock rows.
            now = datetime.now(UTC)

            mock_oauth_google = UserOauth(
                id=1,
                user_id=test_user.id,
                provider="google",
                provider_user_id="google123",
                email="test@example.com",
                name="Test User",
                created_at=now,
                updated_at=now,
            )
            mock_oauth_github = UserOauth(
                id=2,
                user_id=test_user.id,
                provider="github",
                provider_user_id="github456",
                email="test@example.com",
                name="Test User",
                created_at=now,
                updated_at=now,
            )
            mock_providers.return_value = [mock_oauth_google, mock_oauth_github]

            # Client-level cookies: per-request cookies= is deprecated in httpx.
            test_client.cookies.update(auth_cookies)
            response = await test_client.get("/api/v1/auth/user-providers")

        assert response.status_code == 200
        data = response.json()
        assert len(data) == 3  # password + 2 OAuth providers

        # Check password provider (first)
        assert data[0]["provider"] == "password"
        assert data[0]["display_name"] == "Password"

        # Check OAuth providers
        assert data[1]["provider"] == "google"
        assert data[1]["display_name"] == "Google"
        assert data[2]["provider"] == "github"
        assert data[2]["display_name"] == "GitHub"

    @pytest.mark.asyncio
    async def test_get_user_providers_unauthenticated(
        self,
        test_client: AsyncClient,
    ) -> None:
        """Test get user OAuth providers without authentication."""
        response = await test_client.get("/api/v1/auth/user-providers")

        assert response.status_code == 401
|