Add comprehensive tests for playlist service and refactor socket service tests

- Introduced a new test suite for the PlaylistService covering various functionalities including creation, retrieval, updating, and deletion of playlists.
- Added tests for handling sounds within playlists, ensuring correct behavior when adding/removing sounds and managing current playlists.
- Refactored socket service tests for improved readability by adjusting function signatures.
- Cleaned up unnecessary whitespace in sound normalizer and sound scanner tests for consistency.
- Enhanced audio utility tests to ensure accurate hash and size calculations, including edge cases for nonexistent files.
- Removed redundant blank lines in cookie utility tests for cleaner code.
This commit is contained in:
JSC
2025-07-29 19:25:46 +02:00
parent 301b5dd794
commit 5ed19c8f0f
31 changed files with 4248 additions and 194 deletions

View File

@@ -14,7 +14,9 @@ class TestApiTokenEndpoints:
@pytest.mark.asyncio
async def test_generate_api_token_success(
self, authenticated_client: AsyncClient, authenticated_user: User,
self,
authenticated_client: AsyncClient,
authenticated_user: User,
):
"""Test successful API token generation."""
request_data = {"expires_days": 30}
@@ -33,6 +35,7 @@ class TestApiTokenEndpoints:
# Verify token format (should be URL-safe base64)
import base64
try:
base64.urlsafe_b64decode(data["api_token"] + "===") # Add padding
except Exception:
@@ -40,7 +43,8 @@ class TestApiTokenEndpoints:
@pytest.mark.asyncio
async def test_generate_api_token_default_expiry(
self, authenticated_client: AsyncClient,
self,
authenticated_client: AsyncClient,
):
"""Test API token generation with default expiry."""
response = await authenticated_client.post("/api/v1/auth/api-token", json={})
@@ -65,7 +69,8 @@ class TestApiTokenEndpoints:
@pytest.mark.asyncio
async def test_generate_api_token_custom_expiry(
self, authenticated_client: AsyncClient,
self,
authenticated_client: AsyncClient,
):
"""Test API token generation with custom expiry."""
expires_days = 90
@@ -96,7 +101,8 @@ class TestApiTokenEndpoints:
@pytest.mark.asyncio
async def test_generate_api_token_validation_errors(
self, authenticated_client: AsyncClient,
self,
authenticated_client: AsyncClient,
):
"""Test API token generation with validation errors."""
# Test minimum validation
@@ -124,7 +130,8 @@ class TestApiTokenEndpoints:
@pytest.mark.asyncio
async def test_get_api_token_status_no_token(
self, authenticated_client: AsyncClient,
self,
authenticated_client: AsyncClient,
):
"""Test getting API token status when user has no token."""
response = await authenticated_client.get("/api/v1/auth/api-token/status")
@@ -138,7 +145,8 @@ class TestApiTokenEndpoints:
@pytest.mark.asyncio
async def test_get_api_token_status_with_token(
self, authenticated_client: AsyncClient,
self,
authenticated_client: AsyncClient,
):
"""Test getting API token status when user has a token."""
# First generate a token
@@ -159,14 +167,18 @@ class TestApiTokenEndpoints:
@pytest.mark.asyncio
async def test_get_api_token_status_expired_token(
self, authenticated_client: AsyncClient, authenticated_user: User,
self,
authenticated_client: AsyncClient,
authenticated_user: User,
):
"""Test getting API token status with expired token."""
# Mock expired token
with patch("app.utils.auth.TokenUtils.is_token_expired", return_value=True):
# Set a token on the user
authenticated_user.api_token = "expired_token"
authenticated_user.api_token_expires_at = datetime.now(UTC) - timedelta(days=1)
authenticated_user.api_token_expires_at = datetime.now(UTC) - timedelta(
days=1
)
response = await authenticated_client.get("/api/v1/auth/api-token/status")
@@ -185,7 +197,8 @@ class TestApiTokenEndpoints:
@pytest.mark.asyncio
async def test_revoke_api_token_success(
self, authenticated_client: AsyncClient,
self,
authenticated_client: AsyncClient,
):
"""Test successful API token revocation."""
# First generate a token
@@ -195,7 +208,9 @@ class TestApiTokenEndpoints:
)
# Verify token exists
status_response = await authenticated_client.get("/api/v1/auth/api-token/status")
status_response = await authenticated_client.get(
"/api/v1/auth/api-token/status"
)
assert status_response.json()["has_token"] is True
# Revoke the token
@@ -206,12 +221,15 @@ class TestApiTokenEndpoints:
assert data["message"] == "API token revoked successfully"
# Verify token is gone
status_response = await authenticated_client.get("/api/v1/auth/api-token/status")
status_response = await authenticated_client.get(
"/api/v1/auth/api-token/status"
)
assert status_response.json()["has_token"] is False
@pytest.mark.asyncio
async def test_revoke_api_token_no_token(
self, authenticated_client: AsyncClient,
self,
authenticated_client: AsyncClient,
):
"""Test revoking API token when user has no token."""
response = await authenticated_client.delete("/api/v1/auth/api-token")
@@ -228,7 +246,9 @@ class TestApiTokenEndpoints:
@pytest.mark.asyncio
async def test_api_token_authentication_success(
self, client: AsyncClient, authenticated_client: AsyncClient,
self,
client: AsyncClient,
authenticated_client: AsyncClient,
):
"""Test successful authentication using API token."""
# Generate API token
@@ -259,7 +279,9 @@ class TestApiTokenEndpoints:
@pytest.mark.asyncio
async def test_api_token_authentication_expired_token(
self, client: AsyncClient, authenticated_client: AsyncClient,
self,
client: AsyncClient,
authenticated_client: AsyncClient,
):
"""Test authentication with expired API token."""
# Generate API token
@@ -299,7 +321,10 @@ class TestApiTokenEndpoints:
@pytest.mark.asyncio
async def test_api_token_authentication_inactive_user(
self, client: AsyncClient, authenticated_client: AsyncClient, authenticated_user: User,
self,
client: AsyncClient,
authenticated_client: AsyncClient,
authenticated_user: User,
):
"""Test authentication with API token for inactive user."""
# Generate API token
@@ -322,7 +347,10 @@ class TestApiTokenEndpoints:
@pytest.mark.asyncio
async def test_flexible_authentication_prefers_api_token(
self, client: AsyncClient, authenticated_client: AsyncClient, auth_cookies: dict[str, str],
self,
client: AsyncClient,
authenticated_client: AsyncClient,
auth_cookies: dict[str, str],
):
"""Test that flexible authentication prefers API token over cookie."""
# Generate API token

View File

