Refactor sound and extraction services to include user and timestamp fields
All checks were successful
Backend CI / lint (push) Successful in 18m8s
Backend CI / test (push) Successful in 53m35s

- Updated ExtractionInfo to include user_id, created_at, and updated_at fields.
- Modified ExtractionService to return user and timestamp information in extraction responses.
- Enhanced sound serialization in PlayerState to include extraction URL if available.
- Adjusted PlaylistRepository to load sound extractions when retrieving playlist sounds.
- Added tests for new fields in extraction and sound endpoints, ensuring proper response structure.
- Created new test file endpoints for sound downloads and thumbnail retrievals, including success and error cases.
- Refactored various test cases for consistency and clarity, ensuring proper mocking and assertions.
This commit is contained in:
JSC
2025-08-03 20:54:14 +02:00
parent 77446cb5a8
commit b4f0f54516
20 changed files with 780 additions and 73 deletions

View File

@@ -2,7 +2,17 @@
from fastapi import APIRouter
from app.api.v1 import admin, auth, extractions, main, player, playlists, socket, sounds
from app.api.v1 import (
admin,
auth,
extractions,
files,
main,
player,
playlists,
socket,
sounds,
)
# V1 API router with v1 prefix
api_router = APIRouter(prefix="/v1")
@@ -10,6 +20,7 @@ api_router = APIRouter(prefix="/v1")
# Include all route modules
api_router.include_router(auth.router, tags=["authentication"])
api_router.include_router(extractions.router, tags=["extractions"])
api_router.include_router(files.router, tags=["files"])
api_router.include_router(main.router, tags=["main"])
api_router.include_router(player.router, tags=["player"])
api_router.include_router(playlists.router, tags=["playlists"])

View File

@@ -226,5 +226,3 @@ async def normalize_sound_by_id(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to normalize sound: {e!s}",
) from e

153
app/api/v1/files.py Normal file
View File

@@ -0,0 +1,153 @@
"""File serving API endpoints for audio files and thumbnails."""
import mimetypes
from pathlib import Path
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.responses import FileResponse
from sqlmodel.ext.asyncio.session import AsyncSession
from app.core.config import settings
from app.core.database import get_db
from app.core.dependencies import get_current_active_user_flexible
from app.core.logging import get_logger
from app.models.user import User
from app.repositories.sound import SoundRepository
from app.utils.audio import get_sound_file_path
logger = get_logger(__name__)
router = APIRouter(prefix="/files", tags=["files"])
async def get_sound_repository(
session: Annotated[AsyncSession, Depends(get_db)],
) -> SoundRepository:
"""Get the sound repository."""
return SoundRepository(session)
@router.get("/sounds/{sound_id}/download")
async def download_sound(
sound_id: int,
current_user: Annotated[User, Depends(get_current_active_user_flexible)],
sound_repo: Annotated[SoundRepository, Depends(get_sound_repository)],
) -> FileResponse:
"""Download a sound file."""
try:
# Get the sound record
sound = await sound_repo.get_by_id(sound_id)
if not sound:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Sound with ID {sound_id} not found",
)
# Get the file path using the audio utility
file_path = get_sound_file_path(sound)
# Determine filename based on normalization status
if sound.is_normalized and sound.normalized_filename:
filename = sound.normalized_filename
else:
filename = sound.filename
# Check if file exists
if not file_path.exists():
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Sound file not found on disk",
)
# Get MIME type
mime_type, _ = mimetypes.guess_type(str(file_path))
if not mime_type:
mime_type = "audio/mpeg" # Default to MP3
logger.info(
"Serving sound download: %s (user: %d, sound: %d)",
filename,
current_user.id,
sound_id,
)
return FileResponse(
path=str(file_path),
filename=filename,
media_type=mime_type,
headers={"Content-Disposition": f'attachment; filename="{filename}"'},
)
except HTTPException:
raise
except Exception as e:
logger.exception("Error serving sound download for sound %d", sound_id)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to serve sound file",
) from e
@router.get("/sounds/{sound_id}/thumbnail")
async def get_sound_thumbnail(
sound_id: int,
current_user: Annotated[User, Depends(get_current_active_user_flexible)],
sound_repo: Annotated[SoundRepository, Depends(get_sound_repository)],
) -> FileResponse:
"""Get a sound's thumbnail image."""
try:
# Get the sound record
sound = await sound_repo.get_by_id(sound_id)
if not sound:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Sound with ID {sound_id} not found",
)
# Check if sound has a thumbnail
if not sound.thumbnail:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="No thumbnail available for this sound",
)
# Get thumbnail file path
thumbnail_path = Path(settings.EXTRACTION_THUMBNAILS_DIR) / sound.thumbnail
# Check if thumbnail file exists
if not thumbnail_path.exists():
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Thumbnail file not found on disk",
)
# Get MIME type
mime_type, _ = mimetypes.guess_type(str(thumbnail_path))
if not mime_type:
mime_type = "image/jpeg" # Default to JPEG
logger.debug(
"Serving thumbnail: %s (user: %d, sound: %d)",
sound.thumbnail,
current_user.id,
sound_id,
)
return FileResponse(
path=str(thumbnail_path),
media_type=mime_type,
headers={
"Cache-Control": "public, max-age=3600", # Cache for 1 hour
"Content-Disposition": "inline", # Display inline, not download
},
)
except HTTPException:
raise
except Exception as e:
logger.exception("Error serving thumbnail for sound %d", sound_id)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to serve thumbnail",
) from e

