feat: Add tests for dashboard service including statistics and date filters
This commit is contained in:
490
tests/api/v1/admin/test_users_endpoints.py
Normal file
490
tests/api/v1/admin/test_users_endpoints.py
Normal file
@@ -0,0 +1,490 @@
|
||||
"""Tests for admin user endpoints."""
|
||||
|
||||
from unittest.mock import AsyncMock, Mock, patch
|
||||
|
||||
import pytest
|
||||
from httpx import AsyncClient
|
||||
|
||||
from app.models.plan import Plan
|
||||
from app.models.user import User
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_user_repository():
|
||||
"""Mock user repository."""
|
||||
return Mock()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_plan_repository():
|
||||
"""Mock plan repository."""
|
||||
return Mock()
|
||||
|
||||
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def regular_user():
|
||||
"""Create regular user for testing."""
|
||||
return User(
|
||||
id=2,
|
||||
email="user@example.com",
|
||||
name="Regular User",
|
||||
role="user",
|
||||
credits=100,
|
||||
plan_id=1,
|
||||
is_active=True,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def test_plan():
|
||||
"""Create test plan."""
|
||||
return Plan(
|
||||
id=1,
|
||||
name="Basic",
|
||||
max_credits=100,
|
||||
features=["feature1", "feature2"],
|
||||
)
|
||||
|
||||
|
||||
class TestAdminUserEndpoints:
|
||||
"""Test admin user endpoints."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_list_users_success(
|
||||
self,
|
||||
authenticated_admin_client: AsyncClient,
|
||||
admin_user: User,
|
||||
regular_user: User,
|
||||
test_plan: Plan,
|
||||
) -> None:
|
||||
"""Test listing users successfully."""
|
||||
with patch("app.repositories.user.UserRepository.get_all_with_plan") as mock_get_all:
|
||||
# Create mock user objects that don't trigger database saves
|
||||
mock_admin = type("User", (), {
|
||||
"id": admin_user.id,
|
||||
"email": admin_user.email,
|
||||
"name": admin_user.name,
|
||||
"picture": None,
|
||||
"role": admin_user.role,
|
||||
"credits": admin_user.credits,
|
||||
"is_active": admin_user.is_active,
|
||||
"created_at": admin_user.created_at,
|
||||
"updated_at": admin_user.updated_at,
|
||||
"plan": type("Plan", (), {
|
||||
"id": test_plan.id,
|
||||
"name": test_plan.name,
|
||||
"max_credits": test_plan.max_credits,
|
||||
})()
|
||||
})()
|
||||
|
||||
mock_regular = type("User", (), {
|
||||
"id": regular_user.id,
|
||||
"email": regular_user.email,
|
||||
"name": regular_user.name,
|
||||
"picture": None,
|
||||
"role": regular_user.role,
|
||||
"credits": regular_user.credits,
|
||||
"is_active": regular_user.is_active,
|
||||
"created_at": regular_user.created_at,
|
||||
"updated_at": regular_user.updated_at,
|
||||
"plan": type("Plan", (), {
|
||||
"id": test_plan.id,
|
||||
"name": test_plan.name,
|
||||
"max_credits": test_plan.max_credits,
|
||||
})()
|
||||
})()
|
||||
|
||||
mock_get_all.return_value = [mock_admin, mock_regular]
|
||||
|
||||
response = await authenticated_admin_client.get("/api/v1/admin/users/")
|
||||
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert len(data) == 2
|
||||
assert data[0]["email"] == "admin@example.com"
|
||||
assert data[1]["email"] == "user@example.com"
|
||||
mock_get_all.assert_called_once_with(limit=100, offset=0)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_list_users_with_pagination(
|
||||
self,
|
||||
authenticated_admin_client: AsyncClient,
|
||||
admin_user: User,
|
||||
test_plan: Plan,
|
||||
) -> None:
|
||||
"""Test listing users with pagination."""
|
||||
with patch("app.repositories.user.UserRepository.get_all_with_plan") as mock_get_all:
|
||||
mock_admin = type("User", (), {
|
||||
"id": admin_user.id,
|
||||
"email": admin_user.email,
|
||||
"name": admin_user.name,
|
||||
"picture": None,
|
||||
"role": admin_user.role,
|
||||
"credits": admin_user.credits,
|
||||
"is_active": admin_user.is_active,
|
||||
"created_at": admin_user.created_at,
|
||||
"updated_at": admin_user.updated_at,
|
||||
"plan": type("Plan", (), {
|
||||
"id": test_plan.id,
|
||||
"name": test_plan.name,
|
||||
"max_credits": test_plan.max_credits,
|
||||
})()
|
||||
})()
|
||||
mock_get_all.return_value = [mock_admin]
|
||||
|
||||
response = await authenticated_admin_client.get("/api/v1/admin/users/?limit=10&offset=5")
|
||||
|
||||
assert response.status_code == 200
|
||||
mock_get_all.assert_called_once_with(limit=10, offset=5)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_list_users_unauthenticated(self, client: AsyncClient) -> None:
|
||||
"""Test listing users without authentication."""
|
||||
response = await client.get("/api/v1/admin/users/")
|
||||
|
||||
assert response.status_code == 401
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_list_users_non_admin(
|
||||
self,
|
||||
client: AsyncClient,
|
||||
regular_user: User,
|
||||
) -> None:
|
||||
"""Test listing users as non-admin user."""
|
||||
with patch("app.core.dependencies.get_current_active_user", return_value=regular_user):
|
||||
response = await client.get("/api/v1/admin/users/")
|
||||
|
||||
assert response.status_code == 401
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_user_success(
|
||||
self,
|
||||
authenticated_admin_client: AsyncClient,
|
||||
admin_user: User,
|
||||
regular_user: User,
|
||||
test_plan: Plan,
|
||||
) -> None:
|
||||
"""Test getting specific user successfully."""
|
||||
with (
|
||||
patch("app.core.dependencies.get_admin_user", return_value=admin_user),
|
||||
patch("app.repositories.user.UserRepository.get_by_id_with_plan") as mock_get_by_id,
|
||||
):
|
||||
mock_user = type("User", (), {
|
||||
"id": regular_user.id,
|
||||
"email": regular_user.email,
|
||||
"name": regular_user.name,
|
||||
"picture": None,
|
||||
"role": regular_user.role,
|
||||
"credits": regular_user.credits,
|
||||
"is_active": regular_user.is_active,
|
||||
"created_at": regular_user.created_at,
|
||||
"updated_at": regular_user.updated_at,
|
||||
"plan": type("Plan", (), {
|
||||
"id": test_plan.id,
|
||||
"name": test_plan.name,
|
||||
"max_credits": test_plan.max_credits,
|
||||
})()
|
||||
})()
|
||||
mock_get_by_id.return_value = mock_user
|
||||
|
||||
response = await authenticated_admin_client.get("/api/v1/admin/users/2")
|
||||
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert data["id"] == 2
|
||||
assert data["email"] == "user@example.com"
|
||||
assert data["name"] == "Regular User"
|
||||
mock_get_by_id.assert_called_once_with(2)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_user_not_found(
|
||||
self,
|
||||
authenticated_admin_client: AsyncClient,
|
||||
admin_user: User,
|
||||
) -> None:
|
||||
"""Test getting non-existent user."""
|
||||
with (
|
||||
patch("app.core.dependencies.get_admin_user", return_value=admin_user),
|
||||
patch("app.repositories.user.UserRepository.get_by_id_with_plan", return_value=None),
|
||||
):
|
||||
response = await authenticated_admin_client.get("/api/v1/admin/users/999")
|
||||
|
||||
assert response.status_code == 404
|
||||
data = response.json()
|
||||
assert "User not found" in data["detail"]
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_update_user_success(
|
||||
self,
|
||||
authenticated_admin_client: AsyncClient,
|
||||
admin_user: User,
|
||||
regular_user: User,
|
||||
test_plan: Plan,
|
||||
) -> None:
|
||||
"""Test updating user successfully."""
|
||||
with (
|
||||
patch("app.core.dependencies.get_admin_user", return_value=admin_user),
|
||||
patch("app.repositories.user.UserRepository.get_by_id_with_plan") as mock_get_by_id,
|
||||
patch("app.repositories.user.UserRepository.update") as mock_update,
|
||||
patch("app.repositories.plan.PlanRepository.get_by_id", return_value=test_plan),
|
||||
):
|
||||
mock_user = type("User", (), {
|
||||
"id": regular_user.id,
|
||||
"email": regular_user.email,
|
||||
"name": regular_user.name,
|
||||
"picture": None,
|
||||
"role": regular_user.role,
|
||||
"credits": regular_user.credits,
|
||||
"is_active": regular_user.is_active,
|
||||
"created_at": regular_user.created_at,
|
||||
"updated_at": regular_user.updated_at,
|
||||
"plan": type("Plan", (), {
|
||||
"id": test_plan.id,
|
||||
"name": test_plan.name,
|
||||
"max_credits": test_plan.max_credits,
|
||||
})()
|
||||
})()
|
||||
|
||||
updated_mock = type("User", (), {
|
||||
"id": regular_user.id,
|
||||
"email": regular_user.email,
|
||||
"name": "Updated Name",
|
||||
"picture": None,
|
||||
"role": regular_user.role,
|
||||
"credits": 200,
|
||||
"is_active": regular_user.is_active,
|
||||
"created_at": regular_user.created_at,
|
||||
"updated_at": regular_user.updated_at,
|
||||
"plan": type("Plan", (), {
|
||||
"id": test_plan.id,
|
||||
"name": test_plan.name,
|
||||
"max_credits": test_plan.max_credits,
|
||||
})()
|
||||
})()
|
||||
|
||||
mock_get_by_id.return_value = mock_user
|
||||
mock_update.return_value = updated_mock
|
||||
|
||||
# Mock session.refresh to prevent actual database calls
|
||||
async def mock_refresh(instance, attributes=None):
|
||||
pass
|
||||
|
||||
with patch("sqlmodel.ext.asyncio.session.AsyncSession.refresh", side_effect=mock_refresh):
|
||||
response = await authenticated_admin_client.patch(
|
||||
"/api/v1/admin/users/2",
|
||||
json={
|
||||
"name": "Updated Name",
|
||||
"credits": 200,
|
||||
"plan_id": 1,
|
||||
}
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert data["name"] == "Updated Name"
|
||||
assert data["credits"] == 200
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_update_user_not_found(
|
||||
self,
|
||||
authenticated_admin_client: AsyncClient,
|
||||
admin_user: User,
|
||||
) -> None:
|
||||
"""Test updating non-existent user."""
|
||||
with (
|
||||
patch("app.core.dependencies.get_admin_user", return_value=admin_user),
|
||||
patch("app.repositories.user.UserRepository.get_by_id_with_plan", return_value=None),
|
||||
):
|
||||
response = await authenticated_admin_client.patch(
|
||||
"/api/v1/admin/users/999",
|
||||
json={"name": "Updated Name"}
|
||||
)
|
||||
|
||||
assert response.status_code == 404
|
||||
data = response.json()
|
||||
assert "User not found" in data["detail"]
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_update_user_invalid_plan(
|
||||
self,
|
||||
authenticated_admin_client: AsyncClient,
|
||||
admin_user: User,
|
||||
regular_user: User,
|
||||
) -> None:
|
||||
"""Test updating user with invalid plan."""
|
||||
with (
|
||||
patch("app.core.dependencies.get_admin_user", return_value=admin_user),
|
||||
patch("app.repositories.user.UserRepository.get_by_id_with_plan") as mock_get_by_id,
|
||||
patch("app.repositories.plan.PlanRepository.get_by_id", return_value=None),
|
||||
):
|
||||
mock_user = type("User", (), {
|
||||
"id": regular_user.id,
|
||||
"email": regular_user.email,
|
||||
"name": regular_user.name,
|
||||
"picture": None,
|
||||
"role": regular_user.role,
|
||||
"credits": regular_user.credits,
|
||||
"is_active": regular_user.is_active,
|
||||
"created_at": regular_user.created_at,
|
||||
"updated_at": regular_user.updated_at,
|
||||
"plan": type("Plan", (), {
|
||||
"id": 1,
|
||||
"name": "Basic",
|
||||
"max_credits": 100,
|
||||
})()
|
||||
})()
|
||||
mock_get_by_id.return_value = mock_user
|
||||
response = await authenticated_admin_client.patch(
|
||||
"/api/v1/admin/users/2",
|
||||
json={"plan_id": 999}
|
||||
)
|
||||
|
||||
assert response.status_code == 404
|
||||
data = response.json()
|
||||
assert "Plan not found" in data["detail"]
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_disable_user_success(
|
||||
self,
|
||||
authenticated_admin_client: AsyncClient,
|
||||
admin_user: User,
|
||||
regular_user: User,
|
||||
test_plan: Plan,
|
||||
) -> None:
|
||||
"""Test disabling user successfully."""
|
||||
with (
|
||||
patch("app.core.dependencies.get_admin_user", return_value=admin_user),
|
||||
patch("app.repositories.user.UserRepository.get_by_id_with_plan") as mock_get_by_id,
|
||||
patch("app.repositories.user.UserRepository.update") as mock_update,
|
||||
):
|
||||
mock_user = type("User", (), {
|
||||
"id": regular_user.id,
|
||||
"email": regular_user.email,
|
||||
"name": regular_user.name,
|
||||
"picture": None,
|
||||
"role": regular_user.role,
|
||||
"credits": regular_user.credits,
|
||||
"is_active": regular_user.is_active,
|
||||
"created_at": regular_user.created_at,
|
||||
"updated_at": regular_user.updated_at,
|
||||
"plan": type("Plan", (), {
|
||||
"id": test_plan.id,
|
||||
"name": test_plan.name,
|
||||
"max_credits": test_plan.max_credits,
|
||||
})()
|
||||
})()
|
||||
mock_get_by_id.return_value = mock_user
|
||||
mock_update.return_value = mock_user
|
||||
|
||||
response = await authenticated_admin_client.post("/api/v1/admin/users/2/disable")
|
||||
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert data["message"] == "User disabled successfully"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_disable_user_not_found(
|
||||
self,
|
||||
authenticated_admin_client: AsyncClient,
|
||||
admin_user: User,
|
||||
) -> None:
|
||||
"""Test disabling non-existent user."""
|
||||
with (
|
||||
patch("app.core.dependencies.get_admin_user", return_value=admin_user),
|
||||
patch("app.repositories.user.UserRepository.get_by_id_with_plan", return_value=None),
|
||||
):
|
||||
response = await authenticated_admin_client.post("/api/v1/admin/users/999/disable")
|
||||
|
||||
assert response.status_code == 404
|
||||
data = response.json()
|
||||
assert "User not found" in data["detail"]
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_enable_user_success(
|
||||
self,
|
||||
authenticated_admin_client: AsyncClient,
|
||||
admin_user: User,
|
||||
test_plan: Plan,
|
||||
) -> None:
|
||||
"""Test enabling user successfully."""
|
||||
disabled_user = User(
|
||||
id=3,
|
||||
email="disabled@example.com",
|
||||
name="Disabled User",
|
||||
role="user",
|
||||
credits=100,
|
||||
plan_id=1,
|
||||
is_active=False,
|
||||
)
|
||||
|
||||
with (
|
||||
patch("app.core.dependencies.get_admin_user", return_value=admin_user),
|
||||
patch("app.repositories.user.UserRepository.get_by_id_with_plan") as mock_get_by_id,
|
||||
patch("app.repositories.user.UserRepository.update") as mock_update,
|
||||
):
|
||||
mock_disabled_user = type("User", (), {
|
||||
"id": disabled_user.id,
|
||||
"email": disabled_user.email,
|
||||
"name": disabled_user.name,
|
||||
"picture": None,
|
||||
"role": disabled_user.role,
|
||||
"credits": disabled_user.credits,
|
||||
"is_active": disabled_user.is_active,
|
||||
"created_at": disabled_user.created_at,
|
||||
"updated_at": disabled_user.updated_at,
|
||||
"plan": type("Plan", (), {
|
||||
"id": test_plan.id,
|
||||
"name": test_plan.name,
|
||||
"max_credits": test_plan.max_credits,
|
||||
})()
|
||||
})()
|
||||
mock_get_by_id.return_value = mock_disabled_user
|
||||
mock_update.return_value = mock_disabled_user
|
||||
|
||||
response = await authenticated_admin_client.post("/api/v1/admin/users/3/enable")
|
||||
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert data["message"] == "User enabled successfully"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_enable_user_not_found(
|
||||
self,
|
||||
authenticated_admin_client: AsyncClient,
|
||||
admin_user: User,
|
||||
) -> None:
|
||||
"""Test enabling non-existent user."""
|
||||
with (
|
||||
patch("app.core.dependencies.get_admin_user", return_value=admin_user),
|
||||
patch("app.repositories.user.UserRepository.get_by_id_with_plan", return_value=None),
|
||||
):
|
||||
response = await authenticated_admin_client.post("/api/v1/admin/users/999/enable")
|
||||
|
||||
assert response.status_code == 404
|
||||
data = response.json()
|
||||
assert "User not found" in data["detail"]
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_list_plans_success(
|
||||
self,
|
||||
authenticated_admin_client: AsyncClient,
|
||||
admin_user: User,
|
||||
test_plan: Plan,
|
||||
) -> None:
|
||||
"""Test listing plans successfully."""
|
||||
basic_plan = Plan(id=1, name="Basic", max_credits=100, features=["basic"])
|
||||
premium_plan = Plan(id=2, name="Premium", max_credits=500, features=["premium"])
|
||||
|
||||
with (
|
||||
patch("app.core.dependencies.get_admin_user", return_value=admin_user),
|
||||
patch("app.repositories.plan.PlanRepository.get_all", return_value=[basic_plan, premium_plan]),
|
||||
):
|
||||
response = await authenticated_admin_client.get("/api/v1/admin/users/plans/list")
|
||||
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert len(data) == 2
|
||||
assert data[0]["name"] == "Basic"
|
||||
assert data[1]["name"] == "Premium"
|
||||
@@ -478,3 +478,227 @@ class TestAuthEndpoints:
|
||||
assert response.status_code == 400
|
||||
data = response.json()
|
||||
assert "Unsupported OAuth provider" in data["detail"]
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_refresh_token_success(
|
||||
self,
|
||||
test_client: AsyncClient,
|
||||
test_user: User,
|
||||
test_plan: Plan,
|
||||
) -> None:
|
||||
"""Test refresh token success."""
|
||||
with patch("app.services.auth.AuthService.refresh_access_token") as mock_refresh:
|
||||
mock_refresh.return_value = type("TokenResponse", (), {
|
||||
"access_token": "new_access_token",
|
||||
"expires_in": 3600,
|
||||
})()
|
||||
|
||||
response = await test_client.post(
|
||||
"/api/v1/auth/refresh",
|
||||
cookies={"refresh_token": "valid_refresh_token"}
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert data["message"] == "Token refreshed successfully"
|
||||
mock_refresh.assert_called_once_with("valid_refresh_token")
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_refresh_token_no_token(self, test_client: AsyncClient) -> None:
|
||||
"""Test refresh token without providing refresh token."""
|
||||
response = await test_client.post("/api/v1/auth/refresh")
|
||||
|
||||
assert response.status_code == 401
|
||||
data = response.json()
|
||||
assert "No refresh token provided" in data["detail"]
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_refresh_token_service_error(self, test_client: AsyncClient) -> None:
|
||||
"""Test refresh token with service error."""
|
||||
with patch("app.services.auth.AuthService.refresh_access_token") as mock_refresh:
|
||||
mock_refresh.side_effect = Exception("Database error")
|
||||
|
||||
response = await test_client.post(
|
||||
"/api/v1/auth/refresh",
|
||||
cookies={"refresh_token": "valid_refresh_token"}
|
||||
)
|
||||
|
||||
assert response.status_code == 500
|
||||
data = response.json()
|
||||
assert "Token refresh failed" in data["detail"]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_exchange_oauth_token_invalid_code(
|
||||
self,
|
||||
test_client: AsyncClient,
|
||||
) -> None:
|
||||
"""Test OAuth token exchange with invalid code."""
|
||||
response = await test_client.post(
|
||||
"/api/v1/auth/exchange-oauth-token",
|
||||
json={"code": "invalid_code"}
|
||||
)
|
||||
|
||||
assert response.status_code == 400
|
||||
data = response.json()
|
||||
assert "Invalid or expired OAuth code" in data["detail"]
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_update_profile_success(
|
||||
self,
|
||||
test_client: AsyncClient,
|
||||
test_user: User,
|
||||
auth_cookies: dict[str, str],
|
||||
) -> None:
|
||||
"""Test update profile success."""
|
||||
with (
|
||||
patch("app.services.auth.AuthService.update_user_profile") as mock_update,
|
||||
patch("app.services.auth.AuthService.user_to_response") as mock_user_to_response,
|
||||
):
|
||||
updated_user = User(
|
||||
id=test_user.id,
|
||||
email=test_user.email,
|
||||
name="Updated Name",
|
||||
role=test_user.role,
|
||||
credits=test_user.credits,
|
||||
plan_id=test_user.plan_id,
|
||||
is_active=test_user.is_active,
|
||||
)
|
||||
mock_update.return_value = updated_user
|
||||
|
||||
# Mock the user_to_response to return UserResponse format
|
||||
from app.schemas.auth import UserResponse
|
||||
mock_user_to_response.return_value = UserResponse(
|
||||
id=test_user.id,
|
||||
email=test_user.email,
|
||||
name="Updated Name",
|
||||
picture=None,
|
||||
role=test_user.role,
|
||||
credits=test_user.credits,
|
||||
is_active=test_user.is_active,
|
||||
plan={
|
||||
"id": test_user.plan_id,
|
||||
"name": "Test Plan",
|
||||
"max_credits": 100,
|
||||
"features": [],
|
||||
},
|
||||
created_at=test_user.created_at,
|
||||
updated_at=test_user.updated_at,
|
||||
)
|
||||
|
||||
response = await test_client.patch(
|
||||
"/api/v1/auth/me",
|
||||
json={"name": "Updated Name"},
|
||||
cookies=auth_cookies
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert data["name"] == "Updated Name"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_update_profile_unauthenticated(self, test_client: AsyncClient) -> None:
|
||||
"""Test update profile without authentication."""
|
||||
response = await test_client.patch(
|
||||
"/api/v1/auth/me",
|
||||
json={"name": "Updated Name"}
|
||||
)
|
||||
|
||||
assert response.status_code == 401
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_change_password_success(
|
||||
self,
|
||||
test_client: AsyncClient,
|
||||
test_user: User,
|
||||
auth_cookies: dict[str, str],
|
||||
) -> None:
|
||||
"""Test change password success."""
|
||||
with patch("app.services.auth.AuthService.change_user_password") as mock_change:
|
||||
mock_change.return_value = None
|
||||
|
||||
response = await test_client.post(
|
||||
"/api/v1/auth/change-password",
|
||||
json={
|
||||
"current_password": "old_password",
|
||||
"new_password": "new_password"
|
||||
},
|
||||
cookies=auth_cookies
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert data["message"] == "Password changed successfully"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_change_password_unauthenticated(self, test_client: AsyncClient) -> None:
|
||||
"""Test change password without authentication."""
|
||||
response = await test_client.post(
|
||||
"/api/v1/auth/change-password",
|
||||
json={
|
||||
"current_password": "old_password",
|
||||
"new_password": "new_password"
|
||||
}
|
||||
)
|
||||
|
||||
assert response.status_code == 401
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_user_providers_success(
|
||||
self,
|
||||
test_client: AsyncClient,
|
||||
test_user: User,
|
||||
auth_cookies: dict[str, str],
|
||||
) -> None:
|
||||
"""Test get user OAuth providers success."""
|
||||
with patch("app.services.auth.AuthService.get_user_oauth_providers") as mock_providers:
|
||||
from app.models.user_oauth import UserOauth
|
||||
from datetime import datetime, timezone
|
||||
|
||||
mock_oauth_google = UserOauth(
|
||||
id=1,
|
||||
user_id=test_user.id,
|
||||
provider="google",
|
||||
provider_user_id="google123",
|
||||
email="test@example.com",
|
||||
name="Test User",
|
||||
created_at=datetime.now(timezone.utc),
|
||||
updated_at=datetime.now(timezone.utc),
|
||||
)
|
||||
mock_oauth_github = UserOauth(
|
||||
id=2,
|
||||
user_id=test_user.id,
|
||||
provider="github",
|
||||
provider_user_id="github456",
|
||||
email="test@example.com",
|
||||
name="Test User",
|
||||
created_at=datetime.now(timezone.utc),
|
||||
updated_at=datetime.now(timezone.utc),
|
||||
)
|
||||
mock_providers.return_value = [mock_oauth_google, mock_oauth_github]
|
||||
|
||||
response = await test_client.get(
|
||||
"/api/v1/auth/user-providers",
|
||||
cookies=auth_cookies
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert len(data) == 3 # password + 2 OAuth providers
|
||||
|
||||
# Check password provider (first)
|
||||
assert data[0]["provider"] == "password"
|
||||
assert data[0]["display_name"] == "Password"
|
||||
|
||||
# Check OAuth providers
|
||||
assert data[1]["provider"] == "google"
|
||||
assert data[1]["display_name"] == "Google"
|
||||
assert data[2]["provider"] == "github"
|
||||
assert data[2]["display_name"] == "GitHub"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_user_providers_unauthenticated(self, test_client: AsyncClient) -> None:
|
||||
"""Test get user OAuth providers without authentication."""
|
||||
response = await test_client.get("/api/v1/auth/user-providers")
|
||||
|
||||
assert response.status_code == 401
|
||||
|
||||
277
tests/services/test_dashboard.py
Normal file
277
tests/services/test_dashboard.py
Normal file
@@ -0,0 +1,277 @@
|
||||
"""Tests for dashboard service."""
|
||||
|
||||
from datetime import UTC, datetime, timedelta
|
||||
from unittest.mock import AsyncMock, Mock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from app.services.dashboard import DashboardService
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_sound_repository():
|
||||
"""Mock sound repository."""
|
||||
return Mock()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def dashboard_service(mock_sound_repository):
|
||||
"""Dashboard service with mocked dependencies."""
|
||||
return DashboardService(sound_repository=mock_sound_repository)
|
||||
|
||||
|
||||
class TestDashboardService:
|
||||
"""Test dashboard service."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_init(self, mock_sound_repository):
|
||||
"""Test dashboard service initialization."""
|
||||
service = DashboardService(sound_repository=mock_sound_repository)
|
||||
assert service.sound_repository == mock_sound_repository
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_soundboard_statistics_success(
|
||||
self,
|
||||
dashboard_service,
|
||||
mock_sound_repository,
|
||||
):
|
||||
"""Test getting soundboard statistics successfully."""
|
||||
# Mock repository response
|
||||
mock_stats = {
|
||||
"count": 25,
|
||||
"total_plays": 150,
|
||||
"total_duration": 75000,
|
||||
"total_size": 1024000,
|
||||
}
|
||||
mock_sound_repository.get_soundboard_statistics = AsyncMock(return_value=mock_stats)
|
||||
|
||||
result = await dashboard_service.get_soundboard_statistics()
|
||||
|
||||
assert result == {
|
||||
"sound_count": 25,
|
||||
"total_play_count": 150,
|
||||
"total_duration": 75000,
|
||||
"total_size": 1024000,
|
||||
}
|
||||
mock_sound_repository.get_soundboard_statistics.assert_called_once()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_soundboard_statistics_exception(
|
||||
self,
|
||||
dashboard_service,
|
||||
mock_sound_repository,
|
||||
):
|
||||
"""Test getting soundboard statistics with exception."""
|
||||
mock_sound_repository.get_soundboard_statistics = AsyncMock(
|
||||
side_effect=Exception("Database error")
|
||||
)
|
||||
|
||||
with pytest.raises(Exception, match="Database error"):
|
||||
await dashboard_service.get_soundboard_statistics()
|
||||
|
||||
mock_sound_repository.get_soundboard_statistics.assert_called_once()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_track_statistics_success(
|
||||
self,
|
||||
dashboard_service,
|
||||
mock_sound_repository,
|
||||
):
|
||||
"""Test getting track statistics successfully."""
|
||||
# Mock repository response
|
||||
mock_stats = {
|
||||
"count": 15,
|
||||
"total_plays": 80,
|
||||
"total_duration": 45000,
|
||||
"total_size": 512000,
|
||||
}
|
||||
mock_sound_repository.get_track_statistics = AsyncMock(return_value=mock_stats)
|
||||
|
||||
result = await dashboard_service.get_track_statistics()
|
||||
|
||||
assert result == {
|
||||
"track_count": 15,
|
||||
"total_play_count": 80,
|
||||
"total_duration": 45000,
|
||||
"total_size": 512000,
|
||||
}
|
||||
mock_sound_repository.get_track_statistics.assert_called_once()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_track_statistics_exception(
|
||||
self,
|
||||
dashboard_service,
|
||||
mock_sound_repository,
|
||||
):
|
||||
"""Test getting track statistics with exception."""
|
||||
mock_sound_repository.get_track_statistics = AsyncMock(
|
||||
side_effect=Exception("Database error")
|
||||
)
|
||||
|
||||
with pytest.raises(Exception, match="Database error"):
|
||||
await dashboard_service.get_track_statistics()
|
||||
|
||||
mock_sound_repository.get_track_statistics.assert_called_once()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_top_sounds_success_all_time(
|
||||
self,
|
||||
dashboard_service,
|
||||
mock_sound_repository,
|
||||
):
|
||||
"""Test getting top sounds for all time successfully."""
|
||||
# Mock repository response
|
||||
mock_sounds = [
|
||||
{
|
||||
"id": 1,
|
||||
"name": "Sound 1",
|
||||
"type": "SDB",
|
||||
"play_count": 100,
|
||||
"duration": 5000,
|
||||
"created_at": datetime(2023, 1, 1, 12, 0, 0, tzinfo=UTC),
|
||||
},
|
||||
{
|
||||
"id": 2,
|
||||
"name": "Sound 2",
|
||||
"type": "SDB",
|
||||
"play_count": 50,
|
||||
"duration": 3000,
|
||||
"created_at": None,
|
||||
},
|
||||
]
|
||||
mock_sound_repository.get_top_sounds = AsyncMock(return_value=mock_sounds)
|
||||
|
||||
result = await dashboard_service.get_top_sounds("SDB", "all_time", 10)
|
||||
|
||||
assert len(result) == 2
|
||||
assert result[0] == {
|
||||
"id": 1,
|
||||
"name": "Sound 1",
|
||||
"type": "SDB",
|
||||
"play_count": 100,
|
||||
"duration": 5000,
|
||||
"created_at": "2023-01-01T12:00:00+00:00",
|
||||
}
|
||||
assert result[1] == {
|
||||
"id": 2,
|
||||
"name": "Sound 2",
|
||||
"type": "SDB",
|
||||
"play_count": 50,
|
||||
"duration": 3000,
|
||||
"created_at": None,
|
||||
}
|
||||
|
||||
mock_sound_repository.get_top_sounds.assert_called_once_with(
|
||||
sound_type="SDB",
|
||||
date_filter=None,
|
||||
limit=10,
|
||||
)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_top_sounds_success_with_period(
|
||||
self,
|
||||
dashboard_service,
|
||||
mock_sound_repository,
|
||||
):
|
||||
"""Test getting top sounds with specific period."""
|
||||
mock_sounds = []
|
||||
mock_sound_repository.get_top_sounds = AsyncMock(return_value=mock_sounds)
|
||||
|
||||
with patch.object(dashboard_service, "_get_date_filter") as mock_date_filter:
|
||||
mock_date_filter.return_value = datetime(2023, 1, 1, tzinfo=UTC)
|
||||
|
||||
result = await dashboard_service.get_top_sounds("EXT", "1_week", 5)
|
||||
|
||||
assert result == []
|
||||
mock_date_filter.assert_called_once_with("1_week")
|
||||
mock_sound_repository.get_top_sounds.assert_called_once_with(
|
||||
sound_type="EXT",
|
||||
date_filter=datetime(2023, 1, 1, tzinfo=UTC),
|
||||
limit=5,
|
||||
)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_top_sounds_exception(
|
||||
self,
|
||||
dashboard_service,
|
||||
mock_sound_repository,
|
||||
):
|
||||
"""Test getting top sounds with exception."""
|
||||
mock_sound_repository.get_top_sounds = AsyncMock(
|
||||
side_effect=Exception("Database error")
|
||||
)
|
||||
|
||||
with pytest.raises(Exception, match="Database error"):
|
||||
await dashboard_service.get_top_sounds("SDB", "all_time", 10)
|
||||
|
||||
mock_sound_repository.get_top_sounds.assert_called_once()
|
||||
|
||||
def test_get_date_filter_today(self, dashboard_service):
|
||||
"""Test date filter for today."""
|
||||
with patch("app.services.dashboard.datetime") as mock_datetime:
|
||||
mock_now = datetime(2023, 6, 15, 14, 30, 45, 123456, tzinfo=UTC)
|
||||
mock_datetime.now.return_value = mock_now
|
||||
mock_datetime.UTC = UTC
|
||||
|
||||
result = dashboard_service._get_date_filter("today")
|
||||
|
||||
expected = datetime(2023, 6, 15, 0, 0, 0, 0, tzinfo=UTC)
|
||||
assert result == expected
|
||||
|
||||
def test_get_date_filter_1_day(self, dashboard_service):
|
||||
"""Test date filter for 1 day."""
|
||||
with patch("app.services.dashboard.datetime") as mock_datetime:
|
||||
mock_now = datetime(2023, 6, 15, 14, 30, 45, tzinfo=UTC)
|
||||
mock_datetime.now.return_value = mock_now
|
||||
mock_datetime.UTC = UTC
|
||||
|
||||
result = dashboard_service._get_date_filter("1_day")
|
||||
|
||||
expected = datetime(2023, 6, 14, 14, 30, 45, tzinfo=UTC)
|
||||
assert result == expected
|
||||
|
||||
def test_get_date_filter_1_week(self, dashboard_service):
|
||||
"""Test date filter for 1 week."""
|
||||
with patch("app.services.dashboard.datetime") as mock_datetime:
|
||||
mock_now = datetime(2023, 6, 15, 14, 30, 45, tzinfo=UTC)
|
||||
mock_datetime.now.return_value = mock_now
|
||||
mock_datetime.UTC = UTC
|
||||
|
||||
result = dashboard_service._get_date_filter("1_week")
|
||||
|
||||
expected = datetime(2023, 6, 8, 14, 30, 45, tzinfo=UTC)
|
||||
assert result == expected
|
||||
|
||||
def test_get_date_filter_1_month(self, dashboard_service):
|
||||
"""Test date filter for 1 month."""
|
||||
with patch("app.services.dashboard.datetime") as mock_datetime:
|
||||
mock_now = datetime(2023, 6, 15, 14, 30, 45, tzinfo=UTC)
|
||||
mock_datetime.now.return_value = mock_now
|
||||
mock_datetime.UTC = UTC
|
||||
|
||||
result = dashboard_service._get_date_filter("1_month")
|
||||
|
||||
expected = datetime(2023, 5, 16, 14, 30, 45, tzinfo=UTC)
|
||||
assert result == expected
|
||||
|
||||
def test_get_date_filter_1_year(self, dashboard_service):
|
||||
"""Test date filter for 1 year."""
|
||||
with patch("app.services.dashboard.datetime") as mock_datetime:
|
||||
mock_now = datetime(2023, 6, 15, 14, 30, 45, tzinfo=UTC)
|
||||
mock_datetime.now.return_value = mock_now
|
||||
mock_datetime.UTC = UTC
|
||||
|
||||
result = dashboard_service._get_date_filter("1_year")
|
||||
|
||||
expected = datetime(2022, 6, 15, 14, 30, 45, tzinfo=UTC)
|
||||
assert result == expected
|
||||
|
||||
def test_get_date_filter_all_time(self, dashboard_service):
|
||||
"""Test date filter for all time."""
|
||||
result = dashboard_service._get_date_filter("all_time")
|
||||
assert result is None
|
||||
|
||||
def test_get_date_filter_unknown_period(self, dashboard_service):
|
||||
"""Test date filter for unknown period."""
|
||||
result = dashboard_service._get_date_filter("unknown_period")
|
||||
assert result is None
|
||||
Reference in New Issue
Block a user