@@ -73,7 +73,9 @@ class TestAuthEndpoints:
@pytest.mark.asyncio
async def test_register_duplicate_email(
self, test_client: AsyncClient, test_user: User,
self,
test_client: AsyncClient,
test_user: User,
) -> None:
"""Test registration with duplicate email."""
user_data = {
@@ -128,7 +130,10 @@ class TestAuthEndpoints:
@pytest.mark.asyncio
async def test_login_success(
self, test_client: AsyncClient, test_user: User, test_login_data: dict[str, str],
self,
test_client: AsyncClient,
test_user: User,
test_login_data: dict[str, str],
) -> None:
"""Test successful user login."""
response = await test_client.post("/api/v1/auth/login", json=test_login_data)
@@ -161,7 +166,9 @@ class TestAuthEndpoints:
@pytest.mark.asyncio
async def test_login_invalid_password(
self, test_client: AsyncClient, test_user: User,
self,
test_client: AsyncClient,
test_user: User,
) -> None:
"""Test login with invalid password."""
login_data = {"email": test_user.email, "password": "wrongpassword"}
@@ -183,7 +190,10 @@ class TestAuthEndpoints:
@pytest.mark.asyncio
async def test_get_current_user_success(
self, test_client: AsyncClient, test_user: User, auth_cookies: dict[str, str],
self,
test_client: AsyncClient,
test_user: User,
auth_cookies: dict[str, str],
) -> None:
"""Test getting current user info successfully."""
# Set cookies on client instance to avoid deprecation warning
@@ -210,7 +220,8 @@ class TestAuthEndpoints:
@pytest.mark.asyncio
async def test_get_current_user_invalid_token(
self, test_client: AsyncClient,
self,
test_client: AsyncClient,
) -> None:
"""Test getting current user with invalid token."""
# Set invalid cookies on client instance
@@ -223,7 +234,9 @@ class TestAuthEndpoints:
@pytest.mark.asyncio
async def test_get_current_user_expired_token(
self, test_client: AsyncClient, test_user: User,
self,
test_client: AsyncClient,
test_user: User,
) -> None:
"""Test getting current user with expired token."""
from datetime import timedelta
@@ -237,7 +250,8 @@ class TestAuthEndpoints:
"role": "user",
}
expired_token = JWTUtils.create_access_token(
token_data, expires_delta=timedelta(seconds=-1),
token_data,
expires_delta=timedelta(seconds=-1),
)
# Set expired cookies on client instance
@@ -262,7 +276,9 @@ class TestAuthEndpoints:
@pytest.mark.asyncio
async def test_admin_access_with_user_role(
self, test_client: AsyncClient, auth_cookies: dict[str, str],
self,
test_client: AsyncClient,
auth_cookies: dict[str, str],
) -> None:
"""Test that regular users cannot access admin endpoints."""
# This test would be for admin-only endpoints when they're created
@@ -293,7 +309,9 @@ class TestAuthEndpoints:
@pytest.mark.asyncio
async def test_admin_access_with_admin_role(
self, test_client: AsyncClient, admin_cookies: dict[str, str],
self,
test_client: AsyncClient,
admin_cookies: dict[str, str],
) -> None:
"""Test that admin users can access admin endpoints."""
from app.core.dependencies import get_admin_user
@@ -357,7 +375,8 @@ class TestAuthEndpoints:
@pytest.mark.asyncio
async def test_oauth_authorize_invalid_provider(
self, test_client: AsyncClient,
self,
test_client: AsyncClient,
) -> None:
"""Test OAuth authorization with invalid provider."""
response = await test_client.get("/api/v1/auth/invalid/authorize")
@@ -368,7 +387,9 @@ class TestAuthEndpoints:
@pytest.mark.asyncio
async def test_oauth_callback_new_user(
self, test_client: AsyncClient, ensure_plans: tuple[Any, Any],
self,
test_client: AsyncClient,
ensure_plans: tuple[Any, Any],
) -> None:
"""Test OAuth callback for new user creation."""
# Mock OAuth user info
@@ -400,7 +421,10 @@ class TestAuthEndpoints:
@pytest.mark.asyncio
async def test_oauth_callback_existing_user_link(
self, test_client: AsyncClient, test_user: Any, ensure_plans: tuple[Any, Any],
self,
test_client: AsyncClient,
test_user: Any,
ensure_plans: tuple[Any, Any],
) -> None:
"""Test OAuth callback for linking to existing user."""
# Mock OAuth user info with same email as test user
@@ -442,7 +466,8 @@ class TestAuthEndpoints:
@pytest.mark.asyncio
async def test_oauth_callback_invalid_provider(
self, test_client: AsyncClient,
self,
test_client: AsyncClient,
) -> None:
"""Test OAuth callback with invalid provider."""
response = await test_client.get(

File diff suppressed because it is too large Load Diff

View File

@@ -22,7 +22,12 @@ class TestSocketEndpoints:
"""Test socket API endpoints."""
@pytest.mark.asyncio
async def test_get_socket_status_authenticated(self, authenticated_client: AsyncClient, authenticated_user: User, mock_socket_manager):
async def test_get_socket_status_authenticated(
self,
authenticated_client: AsyncClient,
authenticated_user: User,
mock_socket_manager,
):
"""Test getting socket status for authenticated user."""
response = await authenticated_client.get("/api/v1/socket/status")
@@ -43,7 +48,12 @@ class TestSocketEndpoints:
assert response.status_code == 401
@pytest.mark.asyncio
async def test_send_message_to_user_success(self, authenticated_client: AsyncClient, authenticated_user: User, mock_socket_manager):
async def test_send_message_to_user_success(
self,
authenticated_client: AsyncClient,
authenticated_user: User,
mock_socket_manager,
):
"""Test sending message to specific user successfully."""
target_user_id = 2
message = "Hello there!"
@@ -72,7 +82,12 @@ class TestSocketEndpoints:
)
@pytest.mark.asyncio
async def test_send_message_to_user_not_connected(self, authenticated_client: AsyncClient, authenticated_user: User, mock_socket_manager):
async def test_send_message_to_user_not_connected(
self,
authenticated_client: AsyncClient,
authenticated_user: User,
mock_socket_manager,
):
"""Test sending message to user who is not connected."""
target_user_id = 999
message = "Hello there!"
@@ -102,7 +117,12 @@ class TestSocketEndpoints:
assert response.status_code == 401
@pytest.mark.asyncio
async def test_broadcast_message_success(self, authenticated_client: AsyncClient, authenticated_user: User, mock_socket_manager):
async def test_broadcast_message_success(
self,
authenticated_client: AsyncClient,
authenticated_user: User,
mock_socket_manager,
):
"""Test broadcasting message to all users successfully."""
message = "Important announcement!"
@@ -137,7 +157,9 @@ class TestSocketEndpoints:
assert response.status_code == 401
@pytest.mark.asyncio
async def test_send_message_missing_parameters(self, authenticated_client: AsyncClient, authenticated_user: User):
async def test_send_message_missing_parameters(
self, authenticated_client: AsyncClient, authenticated_user: User
):
"""Test sending message with missing parameters."""
# Missing target_user_id
response = await authenticated_client.post(
@@ -154,13 +176,17 @@ class TestSocketEndpoints:
assert response.status_code == 422
@pytest.mark.asyncio
async def test_broadcast_message_missing_parameters(self, authenticated_client: AsyncClient, authenticated_user: User):
async def test_broadcast_message_missing_parameters(
self, authenticated_client: AsyncClient, authenticated_user: User
):
"""Test broadcasting message with missing parameters."""
response = await authenticated_client.post("/api/v1/socket/broadcast")
assert response.status_code == 422
@pytest.mark.asyncio
async def test_send_message_invalid_user_id(self, authenticated_client: AsyncClient, authenticated_user: User):
async def test_send_message_invalid_user_id(
self, authenticated_client: AsyncClient, authenticated_user: User
):
"""Test sending message with invalid user ID."""
response = await authenticated_client.post(
"/api/v1/socket/send-message",
@@ -169,10 +195,19 @@ class TestSocketEndpoints:
assert response.status_code == 422
@pytest.mark.asyncio
async def test_socket_status_shows_user_connection(self, authenticated_client: AsyncClient, authenticated_user: User, mock_socket_manager):
async def test_socket_status_shows_user_connection(
self,
authenticated_client: AsyncClient,
authenticated_user: User,
mock_socket_manager,
):
"""Test that socket status correctly shows if user is connected."""
# Test when user is connected
mock_socket_manager.get_connected_users.return_value = [str(authenticated_user.id), "2", "3"]
mock_socket_manager.get_connected_users.return_value = [
str(authenticated_user.id),
"2",
"3",
]
response = await authenticated_client.get("/api/v1/socket/status")
data = response.json()

View File

@@ -870,7 +870,6 @@ class TestSoundEndpoints:
) as mock_normalize_sound,
patch("app.repositories.sound.SoundRepository.get_by_id") as mock_get_sound,
):
mock_get_sound.return_value = mock_sound
mock_normalize_sound.return_value = mock_result
@@ -950,7 +949,6 @@ class TestSoundEndpoints:
) as mock_normalize_sound,
patch("app.repositories.sound.SoundRepository.get_by_id") as mock_get_sound,
):
mock_get_sound.return_value = mock_sound
mock_normalize_sound.return_value = mock_result
@@ -1003,7 +1001,6 @@ class TestSoundEndpoints:
) as mock_normalize_sound,
patch("app.repositories.sound.SoundRepository.get_by_id") as mock_get_sound,
):
mock_get_sound.return_value = mock_sound
mock_normalize_sound.return_value = mock_result
@@ -1059,7 +1056,6 @@ class TestSoundEndpoints:
) as mock_normalize_sound,
patch("app.repositories.sound.SoundRepository.get_by_id") as mock_get_sound,
):
mock_get_sound.return_value = mock_sound
mock_normalize_sound.return_value = mock_result

View File