View File

@@ -17,7 +17,6 @@ from app.services.vlc_player import VLCPlayerService, get_vlc_player_service
router = APIRouter(prefix="/sounds", tags=["sounds"])
def get_vlc_player() -> VLCPlayerService:
"""Get the VLC player service."""
return get_vlc_player_service(get_session_factory())
@@ -56,7 +55,6 @@ async def get_sounds(
return {"sounds": sounds}
# VLC PLAYER
@router.post("/play/{sound_id}")
async def play_sound_with_vlc(

View File

@@ -1,6 +1,7 @@
"""Playlist repository for database operations."""
from sqlalchemy import func
from sqlalchemy.orm import selectinload
from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession
@@ -84,11 +85,12 @@ class PlaylistRepository(BaseRepository[Playlist]):
raise
async def get_playlist_sounds(self, playlist_id: int) -> list[Sound]:
"""Get all sounds in a playlist, ordered by position."""
"""Get all sounds in a playlist with extractions, ordered by position."""
try:
statement = (
select(Sound)
.join(PlaylistSound)
.options(selectinload(Sound.extractions))
.where(PlaylistSound.playlist_id == playlist_id)
.order_by(PlaylistSound.position)
)

View File

@@ -31,6 +31,9 @@ class ExtractionInfo(TypedDict):
status: str
error: str | None
sound_id: int | None
user_id: int
created_at: str
updated_at: str
class ExtractionService:
@@ -88,6 +91,9 @@ class ExtractionService:
"status": extraction.status,
"error": extraction.error,
"sound_id": extraction.sound_id,
"user_id": extraction.user_id,
"created_at": extraction.created_at.isoformat(),
"updated_at": extraction.updated_at.isoformat(),
}
async def _detect_service_info(self, url: str) -> dict[str, str | None] | None:
@@ -201,6 +207,7 @@ class ExtractionService:
# Create Sound record
sound = await self._create_sound_record(
final_audio_path,
final_thumbnail_path,
extraction_title,
extraction_service,
extraction_service_id,
@@ -208,6 +215,9 @@ class ExtractionService:
# Store sound_id early to avoid session detachment issues
sound_id = sound.id
if not sound_id:
msg = "Sound creation failed - no ID returned"
raise RuntimeError(msg)
# Normalize the sound
await self._normalize_sound(sound_id)
@@ -234,6 +244,8 @@ class ExtractionService:
error_msg,
)
else:
# Get updated extraction to get latest timestamps
updated_extraction = await self.extraction_repo.get_by_id(extraction_id)
return {
"id": extraction_id,
"url": extraction_url,
@@ -243,6 +255,17 @@ class ExtractionService:
"status": "completed",
"error": None,
"sound_id": sound_id,
"user_id": user_id,
"created_at": (
updated_extraction.created_at.isoformat()
if updated_extraction
else ""
),
"updated_at": (
updated_extraction.updated_at.isoformat()
if updated_extraction
else ""
),
}
# Update extraction with error
@@ -254,6 +277,8 @@ class ExtractionService:
},
)
# Get updated extraction to get latest timestamps
updated_extraction = await self.extraction_repo.get_by_id(extraction_id)
return {
"id": extraction_id,
"url": extraction_url,
@@ -263,6 +288,17 @@ class ExtractionService:
"status": "failed",
"error": error_msg,
"sound_id": None,
"user_id": user_id,
"created_at": (
updated_extraction.created_at.isoformat()
if updated_extraction
else ""
),
"updated_at": (
updated_extraction.updated_at.isoformat()
if updated_extraction
else ""
),
}
async def _extract_media(
@@ -409,6 +445,7 @@ class ExtractionService:
async def _create_sound_record(
self,
audio_path: Path,
thumbnail_path: Path | None,
title: str | None,
service: str | None,
service_id: str | None,
@@ -427,6 +464,7 @@ class ExtractionService:
"duration": duration,
"size": size,
"hash": file_hash,
"thumbnail": thumbnail_path.name if thumbnail_path else None,
"is_deletable": True, # Extracted sounds can be deleted
"is_music": True, # Assume extracted content is music
"is_normalized": False,
@@ -434,7 +472,11 @@ class ExtractionService:
}
sound = await self.sound_repo.create(sound_data)
logger.info("Created sound record with ID: %d", sound.id)
logger.info(
"Created sound record with ID: %d, thumbnail: %s",
sound.id,
thumbnail_path.name if thumbnail_path else "None",
)
return sound
@@ -496,6 +538,9 @@ class ExtractionService:
"status": extraction.status,
"error": extraction.error,
"sound_id": extraction.sound_id,
"user_id": extraction.user_id,
"created_at": extraction.created_at.isoformat(),
"updated_at": extraction.updated_at.isoformat(),
}
async def get_user_extractions(self, user_id: int) -> list[ExtractionInfo]:
@@ -513,6 +558,9 @@ class ExtractionService:
"status": extraction.status,
"error": extraction.error,
"sound_id": extraction.sound_id,
"user_id": extraction.user_id,
"created_at": extraction.created_at.isoformat(),
"updated_at": extraction.updated_at.isoformat(),
}
for extraction in extractions
]
@@ -532,6 +580,9 @@ class ExtractionService:
"status": extraction.status,
"error": extraction.error,
"sound_id": extraction.sound_id,
"user_id": extraction.user_id,
"created_at": extraction.created_at.isoformat(),
"updated_at": extraction.updated_at.isoformat(),
}
for extraction in extractions
]

