- Updated ExtractionInfo to include user_id, created_at, and updated_at fields. - Modified ExtractionService to return user and timestamp information in extraction responses. - Enhanced sound serialization in PlayerState to include extraction URL if available. - Adjusted PlaylistRepository to load sound extractions when retrieving playlist sounds. - Added tests for new fields in extraction and sound endpoints, ensuring proper response structure. - Created new test file endpoints for sound downloads and thumbnail retrievals, including success and error cases. - Refactored various test cases for consistency and clarity, ensuring proper mocking and assertions.
154 lines
4.9 KiB
Python
154 lines
4.9 KiB
Python
"""File serving API endpoints for audio files and thumbnails."""
|
|
|
|
import mimetypes
|
|
from pathlib import Path
|
|
from typing import Annotated
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, status
|
|
from fastapi.responses import FileResponse
|
|
from sqlmodel.ext.asyncio.session import AsyncSession
|
|
|
|
from app.core.config import settings
|
|
from app.core.database import get_db
|
|
from app.core.dependencies import get_current_active_user_flexible
|
|
from app.core.logging import get_logger
|
|
from app.models.user import User
|
|
from app.repositories.sound import SoundRepository
|
|
from app.utils.audio import get_sound_file_path
|
|
|
|
# Logger for this module, obtained from the project's get_logger helper
# and keyed by this module's __name__.
logger = get_logger(__name__)
|
|
|
|
# Router collecting the endpoints declared in this module; every route
# registered on it is served under the "/files" prefix and tagged "files".
router = APIRouter(prefix="/files", tags=["files"])
|
|
|
|
|
|
async def get_sound_repository(
    session: Annotated[AsyncSession, Depends(get_db)],
) -> SoundRepository:
    """Build a SoundRepository bound to the injected database session.

    Intended for use as a FastAPI dependency: ``session`` is supplied
    through ``Depends(get_db)``.

    Args:
        session: Async database session handed to the repository.

    Returns:
        A new ``SoundRepository`` wrapping ``session``.
    """
    repository = SoundRepository(session)
    return repository
|
|
|
|
|
|
@router.get("/sounds/{sound_id}/download")
async def download_sound(
    sound_id: int,
    current_user: Annotated[User, Depends(get_current_active_user_flexible)],
    sound_repo: Annotated[SoundRepository, Depends(get_sound_repository)],
) -> FileResponse:
    """Download a sound file as an attachment.

    Looks the sound up by ID, resolves its on-disk path with
    ``get_sound_file_path`` and returns it as a ``FileResponse``.

    Args:
        sound_id: ID of the sound to download.
        current_user: Authenticated user (injected); used for the access log.
        sound_repo: Repository used to load the sound record (injected).

    Returns:
        A ``FileResponse`` for the sound file. The download name is the
        normalized filename when the sound is normalized and has one,
        otherwise the original filename.

    Raises:
        HTTPException: 404 if the sound record does not exist or its file
            is missing on disk; 500 if any other error occurs while
            preparing the response.
    """
    try:
        sound = await sound_repo.get_by_id(sound_id)
        if not sound:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Sound with ID {sound_id} not found",
            )

        # Resolve the file path using the audio utility.
        file_path = get_sound_file_path(sound)

        # Offer the normalized filename only when one is actually recorded.
        if sound.is_normalized and sound.normalized_filename:
            filename = sound.normalized_filename
        else:
            filename = sound.filename

        if not file_path.exists():
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Sound file not found on disk",
            )

        # Fall back to MP3 when the extension is unknown to mimetypes.
        mime_type, _ = mimetypes.guess_type(str(file_path))
        if not mime_type:
            mime_type = "audio/mpeg"

        logger.info(
            "Serving sound download: %s (user: %d, sound: %d)",
            filename,
            current_user.id,
            sound_id,
        )

        # Let FileResponse build Content-Disposition from ``filename``: it
        # emits ``attachment`` and percent-encodes names that are not plain
        # ASCII (``filename*=utf-8''...``). A hand-written header would take
        # precedence over that and fail to encode for non-latin-1 names, or
        # be malformed when the name contains a double quote.
        return FileResponse(
            path=str(file_path),
            filename=filename,
            media_type=mime_type,
        )

    except HTTPException:
        # Deliberate 404s above must reach the client unchanged.
        raise
    except Exception as e:
        logger.exception("Error serving sound download for sound %d", sound_id)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to serve sound file",
        ) from e
|
|
|
|
|
|
@router.get("/sounds/{sound_id}/thumbnail")
async def get_sound_thumbnail(
    sound_id: int,
    current_user: Annotated[User, Depends(get_current_active_user_flexible)],
    sound_repo: Annotated[SoundRepository, Depends(get_sound_repository)],
) -> FileResponse:
    """Serve the thumbnail image attached to a sound.

    Args:
        sound_id: ID of the sound whose thumbnail is requested.
        current_user: Authenticated user (injected); used for the debug log.
        sound_repo: Repository used to load the sound record (injected).

    Returns:
        A ``FileResponse`` for the image, sent inline with a one-hour
        ``Cache-Control`` max-age.

    Raises:
        HTTPException: 404 if the sound does not exist, has no thumbnail,
            or the thumbnail file is missing on disk; 500 if any other
            error occurs while preparing the response.
    """
    try:
        sound = await sound_repo.get_by_id(sound_id)
        if not sound:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Sound with ID {sound_id} not found",
            )

        thumbnail_name = sound.thumbnail
        if not thumbnail_name:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="No thumbnail available for this sound",
            )

        # Thumbnails live under the configured extraction thumbnails directory.
        image_path = Path(settings.EXTRACTION_THUMBNAILS_DIR) / thumbnail_name
        if not image_path.exists():
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Thumbnail file not found on disk",
            )

        # Fall back to JPEG when the extension is unknown to mimetypes.
        content_type = mimetypes.guess_type(str(image_path))[0] or "image/jpeg"

        logger.debug(
            "Serving thumbnail: %s (user: %d, sound: %d)",
            thumbnail_name,
            current_user.id,
            sound_id,
        )

        # NOTE(review): "public" allows shared caches to store this response
        # even though the route requires authentication - confirm intended.
        response_headers = {
            "Cache-Control": "public, max-age=3600",  # cache for 1 hour
            "Content-Disposition": "inline",  # render in place, not download
        }
        return FileResponse(
            path=str(image_path),
            media_type=content_type,
            headers=response_headers,
        )

    except HTTPException:
        # Deliberate 404s above must reach the client unchanged.
        raise
    except Exception as exc:
        logger.exception("Error serving thumbnail for sound %d", sound_id)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to serve thumbnail",
        ) from exc
|