@@ -103,7 +103,8 @@ async def test_client(test_app) -> AsyncGenerator[AsyncClient, None]:
@pytest_asyncio.fixture
async def authenticated_client(
test_app: FastAPI, auth_cookies: dict[str, str],
test_app: FastAPI,
auth_cookies: dict[str, str],
) -> AsyncGenerator[AsyncClient, None]:
"""Create a test HTTP client with authentication cookies."""
async with AsyncClient(
@@ -116,7 +117,8 @@ async def authenticated_client(
@pytest_asyncio.fixture
async def authenticated_admin_client(
test_app: FastAPI, admin_cookies: dict[str, str],
test_app: FastAPI,
admin_cookies: dict[str, str],
) -> AsyncGenerator[AsyncClient, None]:
"""Create a test HTTP client with admin authentication cookies."""
async with AsyncClient(
@@ -211,7 +213,8 @@ async def ensure_plans(test_session: AsyncSession) -> tuple[Plan, Plan]:
@pytest_asyncio.fixture
async def test_user(
test_session: AsyncSession, ensure_plans: tuple[Plan, Plan],
test_session: AsyncSession,
ensure_plans: tuple[Plan, Plan],
) -> User:
"""Create a test user."""
user = User(
@@ -231,7 +234,8 @@ async def test_user(
@pytest_asyncio.fixture
async def admin_user(
test_session: AsyncSession, ensure_plans: tuple[Plan, Plan],
test_session: AsyncSession,
ensure_plans: tuple[Plan, Plan],
) -> User:
"""Create a test admin user."""
user = User(

View File

@@ -36,7 +36,9 @@ class TestApiTokenDependencies:
@pytest.mark.asyncio
async def test_get_current_user_api_token_success(
self, mock_auth_service, test_user,
self,
mock_auth_service,
test_user,
):
"""Test successful API token authentication."""
mock_auth_service.get_user_by_api_token.return_value = test_user
@@ -46,7 +48,9 @@ class TestApiTokenDependencies:
result = await get_current_user_api_token(mock_auth_service, api_token_header)
assert result == test_user
mock_auth_service.get_user_by_api_token.assert_called_once_with("test_api_token_123")
mock_auth_service.get_user_by_api_token.assert_called_once_with(
"test_api_token_123"
)
@pytest.mark.asyncio
async def test_get_current_user_api_token_no_header(self, mock_auth_service):
@@ -94,7 +98,9 @@ class TestApiTokenDependencies:
@pytest.mark.asyncio
async def test_get_current_user_api_token_expired_token(
self, mock_auth_service, test_user,
self,
mock_auth_service,
test_user,
):
"""Test API token authentication with expired token."""
# Set expired token
@@ -111,7 +117,9 @@ class TestApiTokenDependencies:
@pytest.mark.asyncio
async def test_get_current_user_api_token_inactive_user(
self, mock_auth_service, test_user,
self,
mock_auth_service,
test_user,
):
"""Test API token authentication with inactive user."""
test_user.is_active = False
@@ -126,9 +134,13 @@ class TestApiTokenDependencies:
assert "Account is deactivated" in exc_info.value.detail
@pytest.mark.asyncio
async def test_get_current_user_api_token_service_exception(self, mock_auth_service):
async def test_get_current_user_api_token_service_exception(
self, mock_auth_service
):
"""Test API token authentication with service exception."""
mock_auth_service.get_user_by_api_token.side_effect = Exception("Database error")
mock_auth_service.get_user_by_api_token.side_effect = Exception(
"Database error"
)
api_token_header = "test_token"
@@ -140,7 +152,9 @@ class TestApiTokenDependencies:
@pytest.mark.asyncio
async def test_get_current_user_flexible_uses_api_token(
self, mock_auth_service, test_user,
self,
mock_auth_service,
test_user,
):
"""Test flexible authentication uses API token when available."""
mock_auth_service.get_user_by_api_token.return_value = test_user
@@ -149,11 +163,15 @@ class TestApiTokenDependencies:
access_token = "jwt_token"
result = await get_current_user_flexible(
mock_auth_service, access_token, api_token_header,
mock_auth_service,
access_token,
api_token_header,
)
assert result == test_user
mock_auth_service.get_user_by_api_token.assert_called_once_with("test_api_token_123")
mock_auth_service.get_user_by_api_token.assert_called_once_with(
"test_api_token_123"
)
@pytest.mark.asyncio
async def test_get_current_user_flexible_falls_back_to_jwt(self, mock_auth_service):
@@ -165,7 +183,9 @@ class TestApiTokenDependencies:
await get_current_user_flexible(mock_auth_service, "jwt_token", None)
@pytest.mark.asyncio
async def test_api_token_no_expiry_never_expires(self, mock_auth_service, test_user):
async def test_api_token_no_expiry_never_expires(
self, mock_auth_service, test_user
):
"""Test API token with no expiry date never expires."""
test_user.api_token_expires_at = None
mock_auth_service.get_user_by_api_token.return_value = test_user

View File

@@ -0,0 +1,828 @@
"""Tests for playlist repository."""
from collections.abc import AsyncGenerator
import pytest
import pytest_asyncio
from sqlmodel.ext.asyncio.session import AsyncSession
from app.models.playlist import Playlist
from app.models.sound import Sound
from app.models.user import User
from app.repositories.playlist import PlaylistRepository
class TestPlaylistRepository:
"""Test playlist repository operations."""
@pytest_asyncio.fixture
async def playlist_repository(
self,
test_session: AsyncSession,
) -> AsyncGenerator[PlaylistRepository, None]:
"""Create a playlist repository instance."""
yield PlaylistRepository(test_session)
@pytest_asyncio.fixture
async def test_playlist(
self,
test_session: AsyncSession,
test_user: User,
) -> Playlist:
"""Create a test playlist."""
playlist = Playlist(
user_id=test_user.id,
name="Test Playlist",
description="A test playlist",
genre="test",
is_main=False,
is_current=False,
is_deletable=True,
)
test_session.add(playlist)
await test_session.commit()
await test_session.refresh(playlist)
return playlist
@pytest_asyncio.fixture
async def main_playlist(
self,
test_session: AsyncSession,
) -> Playlist:
"""Create a main playlist."""
playlist = Playlist(
user_id=None,
name="Main Playlist",
description="Main playlist",
is_main=True,
is_current=False,
is_deletable=False,
)
test_session.add(playlist)
await test_session.commit()
await test_session.refresh(playlist)
return playlist
@pytest_asyncio.fixture
async def test_sound(
self,
test_session: AsyncSession,
) -> Sound:
"""Create a test sound."""
sound = Sound(
name="Test Sound",
filename="test.mp3",
type="SDB",
duration=5000,
size=1024,
hash="test_hash",
play_count=0,
)
test_session.add(sound)
await test_session.commit()
await test_session.refresh(sound)
return sound
@pytest.mark.asyncio
async def test_get_by_id_existing(
self,
playlist_repository: PlaylistRepository,
test_playlist: Playlist,
) -> None:
"""Test getting playlist by ID when playlist exists."""
assert test_playlist.id is not None
playlist = await playlist_repository.get_by_id(test_playlist.id)
assert playlist is not None
assert playlist.id == test_playlist.id
assert playlist.name == test_playlist.name
assert playlist.description == test_playlist.description
@pytest.mark.asyncio
async def test_get_by_id_nonexistent(
self,
playlist_repository: PlaylistRepository,
) -> None:
"""Test getting playlist by ID when playlist doesn't exist."""
playlist = await playlist_repository.get_by_id(99999)
assert playlist is None
@pytest.mark.asyncio
async def test_get_by_name_existing(
self,
playlist_repository: PlaylistRepository,
test_playlist: Playlist,
) -> None:
"""Test getting playlist by name when playlist exists."""
playlist = await playlist_repository.get_by_name(test_playlist.name)
assert playlist is not None
assert playlist.id == test_playlist.id
assert playlist.name == test_playlist.name
@pytest.mark.asyncio
async def test_get_by_name_nonexistent(
self,
playlist_repository: PlaylistRepository,
) -> None:
"""Test getting playlist by name when playlist doesn't exist."""
playlist = await playlist_repository.get_by_name("Nonexistent Playlist")
assert playlist is None
@pytest.mark.asyncio
async def test_get_by_user_id(
self,
playlist_repository: PlaylistRepository,
test_session: AsyncSession,
ensure_plans,
) -> None:
"""Test getting playlists by user ID."""
# Create test user within this test
from app.utils.auth import PasswordUtils
user = User(
email="test@example.com",
name="Test User",
password_hash=PasswordUtils.hash_password("password123"),
role="user",
is_active=True,
plan_id=ensure_plans[0].id,
credits=100,
)
test_session.add(user)
await test_session.commit()
await test_session.refresh(user)
# Extract user ID immediately after refresh
user_id = user.id
# Create test playlist for this user
playlist = Playlist(
user_id=user_id,
name="Test Playlist",
description="A test playlist",
genre="test",
is_main=False,
is_current=False,
is_deletable=True,
)
test_session.add(playlist)
await test_session.commit()
# Test the repository method
playlists = await playlist_repository.get_by_user_id(user_id)
# Should only return user's playlists, not the main playlist (user_id=None)
assert len(playlists) == 1
assert playlists[0].name == "Test Playlist"
@pytest.mark.asyncio
async def test_get_main_playlist(
self,
playlist_repository: PlaylistRepository,
test_session: AsyncSession,
) -> None:
"""Test getting main playlist."""
# Create main playlist within this test
main_playlist = Playlist(
user_id=None,
name="Main Playlist",
description="Main playlist",
is_main=True,
is_current=False,
is_deletable=False,
)
test_session.add(main_playlist)
await test_session.commit()
await test_session.refresh(main_playlist)
# Extract ID before async call
main_playlist_id = main_playlist.id
# Test the repository method
playlist = await playlist_repository.get_main_playlist()
assert playlist is not None
assert playlist.id == main_playlist_id
assert playlist.is_main is True
@pytest.mark.asyncio
async def test_get_current_playlist(
self,
playlist_repository: PlaylistRepository,
test_session: AsyncSession,
ensure_plans,
) -> None:
"""Test getting current playlist when none is set."""
# Create test user within this test
from app.utils.auth import PasswordUtils
user = User(
email="test2@example.com",
name="Test User 2",
password_hash=PasswordUtils.hash_password("password123"),
role="user",
is_active=True,
plan_id=ensure_plans[0].id,
credits=100,
)
test_session.add(user)
await test_session.commit()
await test_session.refresh(user)
# Extract user ID immediately after refresh
user_id = user.id
# Test the repository method - should return None when no current playlist
playlist = await playlist_repository.get_current_playlist(user_id)
# Should return None since no user playlist is marked as current
assert playlist is None
@pytest.mark.asyncio
async def test_create_playlist(
self,
playlist_repository: PlaylistRepository,
test_user: User,
) -> None:
"""Test creating a new playlist."""
playlist_data = {
"user_id": test_user.id,
"name": "New Playlist",
"description": "A new playlist",
"genre": "rock",
"is_main": False,
"is_current": False,
"is_deletable": True,
}
playlist = await playlist_repository.create(playlist_data)
assert playlist.id is not None
assert playlist.name == "New Playlist"
assert playlist.description == "A new playlist"
assert playlist.genre == "rock"
assert playlist.is_main is False
assert playlist.is_current is False
assert playlist.is_deletable is True
@pytest.mark.asyncio
async def test_update_playlist(
self,
playlist_repository: PlaylistRepository,
test_playlist: Playlist,
) -> None:
"""Test updating a playlist."""
update_data = {
"name": "Updated Playlist",
"description": "Updated description",
"genre": "jazz",
}
updated_playlist = await playlist_repository.update(test_playlist, update_data)
assert updated_playlist.name == "Updated Playlist"
assert updated_playlist.description == "Updated description"
assert updated_playlist.genre == "jazz"
@pytest.mark.asyncio
async def test_delete_playlist(
self,
playlist_repository: PlaylistRepository,
test_playlist: Playlist,
) -> None:
"""Test deleting a playlist."""
playlist_id = test_playlist.id
await playlist_repository.delete(test_playlist)
# Verify playlist is deleted
deleted_playlist = await playlist_repository.get_by_id(playlist_id)
assert deleted_playlist is None
@pytest.mark.asyncio
async def test_search_by_name(
self,
playlist_repository: PlaylistRepository,
test_session: AsyncSession,
ensure_plans,
) -> None:
"""Test searching playlists by name."""
# Create test user within this test
from app.utils.auth import PasswordUtils
user = User(
email="test3@example.com",
name="Test User 3",
password_hash=PasswordUtils.hash_password("password123"),
role="user",
is_active=True,
plan_id=ensure_plans[0].id,
credits=100,
)
test_session.add(user)
await test_session.commit()
await test_session.refresh(user)
# Extract user ID immediately after refresh
user_id = user.id
# Create test playlist
test_playlist = Playlist(
user_id=user_id,
name="Test Playlist",
description="A test playlist",
genre="test",
is_main=False,
is_current=False,
is_deletable=True,
)
test_session.add(test_playlist)
# Create main playlist
main_playlist = Playlist(
user_id=None,
name="Main Playlist",
description="Main playlist",
is_main=True,
is_current=False,
is_deletable=False,
)
test_session.add(main_playlist)
await test_session.commit()
# Search for all playlists (no user filter)
all_results = await playlist_repository.search_by_name("playlist")
assert len(all_results) >= 2 # Should include both user and main playlists
# Search with user filter
user_results = await playlist_repository.search_by_name("playlist", user_id)
assert len(user_results) == 1 # Only user's playlists, not main playlist
# Search for specific playlist
test_results = await playlist_repository.search_by_name("test", user_id)
assert len(test_results) == 1
assert test_results[0].name == "Test Playlist"
@pytest.mark.asyncio
async def test_add_sound_to_playlist(
self,
playlist_repository: PlaylistRepository,
test_session: AsyncSession,
ensure_plans,
) -> None:
"""Test adding a sound to a playlist."""
# Create test user within this test
from app.utils.auth import PasswordUtils
user = User(
email="test4@example.com",
name="Test User 4",
password_hash=PasswordUtils.hash_password("password123"),
role="user",
is_active=True,
plan_id=ensure_plans[0].id,
credits=100,
)
test_session.add(user)
await test_session.commit()
await test_session.refresh(user)
# Create test playlist
playlist = Playlist(
user_id=user.id,
name="Test Playlist",
description="A test playlist",
genre="test",
is_main=False,
is_current=False,
is_deletable=True,
)
test_session.add(playlist)
# Create test sound
sound = Sound(
name="Test Sound",
filename="test.mp3",
type="SDB",
duration=5000,
size=1024,
hash="test_hash",
play_count=0,
)
test_session.add(sound)
await test_session.commit()
await test_session.refresh(playlist)
await test_session.refresh(sound)
# Extract IDs before async call
playlist_id = playlist.id
sound_id = sound.id
# Test the repository method
playlist_sound = await playlist_repository.add_sound_to_playlist(
playlist_id, sound_id
)
assert playlist_sound.playlist_id == playlist_id
assert playlist_sound.sound_id == sound_id
assert playlist_sound.position == 0
@pytest.mark.asyncio
async def test_add_sound_to_playlist_with_position(
self,
playlist_repository: PlaylistRepository,
test_session: AsyncSession,
ensure_plans,
) -> None:
"""Test adding a sound to a playlist with specific position."""
# Create test user within this test
from app.utils.auth import PasswordUtils
user = User(
email="test5@example.com",
name="Test User 5",
password_hash=PasswordUtils.hash_password("password123"),
role="user",
is_active=True,
plan_id=ensure_plans[0].id,
credits=100,
)
test_session.add(user)
await test_session.commit()
await test_session.refresh(user)
# Extract user ID immediately after refresh
user_id = user.id
# Create test playlist
playlist = Playlist(
user_id=user_id,
name="Test Playlist",
description="A test playlist",
genre="test",
is_main=False,
is_current=False,
is_deletable=True,
)
test_session.add(playlist)
# Create test sound
sound = Sound(
name="Test Sound",
filename="test.mp3",
type="SDB",
duration=5000,
size=1024,
hash="test_hash",
play_count=0,
)
test_session.add(sound)
await test_session.commit()
await test_session.refresh(playlist)
await test_session.refresh(sound)
# Extract IDs before async call
playlist_id = playlist.id
sound_id = sound.id
# Test the repository method
playlist_sound = await playlist_repository.add_sound_to_playlist(
playlist_id, sound_id, position=5
)
assert playlist_sound.position == 5
@pytest.mark.asyncio
async def test_remove_sound_from_playlist(
self,
playlist_repository: PlaylistRepository,
test_session: AsyncSession,
ensure_plans,
) -> None:
"""Test removing a sound from a playlist."""
# Create objects within this test
from app.utils.auth import PasswordUtils
user = User(
email="test@example.com",
name="Test User",
password_hash=PasswordUtils.hash_password("password123"),
role="user",
is_active=True,
plan_id=ensure_plans[0].id,
credits=100,
)
test_session.add(user)
await test_session.commit()
await test_session.refresh(user)
user_id = user.id
playlist = Playlist(
user_id=user_id,
name="Test Playlist",
description="A test playlist",
genre="test",
is_main=False,
is_current=False,
is_deletable=True,
)
test_session.add(playlist)
sound = Sound(
name="Test Sound",
filename="test.mp3",
type="SDB",
duration=5000,
size=1024,
hash="test_hash",
play_count=0,
)
test_session.add(sound)
await test_session.commit()
await test_session.refresh(playlist)
await test_session.refresh(sound)
# Extract IDs before async calls
playlist_id = playlist.id
sound_id = sound.id
# First add the sound
await playlist_repository.add_sound_to_playlist(playlist_id, sound_id)
# Verify it was added
assert await playlist_repository.is_sound_in_playlist(
playlist_id, sound_id
)
# Remove the sound
await playlist_repository.remove_sound_from_playlist(
playlist_id, sound_id
)
# Verify it was removed
assert not await playlist_repository.is_sound_in_playlist(
playlist_id, sound_id
)
@pytest.mark.asyncio
async def test_get_playlist_sounds(
self,
playlist_repository: PlaylistRepository,
test_session: AsyncSession,
ensure_plans,
) -> None:
"""Test getting sounds in a playlist."""
# Create objects within this test
from app.utils.auth import PasswordUtils
user = User(
email="test@example.com",
name="Test User",
password_hash=PasswordUtils.hash_password("password123"),
role="user",
is_active=True,
plan_id=ensure_plans[0].id,
credits=100,
)
test_session.add(user)
await test_session.commit()
await test_session.refresh(user)
user_id = user.id
playlist = Playlist(
user_id=user_id,
name="Test Playlist",
description="A test playlist",
genre="test",
is_main=False,
is_current=False,
is_deletable=True,
)
test_session.add(playlist)
sound = Sound(
name="Test Sound",
filename="test.mp3",
type="SDB",
duration=5000,
size=1024,
hash="test_hash",
play_count=0,
)
test_session.add(sound)
await test_session.commit()
await test_session.refresh(playlist)
await test_session.refresh(sound)
# Extract IDs before async calls
playlist_id = playlist.id
sound_id = sound.id
# Initially empty
sounds = await playlist_repository.get_playlist_sounds(playlist_id)
assert len(sounds) == 0
# Add sound
await playlist_repository.add_sound_to_playlist(playlist_id, sound_id)
# Check sounds
sounds = await playlist_repository.get_playlist_sounds(playlist_id)
assert len(sounds) == 1
assert sounds[0].id == sound_id
@pytest.mark.asyncio
async def test_get_playlist_sound_count(
self,
playlist_repository: PlaylistRepository,
test_session: AsyncSession,
ensure_plans,
) -> None:
"""Test getting sound count in a playlist."""
# Create objects within this test
from app.utils.auth import PasswordUtils
user = User(
email="test@example.com",
name="Test User",
password_hash=PasswordUtils.hash_password("password123"),
role="user",
is_active=True,
plan_id=ensure_plans[0].id,
credits=100,
)
test_session.add(user)
await test_session.commit()
await test_session.refresh(user)
user_id = user.id
playlist = Playlist(
user_id=user_id,
name="Test Playlist",
description="A test playlist",
genre="test",
is_main=False,
is_current=False,
is_deletable=True,
)
test_session.add(playlist)
sound = Sound(
name="Test Sound",
filename="test.mp3",
type="SDB",
duration=5000,
size=1024,
hash="test_hash",
play_count=0,
)
test_session.add(sound)
await test_session.commit()
await test_session.refresh(playlist)
await test_session.refresh(sound)
# Extract IDs before async calls
playlist_id = playlist.id
sound_id = sound.id
# Initially empty
count = await playlist_repository.get_playlist_sound_count(playlist_id)
assert count == 0
# Add sound
await playlist_repository.add_sound_to_playlist(playlist_id, sound_id)
# Check count
count = await playlist_repository.get_playlist_sound_count(playlist_id)
assert count == 1
@pytest.mark.asyncio
async def test_is_sound_in_playlist(
self,
playlist_repository: PlaylistRepository,
test_session: AsyncSession,
ensure_plans,
) -> None:
"""Test checking if sound is in playlist."""
# Create objects within this test
from app.utils.auth import PasswordUtils
user = User(
email="test@example.com",
name="Test User",
password_hash=PasswordUtils.hash_password("password123"),
role="user",
is_active=True,
plan_id=ensure_plans[0].id,
credits=100,
)
test_session.add(user)
await test_session.commit()
await test_session.refresh(user)
user_id = user.id
playlist = Playlist(
user_id=user_id,
name="Test Playlist",
description="A test playlist",
genre="test",
is_main=False,
is_current=False,
is_deletable=True,
)
test_session.add(playlist)
sound = Sound(
name="Test Sound",
filename="test.mp3",
type="SDB",
duration=5000,
size=1024,
hash="test_hash",
play_count=0,
)
test_session.add(sound)
await test_session.commit()
await test_session.refresh(playlist)
await test_session.refresh(sound)
# Extract IDs before async calls
playlist_id = playlist.id
sound_id = sound.id
# Initially not in playlist
assert not await playlist_repository.is_sound_in_playlist(
playlist_id, sound_id
)
# Add sound
await playlist_repository.add_sound_to_playlist(playlist_id, sound_id)
# Now in playlist
assert await playlist_repository.is_sound_in_playlist(
playlist_id, sound_id
)
@pytest.mark.asyncio
async def test_reorder_playlist_sounds(
self,
playlist_repository: PlaylistRepository,
test_session: AsyncSession,
ensure_plans,
) -> None:
"""Test reordering sounds in a playlist."""
# Create objects within this test
from app.utils.auth import PasswordUtils
user = User(
email="test@example.com",
name="Test User",
password_hash=PasswordUtils.hash_password("password123"),
role="user",
is_active=True,
plan_id=ensure_plans[0].id,
credits=100,
)
test_session.add(user)
await test_session.commit()
await test_session.refresh(user)
user_id = user.id
playlist = Playlist(
user_id=user_id,
name="Test Playlist",
description="A test playlist",
genre="test",
is_main=False,
is_current=False,
is_deletable=True,
)
test_session.add(playlist)
# Create multiple sounds
sound1 = Sound(name="Sound 1", filename="sound1.mp3", type="SDB", hash="hash1")
sound2 = Sound(name="Sound 2", filename="sound2.mp3", type="SDB", hash="hash2")
test_session.add_all([playlist, sound1, sound2])
await test_session.commit()
await test_session.refresh(playlist)
await test_session.refresh(sound1)
await test_session.refresh(sound2)
# Extract IDs before async calls
playlist_id = playlist.id
sound1_id = sound1.id
sound2_id = sound2.id
# Add sounds to playlist
await playlist_repository.add_sound_to_playlist(
playlist_id, sound1_id, position=0
)
await playlist_repository.add_sound_to_playlist(
playlist_id, sound2_id, position=1
)
# Reorder sounds - use different positions to avoid constraint issues
sound_positions = [(sound1_id, 10), (sound2_id, 5)]
await playlist_repository.reorder_playlist_sounds(
playlist_id, sound_positions
)
# Verify new order
sounds = await playlist_repository.get_playlist_sounds(playlist_id)
assert len(sounds) == 2
assert sounds[0].id == sound2_id # sound2 now at position 5
assert sounds[1].id == sound1_id # sound1 now at position 10

View File

@@ -48,11 +48,15 @@ class TestAuthService:
@pytest.mark.asyncio
async def test_register_duplicate_email(
self, auth_service: AuthService, test_user: User,
self,
auth_service: AuthService,
test_user: User,
) -> None:
"""Test registration with duplicate email."""
request = UserRegisterRequest(
email=test_user.email, password="password123", name="Another User",
email=test_user.email,
password="password123",
name="Another User",
)
with pytest.raises(HTTPException) as exc_info:
@@ -89,7 +93,8 @@ class TestAuthService:
async def test_login_invalid_email(self, auth_service: AuthService) -> None:
"""Test login with invalid email."""
request = UserLoginRequest(
email="nonexistent@example.com", password="password123",
email="nonexistent@example.com",
password="password123",
)
with pytest.raises(HTTPException) as exc_info:
@@ -100,7 +105,9 @@ class TestAuthService:
@pytest.mark.asyncio
async def test_login_invalid_password(
self, auth_service: AuthService, test_user: User,
self,
auth_service: AuthService,
test_user: User,
) -> None:
"""Test login with invalid password."""
request = UserLoginRequest(email=test_user.email, password="wrongpassword")
@@ -113,7 +120,10 @@ class TestAuthService:
@pytest.mark.asyncio
async def test_login_inactive_user(
self, auth_service: AuthService, test_user: User, test_session: AsyncSession,
self,
auth_service: AuthService,
test_user: User,
test_session: AsyncSession,
) -> None:
"""Test login with inactive user."""
# Store the email before deactivating
@@ -133,7 +143,10 @@ class TestAuthService:
@pytest.mark.asyncio
async def test_login_user_without_password(
self, auth_service: AuthService, test_user: User, test_session: AsyncSession,
self,
auth_service: AuthService,
test_user: User,
test_session: AsyncSession,
) -> None:
"""Test login with user that has no password hash."""
# Store the email before removing password
@@ -153,7 +166,9 @@ class TestAuthService:
@pytest.mark.asyncio
async def test_get_current_user_success(
self, auth_service: AuthService, test_user: User,
self,
auth_service: AuthService,
test_user: User,
) -> None:
"""Test getting current user successfully."""
user = await auth_service.get_current_user(test_user.id)
@@ -174,7 +189,10 @@ class TestAuthService:
@pytest.mark.asyncio
async def test_get_current_user_inactive(
self, auth_service: AuthService, test_user: User, test_session: AsyncSession,
self,
auth_service: AuthService,
test_user: User,
test_session: AsyncSession,
) -> None:
"""Test getting current user when user is inactive."""
# Store the user ID before deactivating
@@ -192,7 +210,9 @@ class TestAuthService:
@pytest.mark.asyncio
async def test_create_access_token(
self, auth_service: AuthService, test_user: User,
self,
auth_service: AuthService,
test_user: User,
) -> None:
"""Test access token creation."""
token_response = auth_service._create_access_token(test_user)
@@ -211,7 +231,10 @@ class TestAuthService:
@pytest.mark.asyncio
async def test_create_user_response(
self, auth_service: AuthService, test_user: User, test_session: AsyncSession,
self,
auth_service: AuthService,
test_user: User,
test_session: AsyncSession,
) -> None:
"""Test user response creation."""
# Ensure plan relationship is loaded

View File

@@ -52,7 +52,9 @@ class TestExtractionService:
@patch("app.services.extraction.yt_dlp.YoutubeDL")
@pytest.mark.asyncio
async def test_detect_service_info_youtube(self, mock_ydl_class, extraction_service):
async def test_detect_service_info_youtube(
self, mock_ydl_class, extraction_service
):
"""Test service detection for YouTube."""
mock_ydl = Mock()
mock_ydl_class.return_value.__enter__.return_value = mock_ydl
@@ -75,7 +77,9 @@ class TestExtractionService:
@patch("app.services.extraction.yt_dlp.YoutubeDL")
@pytest.mark.asyncio
async def test_detect_service_info_failure(self, mock_ydl_class, extraction_service):
async def test_detect_service_info_failure(
self, mock_ydl_class, extraction_service
):
"""Test service detection failure."""
mock_ydl = Mock()
mock_ydl_class.return_value.__enter__.return_value = mock_ydl
@@ -169,7 +173,7 @@ class TestExtractionService:
async def test_process_extraction_with_service_detection(self, extraction_service):
"""Test extraction processing with service detection."""
extraction_id = 1
# Mock extraction without service info
mock_extraction = Extraction(
id=extraction_id,
@@ -180,7 +184,7 @@ class TestExtractionService:
title=None,
status="pending",
)
extraction_service.extraction_repo.get_by_id = AsyncMock(
return_value=mock_extraction
)
@@ -188,21 +192,25 @@ class TestExtractionService:
extraction_service.extraction_repo.get_by_service_and_id = AsyncMock(
return_value=None
)
# Mock service detection
service_info = {
"service": "youtube",
"service_id": "test123",
"service_id": "test123",
"title": "Test Video",
}
with (
patch.object(
extraction_service, "_detect_service_info", return_value=service_info
),
patch.object(extraction_service, "_extract_media") as mock_extract,
patch.object(extraction_service, "_move_files_to_final_location") as mock_move,
patch.object(extraction_service, "_create_sound_record") as mock_create_sound,
patch.object(
extraction_service, "_move_files_to_final_location"
) as mock_move,
patch.object(
extraction_service, "_create_sound_record"
) as mock_create_sound,
patch.object(extraction_service, "_normalize_sound") as mock_normalize,
patch.object(extraction_service, "_add_to_main_playlist") as mock_playlist,
):
@@ -210,17 +218,17 @@ class TestExtractionService:
mock_extract.return_value = (Path("/fake/audio.mp3"), None)
mock_move.return_value = (Path("/final/audio.mp3"), None)
mock_create_sound.return_value = mock_sound
result = await extraction_service.process_extraction(extraction_id)
# Verify service detection was called
extraction_service._detect_service_info.assert_called_once_with(
"https://www.youtube.com/watch?v=test123"
)
# Verify extraction was updated with service info
extraction_service.extraction_repo.update.assert_called()
assert result["status"] == "completed"
assert result["service"] == "youtube"
assert result["service_id"] == "test123"
@@ -288,7 +296,6 @@ class TestExtractionService:
"app.services.extraction.get_file_hash", return_value="test_hash"
),
):
extraction_service.sound_repo.create = AsyncMock(
return_value=mock_sound
)

View File

@@ -29,8 +29,9 @@ class TestExtractionProcessor:
async def test_start_and_stop(self, processor):
"""Test starting and stopping the processor."""
# Mock the _process_queue method to avoid actual processing
with patch.object(processor, "_process_queue", new_callable=AsyncMock) as mock_process:
with patch.object(
processor, "_process_queue", new_callable=AsyncMock
) as mock_process:
# Start the processor
await processor.start()
assert processor.processor_task is not None
@@ -44,7 +45,6 @@ class TestExtractionProcessor:
async def test_start_already_running(self, processor):
"""Test starting processor when already running."""
with patch.object(processor, "_process_queue", new_callable=AsyncMock):
# Start first time
await processor.start()
first_task = processor.processor_task
@@ -150,7 +150,6 @@ class TestExtractionProcessor:
return_value=mock_service,
),
):
mock_session = AsyncMock()
mock_session_class.return_value.__aenter__.return_value = mock_session
@@ -176,7 +175,6 @@ class TestExtractionProcessor:
return_value=mock_service,
),
):
mock_session = AsyncMock()
mock_session_class.return_value.__aenter__.return_value = mock_session
@@ -207,7 +205,6 @@ class TestExtractionProcessor:
return_value=mock_service,
),
):
mock_session = AsyncMock()
mock_session_class.return_value.__aenter__.return_value = mock_session
@@ -232,14 +229,15 @@ class TestExtractionProcessor:
patch(
"app.services.extraction_processor.AsyncSession"
) as mock_session_class,
patch.object(processor, "_process_single_extraction", new_callable=AsyncMock) as mock_process,
patch.object(
processor, "_process_single_extraction", new_callable=AsyncMock
) as mock_process,
patch(
"app.services.extraction_processor.ExtractionService",
return_value=mock_service,
),
patch("asyncio.create_task") as mock_create_task,
):
mock_session = AsyncMock()
mock_session_class.return_value.__aenter__.return_value = mock_session
@@ -276,14 +274,15 @@ class TestExtractionProcessor:
patch(
"app.services.extraction_processor.AsyncSession"
) as mock_session_class,
patch.object(processor, "_process_single_extraction", new_callable=AsyncMock) as mock_process,
patch.object(
processor, "_process_single_extraction", new_callable=AsyncMock
) as mock_process,
patch(
"app.services.extraction_processor.ExtractionService",
return_value=mock_service,
),
patch("asyncio.create_task") as mock_create_task,
):
mock_session = AsyncMock()
mock_session_class.return_value.__aenter__.return_value = mock_session