View File

@@ -87,6 +87,14 @@ class PlayerState:
"""Serialize a sound object for JSON serialization."""
if not sound:
return None
# Get extraction URL if sound is linked to an extraction
extract_url = None
if hasattr(sound, "extractions") and sound.extractions:
# Get the first extraction (there should only be one per sound)
extraction = sound.extractions[0]
extract_url = extraction.url
return {
"id": sound.id,
"name": sound.name,
@@ -96,6 +104,7 @@ class PlayerState:
"type": sound.type,
"thumbnail": sound.thumbnail,
"play_count": sound.play_count,
"extract_url": extract_url,
}
@@ -585,7 +594,8 @@ class PlayerService:
# Check if track finished
player_state = self._player.get_state()
if hasattr(vlc, "State") and player_state == vlc.State.Ended:
vlc_state_ended = 6 # vlc.State.Ended value
if player_state == vlc_state_ended:
# Track finished, handle auto-advance
self._schedule_async_task(self._handle_track_finished())

View File

@@ -306,7 +306,8 @@ class TestAdminSoundEndpoints:
@pytest.mark.asyncio
async def test_normalize_all_sounds_unauthenticated(
self, client: AsyncClient,
self,
client: AsyncClient,
) -> None:
"""Test normalizing sounds without authentication."""
response = await client.post("/api/v1/admin/sounds/normalize/all")

View File