View File

@@ -0,0 +1,971 @@
"""Tests for playlist service."""
from collections.abc import AsyncGenerator
import pytest
import pytest_asyncio
from fastapi import HTTPException
from sqlmodel.ext.asyncio.session import AsyncSession
from app.models.playlist import Playlist
from app.models.sound import Sound
from app.models.user import User
from app.services.playlist import PlaylistService
class TestPlaylistService:
"""Test playlist service operations."""
@pytest_asyncio.fixture
async def playlist_service(
self,
test_session: AsyncSession,
) -> AsyncGenerator[PlaylistService, None]:
"""Create a playlist service instance."""
yield PlaylistService(test_session)
@pytest_asyncio.fixture
async def test_playlist(
self,
test_session: AsyncSession,
test_user: User,
) -> Playlist:
"""Create a test playlist."""
# Extract user_id from test_user within the fixture
user_id = test_user.id
playlist = Playlist(
user_id=user_id,
name="Test Playlist",
description="A test playlist",
genre="test",
is_main=False,
is_current=False,
is_deletable=True,
)
test_session.add(playlist)
await test_session.commit()
await test_session.refresh(playlist)
return playlist
@pytest_asyncio.fixture
async def current_playlist(
self,
test_session: AsyncSession,
test_user: User,
) -> Playlist:
"""Create a current playlist."""
# Extract user_id from test_user within the fixture
user_id = test_user.id
playlist = Playlist(
user_id=user_id,
name="Current Playlist",
description="Currently active playlist",
is_main=False,
is_current=True,
is_deletable=True,
)
test_session.add(playlist)
await test_session.commit()
await test_session.refresh(playlist)
return playlist
@pytest_asyncio.fixture
async def main_playlist(
self,
test_session: AsyncSession,
) -> Playlist:
"""Create a main playlist."""
playlist = Playlist(
user_id=None,
name="Main Playlist",
description="Main playlist",
is_main=True,
is_current=False,
is_deletable=False,
)
test_session.add(playlist)
await test_session.commit()
await test_session.refresh(playlist)
return playlist
@pytest_asyncio.fixture
async def test_sound(
self,
test_session: AsyncSession,
) -> Sound:
"""Create a test sound."""
sound = Sound(
name="Test Sound",
filename="test.mp3",
type="SDB",
duration=5000,
size=1024,
hash="test_hash",
play_count=10,
)
test_session.add(sound)
await test_session.commit()
await test_session.refresh(sound)
return sound
@pytest_asyncio.fixture
async def other_user(
self,
test_session: AsyncSession,
ensure_plans,
) -> User:
"""Create another test user."""
from app.utils.auth import PasswordUtils
user = User(
email="other@example.com",
name="Other User",
password_hash=PasswordUtils.hash_password("password123"),
role="user",
is_active=True,
plan_id=ensure_plans[0].id,
credits=100,
)
test_session.add(user)
await test_session.commit()
await test_session.refresh(user)
return user
@pytest.mark.asyncio
async def test_get_playlist_by_id_success(
self,
playlist_service: PlaylistService,
test_user: User,
test_playlist: Playlist,
) -> None:
"""Test getting playlist by ID successfully."""
assert test_playlist.id is not None
playlist = await playlist_service.get_playlist_by_id(test_playlist.id)
assert playlist.id == test_playlist.id
assert playlist.name == test_playlist.name
@pytest.mark.asyncio
async def test_get_playlist_by_id_not_found(
self,
playlist_service: PlaylistService,
test_user: User,
) -> None:
"""Test getting non-existent playlist."""
with pytest.raises(HTTPException) as exc_info:
await playlist_service.get_playlist_by_id(99999)
assert exc_info.value.status_code == 404
assert "not found" in exc_info.value.detail
@pytest.mark.asyncio
async def test_get_main_playlist_existing(
self,
playlist_service: PlaylistService,
test_user: User,
test_session: AsyncSession,
) -> None:
"""Test getting existing main playlist."""
# Create main playlist manually
main_playlist = Playlist(
user_id=None,
name="Main Playlist",
description="Main playlist",
is_main=True,
is_deletable=False,
)
test_session.add(main_playlist)
await test_session.commit()
await test_session.refresh(main_playlist)
playlist = await playlist_service.get_main_playlist()
assert playlist.id == main_playlist.id
assert playlist.is_main is True
@pytest.mark.asyncio
async def test_get_main_playlist_create_if_not_exists(
self,
playlist_service: PlaylistService,
test_user: User,
) -> None:
"""Test that service fails if main playlist doesn't exist."""
# Should raise an HTTPException if no main playlist exists
with pytest.raises(HTTPException) as exc_info:
await playlist_service.get_main_playlist()
assert exc_info.value.status_code == 500
assert "Main playlist not found" in exc_info.value.detail
@pytest.mark.asyncio
async def test_create_playlist_success(
self,
playlist_service: PlaylistService,
test_user: User,
) -> None:
"""Test creating a new playlist successfully."""
user_id = test_user.id # Extract user_id while session is available
playlist = await playlist_service.create_playlist(
user_id=user_id,
name="New Playlist",
description="A new playlist",
genre="rock",
)
assert playlist.name == "New Playlist"
assert playlist.description == "A new playlist"
assert playlist.genre == "rock"
assert playlist.user_id == user_id
assert playlist.is_main is False
assert playlist.is_current is False
assert playlist.is_deletable is True
@pytest.mark.asyncio
async def test_create_playlist_duplicate_name(
self,
playlist_service: PlaylistService,
test_user: User,
test_session: AsyncSession,
) -> None:
"""Test creating playlist with duplicate name fails."""
# Create test playlist within this test
user_id = test_user.id
playlist = Playlist(
user_id=user_id,
name="Test Playlist",
description="A test playlist",
genre="test",
is_main=False,
is_current=False,
is_deletable=True,
)
test_session.add(playlist)
await test_session.commit()
await test_session.refresh(playlist)
# Extract name before async call
playlist_name = playlist.name
with pytest.raises(HTTPException) as exc_info:
await playlist_service.create_playlist(
user_id=user_id,
name=playlist_name, # Same name as existing playlist
)
assert exc_info.value.status_code == 400
assert "already exists" in exc_info.value.detail
@pytest.mark.asyncio
async def test_create_playlist_as_current(
self,
playlist_service: PlaylistService,
test_user: User,
test_session: AsyncSession,
) -> None:
"""Test creating a playlist as current unsets previous current."""
# Create current playlist within this test
user_id = test_user.id
current_playlist = Playlist(
user_id=user_id,
name="Current Playlist",
description="Currently active playlist",
is_main=False,
is_current=True,
is_deletable=True,
)
test_session.add(current_playlist)
await test_session.commit()
await test_session.refresh(current_playlist)
# Verify the existing current playlist
assert current_playlist.is_current is True
# Extract ID before async call
current_playlist_id = current_playlist.id
# Create new playlist as current
new_playlist = await playlist_service.create_playlist(
user_id=user_id,
name="New Current Playlist",
is_current=True,
)
assert new_playlist.is_current is True
# Verify the old current playlist is no longer current
# We need to refresh the old playlist from the database
old_playlist = await playlist_service.get_playlist_by_id(current_playlist_id)
assert old_playlist.is_current is False
@pytest.mark.asyncio
async def test_update_playlist_success(
self,
playlist_service: PlaylistService,
test_user: User,
test_session: AsyncSession,
) -> None:
"""Test updating a playlist successfully."""
# Create test playlist within this test
user_id = test_user.id
playlist = Playlist(
user_id=user_id,
name="Test Playlist",
description="A test playlist",
genre="test",
is_main=False,
is_current=False,
is_deletable=True,
)
test_session.add(playlist)
await test_session.commit()
await test_session.refresh(playlist)
# Extract IDs before async call
playlist_id = playlist.id
updated_playlist = await playlist_service.update_playlist(
playlist_id=playlist_id,
user_id=user_id,
name="Updated Name",
description="Updated description",
genre="jazz",
)
assert updated_playlist.name == "Updated Name"
assert updated_playlist.description == "Updated description"
assert updated_playlist.genre == "jazz"
@pytest.mark.asyncio
async def test_update_playlist_set_current(
self,
playlist_service: PlaylistService,
test_user: User,
test_session: AsyncSession,
) -> None:
"""Test setting a playlist as current via update."""
# Create test playlist within this test
user_id = test_user.id
test_playlist = Playlist(
user_id=user_id,
name="Test Playlist",
description="A test playlist",
genre="test",
is_main=False,
is_current=False,
is_deletable=True,
)
test_session.add(test_playlist)
current_playlist = Playlist(
user_id=user_id,
name="Current Playlist",
description="Currently active playlist",
is_main=False,
is_current=True,
is_deletable=True,
)
test_session.add(current_playlist)
await test_session.commit()
await test_session.refresh(test_playlist)
await test_session.refresh(current_playlist)
# Extract IDs before async calls
test_playlist_id = test_playlist.id
current_playlist_id = current_playlist.id
# Verify initial state
assert test_playlist.is_current is False
assert current_playlist.is_current is True
# Update playlist to be current
updated_playlist = await playlist_service.update_playlist(
playlist_id=test_playlist_id,
user_id=user_id,
is_current=True,
)
assert updated_playlist.is_current is True
# Verify old current playlist is no longer current
old_current = await playlist_service.get_playlist_by_id(current_playlist_id)
assert old_current.is_current is False
@pytest.mark.asyncio
async def test_delete_playlist_success(
self,
playlist_service: PlaylistService,
test_user: User,
test_session: AsyncSession,
) -> None:
"""Test deleting a playlist successfully."""
# Create test playlist within this test
user_id = test_user.id
playlist = Playlist(
user_id=user_id,
name="Test Playlist",
description="A test playlist",
genre="test",
is_main=False,
is_current=False,
is_deletable=True,
)
test_session.add(playlist)
await test_session.commit()
await test_session.refresh(playlist)
# Extract ID before async call
playlist_id = playlist.id
await playlist_service.delete_playlist(playlist_id, user_id)
# Verify playlist is deleted
with pytest.raises(HTTPException) as exc_info:
await playlist_service.get_playlist_by_id(playlist_id)
assert exc_info.value.status_code == 404
@pytest.mark.asyncio
async def test_delete_current_playlist_sets_main_as_current(
self,
playlist_service: PlaylistService,
test_user: User,
test_session: AsyncSession,
) -> None:
"""Test deleting current playlist sets main as current."""
# Create main playlist first (required by service)
main_playlist = Playlist(
user_id=None,
name="Main Playlist",
description="Main playlist",
is_main=True,
is_current=False,
is_deletable=False,
)
test_session.add(main_playlist)
# Create current playlist within this test
user_id = test_user.id
current_playlist = Playlist(
user_id=user_id,
name="Current Playlist",
description="Currently active playlist",
is_main=False,
is_current=True,
is_deletable=True,
)
test_session.add(current_playlist)
await test_session.commit()
await test_session.refresh(current_playlist)
# Extract ID before async call
current_playlist_id = current_playlist.id
# Delete the current playlist
await playlist_service.delete_playlist(current_playlist_id, user_id)
# Verify main playlist is now fallback current (main playlist doesn't have is_current=True)
# The service returns main playlist when no current is set
current = await playlist_service.get_current_playlist(user_id)
assert current.is_main is True
@pytest.mark.asyncio
async def test_delete_non_deletable_playlist(
self,
playlist_service: PlaylistService,
test_user: User,
test_session: AsyncSession,
) -> None:
"""Test deleting non-deletable playlist fails."""
# Extract user ID immediately
user_id = test_user.id
# Create non-deletable playlist
non_deletable = Playlist(
user_id=user_id,
name="Non-deletable",
is_deletable=False,
)
test_session.add(non_deletable)
await test_session.commit()
await test_session.refresh(non_deletable)
# Extract ID before async call
non_deletable_id = non_deletable.id
with pytest.raises(HTTPException) as exc_info:
await playlist_service.delete_playlist(non_deletable_id, user_id)
assert exc_info.value.status_code == 400
assert "cannot be deleted" in exc_info.value.detail
@pytest.mark.asyncio
async def test_add_sound_to_playlist_success(
self,
playlist_service: PlaylistService,
test_user: User,
test_session: AsyncSession,
) -> None:
"""Test adding sound to playlist successfully."""
# Create test playlist and sound within this test
user_id = test_user.id
playlist = Playlist(
user_id=user_id,
name="Test Playlist",
description="A test playlist",
genre="test",
is_main=False,
is_current=False,
is_deletable=True,
)
test_session.add(playlist)
sound = Sound(
name="Test Sound",
filename="test.mp3",
type="SDB",
duration=5000,
size=1024,
hash="test_hash",
play_count=10,
)
test_session.add(sound)
await test_session.commit()
await test_session.refresh(playlist)
await test_session.refresh(sound)
# Extract IDs before async calls
playlist_id = playlist.id
sound_id = sound.id
await playlist_service.add_sound_to_playlist(
playlist_id=playlist_id,
sound_id=sound_id,
user_id=user_id,
)
# Verify sound was added
sounds = await playlist_service.get_playlist_sounds(playlist_id)
assert len(sounds) == 1
assert sounds[0].id == sound_id
@pytest.mark.asyncio
async def test_add_sound_to_playlist_already_exists(
self,
playlist_service: PlaylistService,
test_user: User,
test_session: AsyncSession,
) -> None:
"""Test adding sound that's already in playlist fails."""
# Create test playlist and sound within this test
user_id = test_user.id
playlist = Playlist(
user_id=user_id,
name="Test Playlist",
description="A test playlist",
genre="test",
is_main=False,
is_current=False,
is_deletable=True,
)
test_session.add(playlist)
sound = Sound(
name="Test Sound",
filename="test.mp3",
type="SDB",
duration=5000,
size=1024,
hash="test_hash",
play_count=10,
)
test_session.add(sound)
await test_session.commit()
await test_session.refresh(playlist)
await test_session.refresh(sound)
# Extract IDs before async calls
playlist_id = playlist.id
sound_id = sound.id
# Add sound first time
await playlist_service.add_sound_to_playlist(
playlist_id=playlist_id,
sound_id=sound_id,
user_id=user_id,
)
# Try to add same sound again
with pytest.raises(HTTPException) as exc_info:
await playlist_service.add_sound_to_playlist(
playlist_id=playlist_id,
sound_id=sound_id,
user_id=user_id,
)
assert exc_info.value.status_code == 400
assert "already in this playlist" in exc_info.value.detail
@pytest.mark.asyncio
async def test_remove_sound_from_playlist_success(
self,
playlist_service: PlaylistService,
test_user: User,
test_session: AsyncSession,
) -> None:
"""Test removing sound from playlist successfully."""
# Create test playlist and sound within this test
user_id = test_user.id
playlist = Playlist(
user_id=user_id,
name="Test Playlist",
description="A test playlist",
genre="test",
is_main=False,
is_current=False,
is_deletable=True,
)
test_session.add(playlist)
sound = Sound(
name="Test Sound",
filename="test.mp3",
type="SDB",
duration=5000,
size=1024,
hash="test_hash",
play_count=10,
)
test_session.add(sound)
await test_session.commit()
await test_session.refresh(playlist)
await test_session.refresh(sound)
# Extract IDs before async calls
playlist_id = playlist.id
sound_id = sound.id
# Add sound first
await playlist_service.add_sound_to_playlist(
playlist_id=playlist_id,
sound_id=sound_id,
user_id=user_id,
)
# Remove sound
await playlist_service.remove_sound_from_playlist(
playlist_id=playlist_id,
sound_id=sound_id,
user_id=user_id,
)
# Verify sound was removed
sounds = await playlist_service.get_playlist_sounds(playlist_id)
assert len(sounds) == 0
@pytest.mark.asyncio
async def test_remove_sound_not_in_playlist(
self,
playlist_service: PlaylistService,
test_user: User,
test_session: AsyncSession,
) -> None:
"""Test removing sound that's not in playlist fails."""
# Create test playlist and sound within this test
user_id = test_user.id
playlist = Playlist(
user_id=user_id,
name="Test Playlist",
description="A test playlist",
genre="test",
is_main=False,
is_current=False,
is_deletable=True,
)
test_session.add(playlist)
sound = Sound(
name="Test Sound",
filename="test.mp3",
type="SDB",
duration=5000,
size=1024,
hash="test_hash",
play_count=10,
)
test_session.add(sound)
await test_session.commit()
await test_session.refresh(playlist)
await test_session.refresh(sound)
# Extract IDs before async calls
playlist_id = playlist.id
sound_id = sound.id
with pytest.raises(HTTPException) as exc_info:
await playlist_service.remove_sound_from_playlist(
playlist_id=playlist_id,
sound_id=sound_id,
user_id=user_id,
)
assert exc_info.value.status_code == 404
assert "not found in this playlist" in exc_info.value.detail
@pytest.mark.asyncio
async def test_set_current_playlist(
self,
playlist_service: PlaylistService,
test_user: User,
test_session: AsyncSession,
) -> None:
"""Test setting a playlist as current."""
# Create test playlists within this test
user_id = test_user.id
test_playlist = Playlist(
user_id=user_id,
name="Test Playlist",
description="A test playlist",
genre="test",
is_main=False,
is_current=False,
is_deletable=True,
)
test_session.add(test_playlist)
current_playlist = Playlist(
user_id=user_id,
name="Current Playlist",
description="Currently active playlist",
is_main=False,
is_current=True,
is_deletable=True,
)
test_session.add(current_playlist)
await test_session.commit()
await test_session.refresh(test_playlist)
await test_session.refresh(current_playlist)
# Extract IDs before async calls
test_playlist_id = test_playlist.id
current_playlist_id = current_playlist.id
# Verify initial state
assert current_playlist.is_current is True
assert test_playlist.is_current is False
# Set test_playlist as current
updated_playlist = await playlist_service.set_current_playlist(
test_playlist_id, user_id
)
assert updated_playlist.is_current is True
# Verify old current is no longer current
old_current = await playlist_service.get_playlist_by_id(current_playlist_id)
assert old_current.is_current is False
@pytest.mark.asyncio
async def test_unset_current_playlist_sets_main_as_current(
self,
playlist_service: PlaylistService,
test_user: User,
test_session: AsyncSession,
) -> None:
"""Test unsetting current playlist falls back to main playlist."""
# Create test playlists within this test
user_id = test_user.id
current_playlist = Playlist(
user_id=user_id,
name="Current Playlist",
description="Currently active playlist",
is_main=False,
is_current=True,
is_deletable=True,
)
test_session.add(current_playlist)
main_playlist = Playlist(
user_id=None,
name="Main Playlist",
description="Main playlist",
is_main=True,
is_current=False,
is_deletable=False,
)
test_session.add(main_playlist)
await test_session.commit()
await test_session.refresh(current_playlist)
await test_session.refresh(main_playlist)
# Extract IDs before async calls
current_playlist_id = current_playlist.id
main_playlist_id = main_playlist.id
# Verify initial state
assert current_playlist.is_current is True
# Unset current playlist
await playlist_service.unset_current_playlist(user_id)
# Verify get_current_playlist returns main playlist as fallback
current = await playlist_service.get_current_playlist(user_id)
assert current.id == main_playlist_id
assert current.is_main is True
# Verify old current is no longer current
old_current = await playlist_service.get_playlist_by_id(current_playlist_id)
assert old_current.is_current is False
@pytest.mark.asyncio
async def test_get_playlist_stats(
self,
playlist_service: PlaylistService,
test_user: User,
test_session: AsyncSession,
) -> None:
"""Test getting playlist statistics."""
# Create test playlist and sound within this test
user_id = test_user.id
playlist = Playlist(
user_id=user_id,
name="Test Playlist",
description="A test playlist",
genre="test",
is_main=False,
is_current=False,
is_deletable=True,
)
test_session.add(playlist)
sound = Sound(
name="Test Sound",
filename="test.mp3",
type="SDB",
duration=5000,
size=1024,
hash="test_hash",
play_count=10,
)
test_session.add(sound)
await test_session.commit()
await test_session.refresh(playlist)
await test_session.refresh(sound)
# Extract IDs before async calls
playlist_id = playlist.id
sound_id = sound.id
# Initially empty playlist
stats = await playlist_service.get_playlist_stats(playlist_id)
assert stats["sound_count"] == 0
assert stats["total_duration_ms"] == 0
assert stats["total_play_count"] == 0
# Add sound to playlist
await playlist_service.add_sound_to_playlist(
playlist_id=playlist_id,
sound_id=sound_id,
user_id=user_id,
)
# Check stats again
stats = await playlist_service.get_playlist_stats(playlist_id)
assert stats["sound_count"] == 1
assert stats["total_duration_ms"] == 5000 # From test_sound fixture
assert stats["total_play_count"] == 10 # From test_sound fixture
@pytest.mark.asyncio
async def test_add_sound_to_main_playlist(
self,
playlist_service: PlaylistService,
test_user: User,
test_session: AsyncSession,
) -> None:
"""Test adding sound to main playlist."""
# Create test sound and main playlist within this test
user_id = test_user.id
sound = Sound(
name="Test Sound",
filename="test.mp3",
type="SDB",
duration=5000,
size=1024,
hash="test_hash",
play_count=10,
)
test_session.add(sound)
main_playlist = Playlist(
user_id=None,
name="Main Playlist",
description="Main playlist",
is_main=True,
is_current=False,
is_deletable=False,
)
test_session.add(main_playlist)
await test_session.commit()
await test_session.refresh(sound)
await test_session.refresh(main_playlist)
# Extract IDs before async calls
sound_id = sound.id
main_playlist_id = main_playlist.id
# Add sound to main playlist
await playlist_service.add_sound_to_main_playlist(sound_id, user_id)
# Verify sound was added to main playlist
sounds = await playlist_service.get_playlist_sounds(main_playlist_id)
assert len(sounds) == 1
assert sounds[0].id == sound_id
@pytest.mark.asyncio
async def test_add_sound_to_main_playlist_already_exists(
self,
playlist_service: PlaylistService,
test_user: User,
test_session: AsyncSession,
) -> None:
"""Test adding sound to main playlist when it already exists (should not duplicate)."""
# Create test sound and main playlist within this test
user_id = test_user.id
sound = Sound(
name="Test Sound",
filename="test.mp3",
type="SDB",
duration=5000,
size=1024,
hash="test_hash",
play_count=10,
)
test_session.add(sound)
main_playlist = Playlist(
user_id=None,
name="Main Playlist",
description="Main playlist",
is_main=True,
is_current=False,
is_deletable=False,
)
test_session.add(main_playlist)
await test_session.commit()
await test_session.refresh(sound)
await test_session.refresh(main_playlist)
# Extract IDs before async calls
sound_id = sound.id
main_playlist_id = main_playlist.id
# Add sound to main playlist twice
await playlist_service.add_sound_to_main_playlist(sound_id, user_id)
await playlist_service.add_sound_to_main_playlist(sound_id, user_id)
# Verify sound is only added once
sounds = await playlist_service.get_playlist_sounds(main_playlist_id)
assert len(sounds) == 1
assert sounds[0].id == sound_id

View File

@@ -97,7 +97,9 @@ class TestSocketManager:
@pytest.mark.asyncio
@patch("app.services.socket.extract_access_token_from_cookies")
@patch("app.services.socket.JWTUtils.decode_access_token")
async def test_connect_handler_success(self, mock_decode, mock_extract_token, socket_manager, mock_sio):
async def test_connect_handler_success(
self, mock_decode, mock_extract_token, socket_manager, mock_sio
):
"""Test successful connection with valid token."""
# Setup mocks
mock_extract_token.return_value = "valid_token"
@@ -130,7 +132,9 @@ class TestSocketManager:
@pytest.mark.asyncio
@patch("app.services.socket.extract_access_token_from_cookies")
async def test_connect_handler_no_token(self, mock_extract_token, socket_manager, mock_sio):
async def test_connect_handler_no_token(
self, mock_extract_token, socket_manager, mock_sio
):
"""Test connection with no access token."""
# Setup mocks
mock_extract_token.return_value = None
@@ -162,7 +166,9 @@ class TestSocketManager:
@pytest.mark.asyncio
@patch("app.services.socket.extract_access_token_from_cookies")
@patch("app.services.socket.JWTUtils.decode_access_token")
async def test_connect_handler_invalid_token(self, mock_decode, mock_extract_token, socket_manager, mock_sio):
async def test_connect_handler_invalid_token(
self, mock_decode, mock_extract_token, socket_manager, mock_sio
):
"""Test connection with invalid token."""
# Setup mocks
mock_extract_token.return_value = "invalid_token"
@@ -195,7 +201,9 @@ class TestSocketManager:
@pytest.mark.asyncio
@patch("app.services.socket.extract_access_token_from_cookies")
@patch("app.services.socket.JWTUtils.decode_access_token")
async def test_connect_handler_missing_user_id(self, mock_decode, mock_extract_token, socket_manager, mock_sio):
async def test_connect_handler_missing_user_id(
self, mock_decode, mock_extract_token, socket_manager, mock_sio
):
"""Test connection with token missing user ID."""
# Setup mocks
mock_extract_token.return_value = "token_without_user_id"