@@ -125,7 +125,8 @@ class TestApiTokenEndpoints:
@pytest.mark.asyncio
async def test_generate_api_token_unauthenticated(
self, client: AsyncClient,
self,
client: AsyncClient,
) -> None:
"""Test API token generation without authentication."""
response = await client.post(
@@ -197,7 +198,8 @@ class TestApiTokenEndpoints:
@pytest.mark.asyncio
async def test_get_api_token_status_unauthenticated(
self, client: AsyncClient,
self,
client: AsyncClient,
) -> None:
"""Test getting API token status without authentication."""
response = await client.get("/api/v1/auth/api-token/status")
@@ -277,7 +279,8 @@ class TestApiTokenEndpoints:
@pytest.mark.asyncio
async def test_api_token_authentication_invalid_token(
self, client: AsyncClient,
self,
client: AsyncClient,
) -> None:
"""Test authentication with invalid API token."""
headers = {"API-TOKEN": "invalid_token"}
@@ -312,7 +315,8 @@ class TestApiTokenEndpoints:
@pytest.mark.asyncio
async def test_api_token_authentication_empty_token(
self, client: AsyncClient,
self,
client: AsyncClient,
) -> None:
"""Test authentication with empty API-TOKEN header."""
# Empty token

View File

@@ -1,34 +1,134 @@
"""Tests for extraction API endpoints."""
from unittest.mock import patch
import pytest
import pytest_asyncio
from httpx import AsyncClient
from sqlmodel.ext.asyncio.session import AsyncSession
from app.models.extraction import Extraction
from app.models.user import User
class TestExtractionEndpoints:
"""Test extraction API endpoints."""
@pytest_asyncio.fixture
async def test_extraction(
self,
test_session: AsyncSession,
authenticated_user: User,
) -> Extraction:
"""Create a test extraction."""
extraction = Extraction(
url="https://www.youtube.com/watch?v=test",
user_id=authenticated_user.id,
service="youtube",
service_id="test",
title="Test Video",
status="completed",
)
test_session.add(extraction)
await test_session.commit()
await test_session.refresh(extraction)
return extraction
@pytest.mark.asyncio
async def test_create_extraction_success(
self,
test_client: AsyncClient,
auth_cookies: dict[str, str],
authenticated_client: AsyncClient,
authenticated_user: User,
) -> None:
"""Test successful extraction creation."""
# Set cookies on client instance to avoid deprecation warning
test_client.cookies.update(auth_cookies)
"""Test successful extraction creation with proper response format."""
# Store user ID to avoid session issues
user_id = authenticated_user.id
response = await test_client.post(
"/api/v1/extractions/",
params={"url": "https://www.youtube.com/watch?v=test"},
with patch(
"app.services.extraction_processor.extraction_processor.queue_extraction",
):
response = await authenticated_client.post(
"/api/v1/extractions/",
params={"url": "https://www.youtube.com/watch?v=test"},
)
assert response.status_code == 200
data = response.json()
# Verify response structure
assert "message" in data
assert "extraction" in data
extraction_data = data["extraction"]
# Verify all required fields including timestamps
assert "id" in extraction_data
assert "url" in extraction_data
assert "user_id" in extraction_data
assert "status" in extraction_data
assert "created_at" in extraction_data
assert "updated_at" in extraction_data
assert extraction_data["url"] == "https://www.youtube.com/watch?v=test"
assert extraction_data["user_id"] == user_id
assert extraction_data["status"] == "pending"
@pytest.mark.asyncio
async def test_get_extraction_success(
self,
authenticated_client: AsyncClient,
authenticated_user: User,
test_extraction: Extraction,
) -> None:
"""Test successful extraction retrieval with timestamp fields."""
response = await authenticated_client.get(
f"/api/v1/extractions/{test_extraction.id}",
)
# This will fail because we don't have actual extraction service mocked
# But at least we'll get past authentication
assert response.status_code in [200, 400, 500] # Allow any non-auth error
assert response.status_code == 200
data = response.json()
# Verify all fields including timestamps
assert data["id"] == test_extraction.id
assert data["url"] == test_extraction.url
assert data["user_id"] == test_extraction.user_id
assert data["service"] == test_extraction.service
assert data["service_id"] == test_extraction.service_id
assert data["title"] == test_extraction.title
assert data["status"] == test_extraction.status
assert "created_at" in data
assert "updated_at" in data
@pytest.mark.asyncio
async def test_get_user_extractions_success(
self,
authenticated_client: AsyncClient,
authenticated_user: User,
test_extraction: Extraction,
) -> None:
"""Test successful user extractions retrieval with timestamp fields."""
response = await authenticated_client.get(
"/api/v1/extractions/",
)
assert response.status_code == 200
data = response.json()
# Verify response structure
assert "extractions" in data
assert len(data["extractions"]) >= 1
extraction_data = data["extractions"][0]
# Verify all fields including timestamps
assert "id" in extraction_data
assert "url" in extraction_data
assert "user_id" in extraction_data
assert "status" in extraction_data
assert "created_at" in extraction_data
assert "updated_at" in extraction_data
@pytest.mark.asyncio
async def test_create_extraction_unauthenticated(
self, test_client: AsyncClient,
self,
test_client: AsyncClient,
) -> None:
"""Test extraction creation without authentication."""
response = await test_client.post(
@@ -41,7 +141,8 @@ class TestExtractionEndpoints:
@pytest.mark.asyncio
async def test_get_extraction_unauthenticated(
self, test_client: AsyncClient,
self,
test_client: AsyncClient,
) -> None:
"""Test extraction retrieval without authentication."""
response = await test_client.get("/api/v1/extractions/1")

View File

@@ -0,0 +1,245 @@
"""Tests for file serving API endpoints."""
import tempfile
from pathlib import Path
from unittest.mock import patch
import pytest
import pytest_asyncio
from fastapi import status
from httpx import AsyncClient
from sqlmodel.ext.asyncio.session import AsyncSession
from app.models.sound import Sound
from app.models.user import User
class TestFilesEndpoints:
"""Test file serving endpoints."""
@pytest_asyncio.fixture
async def test_sound(
self,
test_session: AsyncSession,
) -> Sound:
"""Create a test sound."""
sound = Sound(
type="SDB",
name="Test Sound",
filename="test.mp3",
duration=30000,
size=1024,
hash="test_hash",
play_count=0,
is_normalized=False,
is_music=True,
is_deletable=True,
)
test_session.add(sound)
await test_session.commit()
await test_session.refresh(sound)
return sound
@pytest_asyncio.fixture
async def test_sound_with_thumbnail(
self,
test_session: AsyncSession,
) -> Sound:
"""Create a test sound with thumbnail."""
sound = Sound(
type="EXT",
name="Test Extracted Sound",
filename="extracted.mp3",
duration=60000,
size=2048,
hash="extracted_hash",
thumbnail="test_thumb.jpg",
play_count=5,
is_normalized=True,
is_music=True,
is_deletable=True,
)
test_session.add(sound)
await test_session.commit()
await test_session.refresh(sound)
return sound
@pytest.mark.asyncio
async def test_download_sound_success(
self,
authenticated_client: AsyncClient,
authenticated_user: User,
test_sound: Sound,
) -> None:
"""Test successful sound download."""
with (
patch("app.api.v1.files.get_sound_file_path") as mock_get_path,
tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as temp_file,
):
# Setup mock file - write to temp file and close it
temp_path = Path(temp_file.name)
temp_file.write(b"fake audio data")
temp_file.close()
mock_get_path.return_value = temp_path
try:
response = await authenticated_client.get(
f"/api/v1/files/sounds/{test_sound.id}/download",
)
assert response.status_code == status.HTTP_200_OK
finally:
# Clean up the temp file
temp_path.unlink(missing_ok=True)
assert response.headers["content-type"] == "audio/mpeg"
assert "attachment" in response.headers.get("content-disposition", "")
assert response.content == b"fake audio data"
@pytest.mark.asyncio
@pytest.mark.asyncio
async def test_download_sound_not_found(
self,
authenticated_client: AsyncClient,
authenticated_user: User,
) -> None:
"""Test download with non-existent sound."""
response = await authenticated_client.get(
"/api/v1/files/sounds/99999/download",
)
assert response.status_code == status.HTTP_404_NOT_FOUND
assert "not found" in response.json()["detail"].lower()
@pytest.mark.asyncio
async def test_download_sound_file_not_found_on_disk(
self,
authenticated_client: AsyncClient,
authenticated_user: User,
test_sound: Sound,
) -> None:
"""Test download when file doesn't exist on disk."""
with patch("app.api.v1.files.get_sound_file_path") as mock_get_path:
# Mock path that doesn't exist
mock_get_path.return_value = Path("/non/existent/file.mp3")
response = await authenticated_client.get(
f"/api/v1/files/sounds/{test_sound.id}/download",
)
assert response.status_code == status.HTTP_404_NOT_FOUND
assert "not found on disk" in response.json()["detail"].lower()
@pytest.mark.asyncio
async def test_download_sound_unauthorized(
self,
client: AsyncClient,
test_sound: Sound,
) -> None:
"""Test download without authentication."""
response = await client.get(f"/api/v1/files/sounds/{test_sound.id}/download")
assert response.status_code == status.HTTP_401_UNAUTHORIZED
@pytest.mark.asyncio
async def test_get_thumbnail_success(
self,
authenticated_client: AsyncClient,
authenticated_user: User,
test_sound_with_thumbnail: Sound,
) -> None:
"""Test successful thumbnail retrieval."""
with tempfile.TemporaryDirectory() as temp_dir:
temp_dir_path = Path(temp_dir)
# Create the actual thumbnail file
thumbnail_path = temp_dir_path / test_sound_with_thumbnail.thumbnail
thumbnail_path.write_bytes(b"fake image data")
with patch("app.api.v1.files.settings") as mock_settings:
mock_settings.EXTRACTION_THUMBNAILS_DIR = temp_dir_path
response = await authenticated_client.get(
f"/api/v1/files/sounds/{test_sound_with_thumbnail.id}/thumbnail",
)
assert response.status_code == status.HTTP_200_OK
assert response.headers["content-type"] == "image/jpeg"
assert "inline" in response.headers.get("content-disposition", "")
assert "Cache-Control" in response.headers
@pytest.mark.asyncio
async def test_get_thumbnail_no_thumbnail_field(
self,
authenticated_client: AsyncClient,
authenticated_user: User,
test_sound: Sound, # Sound without thumbnail
) -> None:
"""Test thumbnail request for sound without thumbnail."""
response = await authenticated_client.get(
f"/api/v1/files/sounds/{test_sound.id}/thumbnail",
)
assert response.status_code == status.HTTP_404_NOT_FOUND
assert "no thumbnail available" in response.json()["detail"].lower()
@pytest.mark.asyncio
async def test_get_thumbnail_file_not_found_on_disk(
self,
authenticated_client: AsyncClient,
authenticated_user: User,
test_sound_with_thumbnail: Sound,
) -> None:
"""Test thumbnail request when file doesn't exist on disk."""
with patch("app.core.config.settings") as mock_settings:
# Mock directory that doesn't contain the thumbnail
mock_settings.EXTRACTION_THUMBNAILS_DIR = "/non/existent/directory"
response = await authenticated_client.get(
f"/api/v1/files/sounds/{test_sound_with_thumbnail.id}/thumbnail",
)
assert response.status_code == status.HTTP_404_NOT_FOUND
assert "not found on disk" in response.json()["detail"].lower()
@pytest.mark.asyncio
async def test_get_thumbnail_unauthorized(
self,
client: AsyncClient,
test_sound_with_thumbnail: Sound,
) -> None:
"""Test thumbnail request without authentication."""
response = await client.get(
f"/api/v1/files/sounds/{test_sound_with_thumbnail.id}/thumbnail",
)
assert response.status_code == status.HTTP_401_UNAUTHORIZED
@pytest.mark.asyncio
async def test_download_normalized_sound(
self,
authenticated_client: AsyncClient,
authenticated_user: User,
test_sound_with_thumbnail: Sound, # This sound is normalized
) -> None:
"""Test downloading a normalized sound returns the normalized file."""
with (
patch("app.api.v1.files.get_sound_file_path") as mock_get_path,
tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as temp_file,
):
# Setup mock normalized file
temp_path = Path(temp_file.name)
temp_file.write(b"normalized audio data")
temp_file.close()
mock_get_path.return_value = temp_path
try:
response = await authenticated_client.get(
f"/api/v1/files/sounds/{test_sound_with_thumbnail.id}/download",
)
assert response.status_code == status.HTTP_200_OK
assert response.content == b"normalized audio data"
# Should use normalized filename
mock_get_path.assert_called_once_with(test_sound_with_thumbnail)
finally:
temp_path.unlink(missing_ok=True)

View File

@@ -31,6 +31,9 @@ class TestSoundEndpoints:
"status": "pending",
"error": None,
"sound_id": None,
"user_id": authenticated_user.id,
"created_at": "2025-08-03T12:00:00Z",
"updated_at": "2025-08-03T12:00:00Z",
}
with (
@@ -107,6 +110,9 @@ class TestSoundEndpoints:
"status": "completed",
"error": None,
"sound_id": 42,
"user_id": authenticated_user.id,
"created_at": "2025-08-03T12:00:00Z",
"updated_at": "2025-08-03T12:00:00Z",
}
with patch(
@@ -158,6 +164,9 @@ class TestSoundEndpoints:
"status": "completed",
"error": None,
"sound_id": 42,
"user_id": authenticated_user.id,
"created_at": "2025-08-03T12:00:00Z",
"updated_at": "2025-08-03T12:00:00Z",
},
{
"id": 2,
@@ -168,6 +177,9 @@ class TestSoundEndpoints:
"status": "pending",
"error": None,
"sound_id": None,
"user_id": authenticated_user.id,
"created_at": "2025-08-03T12:00:00Z",
"updated_at": "2025-08-03T12:00:00Z",
},
]

View File

@@ -335,3 +335,5 @@ async def admin_cookies(admin_user: User) -> dict[str, str]:
access_token = JWTUtils.create_access_token(token_data)
return {"access_token": access_token}

View File

@@ -49,7 +49,8 @@ class TestCreditService:
mock_repo.get_by_id.return_value = sample_user
result = await credit_service.check_credits(
1, CreditActionType.VLC_PLAY_SOUND,
1,
CreditActionType.VLC_PLAY_SOUND,
)
assert result is True
@@ -75,7 +76,8 @@ class TestCreditService:
mock_repo.get_by_id.return_value = poor_user
result = await credit_service.check_credits(
1, CreditActionType.VLC_PLAY_SOUND,
1,
CreditActionType.VLC_PLAY_SOUND,
)
assert result is False
@@ -92,7 +94,8 @@ class TestCreditService:
mock_repo.get_by_id.return_value = None
result = await credit_service.check_credits(
999, CreditActionType.VLC_PLAY_SOUND,
999,
CreditActionType.VLC_PLAY_SOUND,
)
assert result is False
@@ -100,7 +103,9 @@ class TestCreditService:
@pytest.mark.asyncio
async def test_validate_and_reserve_credits_success(
self, credit_service, sample_user,
self,
credit_service,
sample_user,
) -> None:
"""Test successful credit validation and reservation."""
mock_session = credit_service.db_session_factory()
@@ -122,7 +127,8 @@ class TestCreditService:
@pytest.mark.asyncio
async def test_validate_and_reserve_credits_insufficient(
self, credit_service,
self,
credit_service,
) -> None:
"""Test credit validation with insufficient credits."""
mock_session = credit_service.db_session_factory()
@@ -152,7 +158,8 @@ class TestCreditService:
@pytest.mark.asyncio
async def test_validate_and_reserve_credits_user_not_found(
self, credit_service,
self,
credit_service,
) -> None:
"""Test credit validation when user is not found."""
mock_session = credit_service.db_session_factory()
@@ -225,7 +232,9 @@ class TestCreditService:
@pytest.mark.asyncio
async def test_deduct_credits_failed_action_requires_success(
self, credit_service, sample_user,
self,
credit_service,
sample_user,
) -> None:
"""Test credit deduction when action failed but requires success."""
mock_session = credit_service.db_session_factory()

View File

@@ -175,7 +175,8 @@ class TestExtractionService:
@pytest.mark.asyncio
async def test_process_extraction_with_service_detection(
self, extraction_service,
self,
extraction_service,
) -> None:
"""Test extraction processing with service detection."""
extraction_id = 1
@@ -314,6 +315,7 @@ class TestExtractionService:
result = await extraction_service._create_sound_record(
audio_path,
None, # thumbnail_path
extraction.title,
extraction.service,
extraction.service_id,

View File

@@ -94,6 +94,48 @@ class TestPlayerState:
assert result["type"] == "SDB"
assert result["thumbnail"] == "test.jpg"
assert result["play_count"] == 5
assert result["extract_url"] is None
def test_serialize_sound_with_extraction_url(self) -> None:
"""Test serializing a sound object with extraction URL."""
from app.models.extraction import Extraction
state = PlayerState()
sound = Sound(
id=1,
name="Test Song",
filename="test.mp3",
duration=30000,
size=1024,
type="EXT",
thumbnail="test.jpg",
play_count=5,
)
# Mock extraction relationship
extraction = Extraction(
id=1,
url="https://www.youtube.com/watch?v=test",
service="youtube",
service_id="test",
title="Test Song",
status="completed",
user_id=1,
sound_id=1,
)
sound.extractions = [extraction]
result = state._serialize_sound(sound)
assert result["id"] == 1
assert result["name"] == "Test Song"
assert result["filename"] == "test.mp3"
assert result["duration"] == 30000
assert result["size"] == 1024
assert result["type"] == "EXT"
assert result["thumbnail"] == "test.jpg"
assert result["play_count"] == 5
assert result["extract_url"] == "https://www.youtube.com/watch?v=test"
def test_serialize_sound_with_none(self) -> None:
"""Test serializing None sound."""
@@ -132,13 +174,18 @@ class TestPlayerService:
@pytest.fixture
def player_service(
self, mock_db_session_factory, mock_vlc_instance, mock_socket_manager,
self,
mock_db_session_factory,
mock_vlc_instance,
mock_socket_manager,
):
"""Create a player service instance for testing."""
return PlayerService(mock_db_session_factory)
def test_init_creates_player_service(
self, mock_db_session_factory, mock_vlc_instance,
self,
mock_db_session_factory,
mock_vlc_instance,
) -> None:
"""Test that player service initializes correctly."""
with patch("app.services.player.socket_manager"):
@@ -157,7 +204,9 @@ class TestPlayerService:
@pytest.mark.asyncio
async def test_start_initializes_service(
self, player_service, mock_vlc_instance,
self,
player_service,
mock_vlc_instance,
) -> None:
"""Test that start method initializes the service."""
with patch.object(player_service, "reload_playlist", new_callable=AsyncMock):
@@ -204,7 +253,9 @@ class TestPlayerService:
mock_path.return_value = mock_file_path
with patch.object(
player_service, "_broadcast_state", new_callable=AsyncMock,
player_service,
"_broadcast_state",
new_callable=AsyncMock,
):
mock_media = Mock()
player_service._vlc_instance.media_new.return_value = mock_media
@@ -261,7 +312,9 @@ class TestPlayerService:
player_service.state.status = PlayerStatus.STOPPED
with patch.object(
player_service, "_broadcast_state", new_callable=AsyncMock,
player_service,
"_broadcast_state",
new_callable=AsyncMock,
) as mock_broadcast:
await player_service.pause()
@@ -275,10 +328,14 @@ class TestPlayerService:
player_service.state.current_sound_position = 5000
with patch.object(
player_service, "_process_play_count", new_callable=AsyncMock,
player_service,
"_process_play_count",
new_callable=AsyncMock,
):
with patch.object(
player_service, "_broadcast_state", new_callable=AsyncMock,
player_service,
"_broadcast_state",
new_callable=AsyncMock,
):
await player_service.stop_playback()
@@ -329,7 +386,9 @@ class TestPlayerService:
player_service.state.status = PlayerStatus.STOPPED
with patch.object(
player_service, "_broadcast_state", new_callable=AsyncMock,
player_service,
"_broadcast_state",
new_callable=AsyncMock,
) as mock_broadcast:
await player_service.seek(15000)
@@ -391,7 +450,9 @@ class TestPlayerService:
mock_repo.get_playlist_sounds.return_value = mock_sounds
with patch.object(
player_service, "_broadcast_state", new_callable=AsyncMock,
player_service,
"_broadcast_state",
new_callable=AsyncMock,
):
await player_service.reload_playlist()
@@ -415,7 +476,9 @@ class TestPlayerService:
sounds = [sound1, sound2]
with patch.object(
player_service, "_stop_playback", new_callable=AsyncMock,
player_service,
"_stop_playback",
new_callable=AsyncMock,
) as mock_stop:
await player_service._handle_playlist_id_changed(1, 2, sounds)
@@ -427,13 +490,16 @@ class TestPlayerService:
@pytest.mark.asyncio
async def test_handle_playlist_id_changed_empty_playlist(
self, player_service,
self,
player_service,
) -> None:
"""Test handling playlist ID change with empty playlist."""
player_service.state.status = PlayerStatus.PLAYING
with patch.object(
player_service, "_stop_playback", new_callable=AsyncMock,
player_service,
"_stop_playback",
new_callable=AsyncMock,
) as mock_stop:
await player_service._handle_playlist_id_changed(1, 2, [])
@@ -444,7 +510,8 @@ class TestPlayerService:
@pytest.mark.asyncio
async def test_handle_same_playlist_track_exists_same_index(
self, player_service,
self,
player_service,
) -> None:
"""Test handling same playlist when track exists at same index."""
sound1 = Sound(id=1, name="Song 1", filename="song1.mp3", duration=30000)
@@ -461,7 +528,8 @@ class TestPlayerService:
@pytest.mark.asyncio
async def test_handle_same_playlist_track_exists_different_index(
self, player_service,
self,
player_service,
) -> None:
"""Test handling same playlist when track exists at different index."""
sound1 = Sound(id=2, name="Song 2", filename="song2.mp3", duration=45000)
@@ -483,7 +551,9 @@ class TestPlayerService:
sounds = [sound1, sound2] # Track with ID 1 is missing
with patch.object(
player_service, "_handle_track_removed", new_callable=AsyncMock,
player_service,
"_handle_track_removed",
new_callable=AsyncMock,
) as mock_removed:
await player_service._handle_same_playlist_track_check(1, 0, sounds)
mock_removed.assert_called_once_with(1, sounds)
@@ -496,7 +566,9 @@ class TestPlayerService:
sounds = [sound1]
with patch.object(
player_service, "_stop_playback", new_callable=AsyncMock,
player_service,
"_stop_playback",
new_callable=AsyncMock,
) as mock_stop:
await player_service._handle_track_removed(1, sounds)
@@ -511,7 +583,9 @@ class TestPlayerService:
player_service.state.status = PlayerStatus.PLAYING
with patch.object(
player_service, "_stop_playback", new_callable=AsyncMock,
player_service,
"_stop_playback",
new_callable=AsyncMock,
) as mock_stop:
await player_service._handle_track_removed(1, [])
@@ -609,10 +683,14 @@ class TestPlayerService:
mock_repo.get_playlist_sounds.return_value = mock_sounds
with patch.object(
player_service, "_stop_playback", new_callable=AsyncMock,
player_service,
"_stop_playback",
new_callable=AsyncMock,
) as mock_stop:
with patch.object(
player_service, "_broadcast_state", new_callable=AsyncMock,
player_service,
"_broadcast_state",
new_callable=AsyncMock,
):
await player_service.reload_playlist()
@@ -652,7 +730,9 @@ class TestPlayerService:
mock_repo.get_playlist_sounds.return_value = mock_sounds
with patch.object(
player_service, "_broadcast_state", new_callable=AsyncMock,
player_service,
"_broadcast_state",
new_callable=AsyncMock,
):
await player_service.reload_playlist()

View File

@@ -270,7 +270,9 @@ class TestSocketManager:
@pytest.mark.asyncio
async def test_disconnect_handler_unknown_socket(
self, socket_manager, mock_sio,
self,
socket_manager,
mock_sio,
) -> None:
"""Test disconnect handler with unknown socket."""
# Access the disconnect handler directly

View File

@@ -155,7 +155,8 @@ class TestSoundNormalizerService:
@pytest.mark.asyncio
async def test_normalize_sound_force_already_normalized(
self, normalizer_service,
self,
normalizer_service,
) -> None:
"""Test force normalizing a sound that's already normalized."""
sound = Sound(
@@ -284,7 +285,8 @@ class TestSoundNormalizerService:
@pytest.mark.asyncio
async def test_normalize_sound_normalization_error(
self, normalizer_service,
self,
normalizer_service,
) -> None:
"""Test handling normalization errors."""
sound = Sound(

View File

@@ -185,7 +185,9 @@ class TestVLCPlayerService:
@pytest.mark.asyncio
@patch("app.services.vlc_player.asyncio.create_subprocess_exec")
async def test_stop_all_vlc_instances_success(
self, mock_subprocess, vlc_service,
self,
mock_subprocess,
vlc_service,
) -> None:
"""Test successful stopping of all VLC instances."""
# Mock pgrep process (find VLC processes)
@@ -281,7 +283,9 @@ class TestVLCPlayerService:
@pytest.mark.asyncio
@patch("app.services.vlc_player.asyncio.create_subprocess_exec")
async def test_stop_all_vlc_instances_error(
self, mock_subprocess, vlc_service,
self,
mock_subprocess,
vlc_service,
) -> None:
"""Test stopping VLC instances when an error occurs."""
# Mock subprocess exception
@@ -341,10 +345,12 @@ class TestVLCPlayerService:
mock_user_repo = AsyncMock()
with patch(
"app.services.vlc_player.SoundRepository", return_value=mock_sound_repo,
"app.services.vlc_player.SoundRepository",
return_value=mock_sound_repo,
):
with patch(
"app.services.vlc_player.UserRepository", return_value=mock_user_repo,
"app.services.vlc_player.UserRepository",
return_value=mock_user_repo,
):
with patch("app.services.vlc_player.socket_manager") as mock_socket:
# Mock the file path utility
@@ -424,10 +430,12 @@ class TestVLCPlayerService:
)
with patch(
"app.services.vlc_player.SoundRepository", return_value=mock_sound_repo,
"app.services.vlc_player.SoundRepository",
return_value=mock_sound_repo,
):
with patch(
"app.services.vlc_player.UserRepository", return_value=mock_user_repo,
"app.services.vlc_player.UserRepository",
return_value=mock_user_repo,
):
with patch("app.services.vlc_player.socket_manager") as mock_socket:
# Setup mocks
@@ -461,6 +469,7 @@ class TestVLCPlayerService:
"sound_id": 1,
"sound_name": "Test Sound",
"user_id": 1,
"user_name": "Admin User",
"play_count": 1,
},
)
@@ -474,7 +483,8 @@ class TestVLCPlayerService:
@pytest.mark.asyncio
async def test_record_play_count_always_creates_record(
self, vlc_service_with_db,
self,
vlc_service_with_db,
) -> None:
"""Test play count recording always creates a new SoundPlayed record."""
# Mock session and repositories
@@ -503,10 +513,12 @@ class TestVLCPlayerService:
)
with patch(
"app.services.vlc_player.SoundRepository", return_value=mock_sound_repo,
"app.services.vlc_player.SoundRepository",
return_value=mock_sound_repo,
):
with patch(
"app.services.vlc_player.UserRepository", return_value=mock_user_repo,
"app.services.vlc_player.UserRepository",
return_value=mock_user_repo,
):
with patch("app.services.vlc_player.socket_manager") as mock_socket:
# Setup mocks