View File

@@ -182,7 +182,6 @@ class TestSoundNormalizerService:
"app.services.sound_normalizer.get_file_hash", return_value="new_hash"
),
):
# Setup path mocks
mock_orig_path.return_value = Path("/fake/original.mp3")
mock_norm_path.return_value = Path("/fake/normalized.mp3")
@@ -256,7 +255,6 @@ class TestSoundNormalizerService:
"app.services.sound_normalizer.get_file_hash", return_value="norm_hash"
),
):
# Setup path mocks
mock_orig_path.return_value = Path("/fake/original.mp3")
mock_norm_path.return_value = Path("/fake/normalized.mp3")
@@ -294,7 +292,6 @@ class TestSoundNormalizerService:
patch.object(normalizer_service, "_get_original_path") as mock_orig_path,
patch.object(normalizer_service, "_get_normalized_path") as mock_norm_path,
):
# Setup path mocks
mock_orig_path.return_value = Path("/fake/original.mp3")
mock_norm_path.return_value = Path("/fake/normalized.mp3")
@@ -306,7 +303,6 @@ class TestSoundNormalizerService:
normalizer_service, "_normalize_audio_two_pass"
) as mock_normalize,
):
mock_normalize.side_effect = Exception("Normalization failed")
result = await normalizer_service.normalize_sound(sound)

View File

@@ -41,6 +41,7 @@ class TestSoundScannerService:
try:
from app.utils.audio import get_file_hash
hash_value = get_file_hash(temp_path)
assert len(hash_value) == 64 # SHA-256 hash length
assert isinstance(hash_value, str)
@@ -56,6 +57,7 @@ class TestSoundScannerService:
try:
from app.utils.audio import get_file_size
size = get_file_size(temp_path)
assert size > 0
assert isinstance(size, int)
@@ -83,6 +85,7 @@ class TestSoundScannerService:
temp_path = Path("/fake/path/test.mp3")
from app.utils.audio import get_audio_duration
duration = get_audio_duration(temp_path)
assert duration == 123456 # 123.456 seconds * 1000 = 123456 ms
@@ -95,6 +98,7 @@ class TestSoundScannerService:
temp_path = Path("/fake/path/test.mp3")
from app.utils.audio import get_audio_duration
duration = get_audio_duration(temp_path)
assert duration == 0
@@ -129,10 +133,11 @@ class TestSoundScannerService:
)
# Mock file operations to return same hash
with patch("app.services.sound_scanner.get_file_hash", return_value="same_hash"), \
patch("app.services.sound_scanner.get_audio_duration", return_value=120000), \
patch("app.services.sound_scanner.get_file_size", return_value=1024):
with (
patch("app.services.sound_scanner.get_file_hash", return_value="same_hash"),
patch("app.services.sound_scanner.get_audio_duration", return_value=120000),
patch("app.services.sound_scanner.get_file_size", return_value=1024),
):
# Create a temporary file
with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as f:
temp_path = Path(f.name)
@@ -175,10 +180,11 @@ class TestSoundScannerService:
scanner_service.sound_repo.create = AsyncMock(return_value=created_sound)
# Mock file operations
with patch("app.services.sound_scanner.get_file_hash", return_value="test_hash"), \
patch("app.services.sound_scanner.get_audio_duration", return_value=120000), \
patch("app.services.sound_scanner.get_file_size", return_value=1024):
with (
patch("app.services.sound_scanner.get_file_hash", return_value="test_hash"),
patch("app.services.sound_scanner.get_audio_duration", return_value=120000),
patch("app.services.sound_scanner.get_file_size", return_value=1024),
):
# Create a temporary file
with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as f:
temp_path = Path(f.name)
@@ -208,7 +214,9 @@ class TestSoundScannerService:
assert call_args["duration"] == 120000 # Duration in ms
assert call_args["size"] == 1024
assert call_args["hash"] == "test_hash"
assert call_args["is_deletable"] is False # SDB sounds are not deletable
assert (
call_args["is_deletable"] is False
) # SDB sounds are not deletable
finally:
temp_path.unlink()
@@ -229,10 +237,11 @@ class TestSoundScannerService:
scanner_service.sound_repo.update = AsyncMock(return_value=existing_sound)
# Mock file operations to return new values
with patch("app.services.sound_scanner.get_file_hash", return_value="new_hash"), \
patch("app.services.sound_scanner.get_audio_duration", return_value=120000), \
patch("app.services.sound_scanner.get_file_size", return_value=1024):
with (
patch("app.services.sound_scanner.get_file_hash", return_value="new_hash"),
patch("app.services.sound_scanner.get_audio_duration", return_value=120000),
patch("app.services.sound_scanner.get_file_size", return_value=1024),
):
# Create a temporary file
with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as f:
temp_path = Path(f.name)
@@ -259,7 +268,9 @@ class TestSoundScannerService:
assert results["files"][0]["reason"] == "file was modified"
# Verify sound_repo.update was called with correct data
call_args = scanner_service.sound_repo.update.call_args[0][1] # update_data
call_args = scanner_service.sound_repo.update.call_args[0][
1
] # update_data
assert call_args["duration"] == 120000
assert call_args["size"] == 1024
assert call_args["hash"] == "new_hash"
@@ -283,10 +294,13 @@ class TestSoundScannerService:
scanner_service.sound_repo.create = AsyncMock(return_value=created_sound)
# Mock file operations
with patch("app.services.sound_scanner.get_file_hash", return_value="custom_hash"), \
patch("app.services.sound_scanner.get_audio_duration", return_value=60000), \
patch("app.services.sound_scanner.get_file_size", return_value=2048):
with (
patch(
"app.services.sound_scanner.get_file_hash", return_value="custom_hash"
),
patch("app.services.sound_scanner.get_audio_duration", return_value=60000),
patch("app.services.sound_scanner.get_file_size", return_value=2048),
):
# Create a temporary file
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
temp_path = Path(f.name)
@@ -301,7 +315,9 @@ class TestSoundScannerService:
"errors": 0,
"files": [],
}
await scanner_service._sync_audio_file(temp_path, "CUSTOM", None, results)
await scanner_service._sync_audio_file(
temp_path, "CUSTOM", None, results
)
assert results["added"] == 1
assert results["skipped"] == 0