View File

@@ -28,7 +28,8 @@ class TestRequiresCreditsDecorator:
@pytest.fixture
def credit_service_factory(
self, mock_credit_service: AsyncMock,
self,
mock_credit_service: AsyncMock,
) -> Callable[[], AsyncMock]:
"""Create a credit service factory."""
return lambda: mock_credit_service
@@ -98,7 +99,9 @@ class TestRequiresCreditsDecorator:
@pytest.mark.asyncio
async def test_decorator_failed_action(
self, credit_service_factory, mock_credit_service,
self,
credit_service_factory,
mock_credit_service,
) -> None:
"""Test decorator with failed action."""
@@ -122,7 +125,9 @@ class TestRequiresCreditsDecorator:
@pytest.mark.asyncio
async def test_decorator_exception_in_action(
self, credit_service_factory, mock_credit_service,
self,
credit_service_factory,
mock_credit_service,
) -> None:
"""Test decorator when action raises exception."""
@@ -147,7 +152,9 @@ class TestRequiresCreditsDecorator:
@pytest.mark.asyncio
async def test_decorator_insufficient_credits(
self, credit_service_factory, mock_credit_service,
self,
credit_service_factory,
mock_credit_service,
) -> None:
"""Test decorator with insufficient credits."""
mock_credit_service.validate_and_reserve_credits.side_effect = (
@@ -170,7 +177,9 @@ class TestRequiresCreditsDecorator:
@pytest.mark.asyncio
async def test_decorator_user_id_in_args(
self, credit_service_factory, mock_credit_service,
self,
credit_service_factory,
mock_credit_service,
) -> None:
"""Test decorator extracting user_id from positional args."""
@@ -218,14 +227,17 @@ class TestValidateCreditsOnlyDecorator:
@pytest.fixture
def credit_service_factory(
self, mock_credit_service: AsyncMock,
self,
mock_credit_service: AsyncMock,
) -> Callable[[], AsyncMock]:
"""Create a credit service factory."""
return lambda: mock_credit_service
@pytest.mark.asyncio
async def test_validate_only_decorator(
self, credit_service_factory, mock_credit_service,
self,
credit_service_factory,
mock_credit_service,
) -> None:
"""Test validate_credits_only decorator."""