View File

@@ -24,22 +24,22 @@ class TestAudioUtils:
try:
# Calculate hash using our function
result_hash = get_file_hash(temp_path)
# Calculate expected hash manually
expected_hash = hashlib.sha256(test_content.encode()).hexdigest()
# Verify the hash is correct
assert result_hash == expected_hash
assert len(result_hash) == 64 # SHA-256 hash length
assert isinstance(result_hash, str)
finally:
temp_path.unlink()
def test_get_file_hash_binary_content(self):
"""Test file hash calculation with binary content."""
# Create a temporary file with binary content
test_bytes = b"\x00\x01\x02\x03\xFF\xFE\xFD"
test_bytes = b"\x00\x01\x02\x03\xff\xfe\xfd"
with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f:
f.write(test_bytes)
temp_path = Path(f.name)
@@ -47,15 +47,15 @@ class TestAudioUtils:
try:
# Calculate hash using our function
result_hash = get_file_hash(temp_path)
# Calculate expected hash manually
expected_hash = hashlib.sha256(test_bytes).hexdigest()
# Verify the hash is correct
assert result_hash == expected_hash
assert len(result_hash) == 64 # SHA-256 hash length
assert isinstance(result_hash, str)
finally:
temp_path.unlink()
@@ -68,15 +68,15 @@ class TestAudioUtils:
try:
# Calculate hash using our function
result_hash = get_file_hash(temp_path)
# Calculate expected hash for empty content
expected_hash = hashlib.sha256(b"").hexdigest()
# Verify the hash is correct
assert result_hash == expected_hash
assert len(result_hash) == 64 # SHA-256 hash length
assert isinstance(result_hash, str)
finally:
temp_path.unlink()
@@ -91,15 +91,15 @@ class TestAudioUtils:
try:
# Calculate hash using our function
result_hash = get_file_hash(temp_path)
# Calculate expected hash manually
expected_hash = hashlib.sha256(test_content.encode()).hexdigest()
# Verify the hash is correct
assert result_hash == expected_hash
assert len(result_hash) == 64 # SHA-256 hash length
assert isinstance(result_hash, str)
finally:
temp_path.unlink()
@@ -114,15 +114,15 @@ class TestAudioUtils:
try:
# Get size using our function
result_size = get_file_size(temp_path)
# Get expected size using pathlib directly
expected_size = temp_path.stat().st_size
# Verify the size is correct
assert result_size == expected_size
assert result_size > 0
assert isinstance(result_size, int)
finally:
temp_path.unlink()
@@ -135,18 +135,18 @@ class TestAudioUtils:
try:
# Get size using our function
result_size = get_file_size(temp_path)
# Verify the size is zero
assert result_size == 0
assert isinstance(result_size, int)
finally:
temp_path.unlink()
def test_get_file_size_binary_file(self):
"""Test file size calculation for binary file."""
# Create a temporary file with binary content
test_bytes = b"\x00\x01\x02\x03\xFF\xFE\xFD" * 100 # 700 bytes
test_bytes = b"\x00\x01\x02\x03\xff\xfe\xfd" * 100 # 700 bytes
with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f:
f.write(test_bytes)
temp_path = Path(f.name)
@@ -154,12 +154,12 @@ class TestAudioUtils:
try:
# Get size using our function
result_size = get_file_size(temp_path)
# Verify the size is correct
assert result_size == len(test_bytes)
assert result_size == 700
assert isinstance(result_size, int)
finally:
temp_path.unlink()
@@ -168,10 +168,10 @@ class TestAudioUtils:
"""Test successful audio duration extraction."""
# Mock ffmpeg.probe to return duration
mock_probe.return_value = {"format": {"duration": "123.456"}}
temp_path = Path("/fake/path/test.mp3")
duration = get_audio_duration(temp_path)
# Verify duration is converted correctly (seconds to milliseconds)
assert duration == 123456 # 123.456 seconds * 1000 = 123456 ms
assert isinstance(duration, int)
@@ -182,10 +182,10 @@ class TestAudioUtils:
"""Test audio duration extraction with integer duration."""
# Mock ffmpeg.probe to return integer duration
mock_probe.return_value = {"format": {"duration": "60"}}
temp_path = Path("/fake/path/test.wav")
duration = get_audio_duration(temp_path)
# Verify duration is converted correctly
assert duration == 60000 # 60 seconds * 1000 = 60000 ms
assert isinstance(duration, int)
@@ -196,10 +196,10 @@ class TestAudioUtils:
"""Test audio duration extraction with zero duration."""
# Mock ffmpeg.probe to return zero duration
mock_probe.return_value = {"format": {"duration": "0.0"}}
temp_path = Path("/fake/path/silent.mp3")
duration = get_audio_duration(temp_path)
# Verify duration is zero
assert duration == 0
assert isinstance(duration, int)
@@ -210,10 +210,10 @@ class TestAudioUtils:
"""Test audio duration extraction with fractional seconds."""
# Mock ffmpeg.probe to return fractional duration
mock_probe.return_value = {"format": {"duration": "45.123"}}
temp_path = Path("/fake/path/test.flac")
duration = get_audio_duration(temp_path)
# Verify duration is converted and rounded correctly
assert duration == 45123 # 45.123 seconds * 1000 = 45123 ms
assert isinstance(duration, int)
@@ -224,10 +224,10 @@ class TestAudioUtils:
"""Test audio duration extraction when ffmpeg fails."""
# Mock ffmpeg.probe to raise an exception
mock_probe.side_effect = Exception("FFmpeg error: file not found")
temp_path = Path("/fake/path/nonexistent.mp3")
duration = get_audio_duration(temp_path)
# Verify duration defaults to 0 on error
assert duration == 0
assert isinstance(duration, int)
@@ -238,10 +238,10 @@ class TestAudioUtils:
"""Test audio duration extraction when format info is missing."""
# Mock ffmpeg.probe to return data without format info
mock_probe.return_value = {"streams": []}
temp_path = Path("/fake/path/corrupt.mp3")
duration = get_audio_duration(temp_path)
# Verify duration defaults to 0 when format info is missing
assert duration == 0
assert isinstance(duration, int)
@@ -252,10 +252,10 @@ class TestAudioUtils:
"""Test audio duration extraction when duration is missing."""
# Mock ffmpeg.probe to return format without duration
mock_probe.return_value = {"format": {"size": "1024"}}
temp_path = Path("/fake/path/noduration.mp3")
duration = get_audio_duration(temp_path)
# Verify duration defaults to 0 when duration is missing
assert duration == 0
assert isinstance(duration, int)
@@ -266,10 +266,10 @@ class TestAudioUtils:
"""Test audio duration extraction with invalid duration value."""
# Mock ffmpeg.probe to return invalid duration
mock_probe.return_value = {"format": {"duration": "invalid"}}
temp_path = Path("/fake/path/invalid.mp3")
duration = get_audio_duration(temp_path)
# Verify duration defaults to 0 when duration is invalid
assert duration == 0
assert isinstance(duration, int)
@@ -278,7 +278,7 @@ class TestAudioUtils:
def test_get_file_hash_nonexistent_file(self):
"""Test file hash calculation for nonexistent file."""
nonexistent_path = Path("/fake/nonexistent/file.mp3")
# Should raise FileNotFoundError for nonexistent file
with pytest.raises(FileNotFoundError):
get_file_hash(nonexistent_path)
@@ -286,7 +286,7 @@ class TestAudioUtils:
def test_get_file_size_nonexistent_file(self):
"""Test file size calculation for nonexistent file."""
nonexistent_path = Path("/fake/nonexistent/file.mp3")
# Should raise FileNotFoundError for nonexistent file
with pytest.raises(FileNotFoundError):
get_file_size(nonexistent_path)
get_file_size(nonexistent_path)

View File

@@ -1,6 +1,5 @@
"""Tests for cookie utilities."""
from app.utils.cookies import extract_access_token_from_cookies, parse_cookies