diff --git a/app/api/v1/__init__.py b/app/api/v1/__init__.py index 1df5b95..3933b1d 100644 --- a/app/api/v1/__init__.py +++ b/app/api/v1/__init__.py @@ -2,7 +2,7 @@ from fastapi import APIRouter -from app.api.v1 import auth, main, socket +from app.api.v1 import auth, main, socket, sounds # V1 API router with v1 prefix api_router = APIRouter(prefix="/v1") @@ -11,3 +11,4 @@ api_router = APIRouter(prefix="/v1") api_router.include_router(main.router, tags=["main"]) api_router.include_router(auth.router, prefix="/auth", tags=["authentication"]) api_router.include_router(socket.router, tags=["socket"]) +api_router.include_router(sounds.router, tags=["sounds"]) diff --git a/app/api/v1/sounds.py b/app/api/v1/sounds.py new file mode 100644 index 0000000..5c25157 --- /dev/null +++ b/app/api/v1/sounds.py @@ -0,0 +1,79 @@ +"""Sound management API endpoints.""" + +from typing import Annotated + +from fastapi import APIRouter, Depends, HTTPException, status +from sqlmodel.ext.asyncio.session import AsyncSession + +from app.core.database import get_db +from app.core.dependencies import get_current_active_user_flexible +from app.models.user import User +from app.services.sound_scanner import ScanResults, SoundScannerService + +router = APIRouter(prefix="/sounds", tags=["sounds"]) + + +async def get_sound_scanner_service( + session: Annotated[AsyncSession, Depends(get_db)], +) -> SoundScannerService: + """Get the sound scanner service.""" + return SoundScannerService(session) + + +@router.post("/scan") +async def scan_sounds( + current_user: Annotated[User, Depends(get_current_active_user_flexible)], + scanner_service: Annotated[SoundScannerService, Depends(get_sound_scanner_service)], +) -> dict[str, ScanResults | str]: + """Sync the soundboard directory (add/update/delete sounds).""" + # Only allow admins to scan sounds + if current_user.role not in ["admin", "superadmin"]: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Only administrators can sync sounds", + ) + + try: + results = await scanner_service.scan_soundboard_directory() + return { + "message": "Sound sync completed", + "results": results, + } + except Exception as e: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Failed to sync sounds: {e!s}", + ) from e + + +@router.post("/scan/custom") +async def scan_custom_directory( + directory: str, + current_user: Annotated[User, Depends(get_current_active_user_flexible)], + scanner_service: Annotated[SoundScannerService, Depends(get_sound_scanner_service)], + sound_type: str = "SDB", +) -> dict[str, ScanResults | str]: + """Sync a custom directory with the database (add/update/delete sounds).""" + # Only allow admins to sync sounds + if current_user.role not in ["admin", "superadmin"]: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Only administrators can sync sounds", + ) + + try: + results = await scanner_service.scan_directory(directory, sound_type) + return { + "message": f"Sync of directory '{directory}' completed", + "results": results, + } + except ValueError as e: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=str(e), + ) from e + except Exception as e: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Failed to sync directory: {e!s}", + ) from e diff --git a/app/repositories/sound.py b/app/repositories/sound.py new file mode 100644 index 0000000..8cff109 --- /dev/null +++ b/app/repositories/sound.py @@ -0,0 +1,128 @@ +"""Sound repository for database operations.""" + +from typing import Any + +from sqlalchemy import desc, func +from sqlmodel import select +from sqlmodel.ext.asyncio.session import AsyncSession + +from app.core.logging import get_logger +from app.models.sound import Sound + +logger = get_logger(__name__) + + +class SoundRepository: + """Repository for sound operations.""" + + def __init__(self, session: AsyncSession) -> None: + """Initialize the sound repository.""" + self.session = session + + async def get_by_id(self, sound_id: int) -> Sound | None: + """Get a sound by ID.""" + try: + statement = select(Sound).where(Sound.id == sound_id) + result = await self.session.exec(statement) + return result.first() + except Exception: + logger.exception("Failed to get sound by ID: %s", sound_id) + raise + + async def get_by_filename(self, filename: str) -> Sound | None: + """Get a sound by filename.""" + try: + statement = select(Sound).where(Sound.filename == filename) + result = await self.session.exec(statement) + return result.first() + except Exception: + logger.exception("Failed to get sound by filename: %s", filename) + raise + + async def get_by_hash(self, hash_value: str) -> Sound | None: + """Get a sound by hash.""" + try: + statement = select(Sound).where(Sound.hash == hash_value) + result = await self.session.exec(statement) + return result.first() + except Exception: + logger.exception("Failed to get sound by hash") + raise + + async def get_by_type(self, sound_type: str) -> list[Sound]: + """Get all sounds by type.""" + try: + statement = select(Sound).where(Sound.type == sound_type) + result = await self.session.exec(statement) + return list(result.all()) + except Exception: + logger.exception("Failed to get sounds by type: %s", sound_type) + raise + + async def create(self, sound_data: dict[str, Any]) -> Sound: + """Create a new sound.""" + try: + sound = Sound(**sound_data) + self.session.add(sound) + await self.session.commit() + await self.session.refresh(sound) + except Exception: + await self.session.rollback() + logger.exception("Failed to create sound") + raise + else: + logger.info("Created new sound: %s", sound.name) + return sound + + async def update(self, sound: Sound, update_data: dict[str, Any]) -> Sound: + """Update a sound.""" + try: + for field, value in update_data.items(): + setattr(sound, field, value) + + await self.session.commit() + await self.session.refresh(sound) + except Exception: + await self.session.rollback() + logger.exception("Failed to update sound") + raise + else: + logger.info("Updated sound: %s", sound.name) + return sound + + async def delete(self, sound: Sound) -> None: + """Delete a sound.""" + try: + await self.session.delete(sound) + await self.session.commit() + logger.info("Deleted sound: %s", sound.name) + except Exception: + await self.session.rollback() + logger.exception("Failed to delete sound") + raise + + async def search_by_name(self, query: str) -> list[Sound]: + """Search sounds by name (case-insensitive).""" + try: + statement = select(Sound).where( + func.lower(Sound.name).like(f"%{query.lower()}%"), + ) + result = await self.session.exec(statement) + return list(result.all()) + except Exception: + logger.exception("Failed to search sounds by name: %s", query) + raise + + async def get_popular_sounds(self, limit: int = 10) -> list[Sound]: + """Get the most played sounds.""" + try: + statement = ( + select(Sound) + .order_by(desc(Sound.play_count)) + .limit(limit) + ) + result = await self.session.exec(statement) + return list(result.all()) + except Exception: + logger.exception("Failed to get popular sounds") + raise diff --git a/app/services/sound_scanner.py b/app/services/sound_scanner.py new file mode 100644 index 0000000..5098c0d --- /dev/null +++ b/app/services/sound_scanner.py @@ -0,0 +1,274 @@ +"""Sound scanner service for scanning and importing audio files.""" + +import hashlib +from pathlib import Path +from typing import TypedDict + +import ffmpeg # type: ignore[import-untyped] +from sqlmodel.ext.asyncio.session import AsyncSession + +from app.core.logging import get_logger +from app.models.sound import Sound +from app.repositories.sound import SoundRepository + +logger = get_logger(__name__) + + +class FileInfo(TypedDict): + """Type definition for file information in scan results.""" + filename: str + status: str + reason: str | None + name: str | None + duration: int | None + size: int | None + id: int | None + error: str | None + changes: list[str] | None + + +class ScanResults(TypedDict): + """Type definition for scan results.""" + scanned: int + added: int + updated: int + deleted: int + skipped: int + errors: int + files: list[FileInfo] + + +class SoundScannerService: + """Service for scanning and importing audio files.""" + + def __init__(self, session: AsyncSession) -> None: + """Initialize the sound scanner service.""" + self.session = session + self.sound_repo = SoundRepository(session) + self.supported_extensions = {".mp3", ".wav", ".opus", ".flac", ".ogg", ".m4a", ".aac"} + + def get_file_hash(self, file_path: Path) -> str: + """Calculate MD5 hash of a file.""" + hash_md5 = hashlib.md5() + with open(file_path, "rb") as f: + for chunk in iter(lambda: f.read(4096), b""): + hash_md5.update(chunk) + return hash_md5.hexdigest() + + def get_audio_duration(self, file_path: Path) -> int: + """Get audio duration in milliseconds using ffmpeg.""" + try: + probe = ffmpeg.probe(str(file_path)) + duration = float(probe["format"]["duration"]) + return int(duration * 1000) # Convert to milliseconds + except Exception as e: + logger.warning("Failed to get duration for %s: %s", file_path, e) + return 0 + + def get_file_size(self, file_path: Path) -> int: + """Get file size in bytes.""" + return file_path.stat().st_size + + def extract_name_from_filename(self, filename: str) -> str: + """Extract a clean name from filename.""" + # Remove extension + name = Path(filename).stem + # Replace underscores and hyphens with spaces + name = name.replace("_", " ").replace("-", " ") + # Capitalize words + name = " ".join(word.capitalize() for word in name.split()) + return name + + async def scan_directory( + self, + directory_path: str, + sound_type: str = "SDB", + ) -> ScanResults: + """Sync a directory with the database (add/update/delete sounds).""" + scan_path = Path(directory_path) + + if not scan_path.exists(): + msg = f"Directory does not exist: {directory_path}" + raise ValueError(msg) + + if not scan_path.is_dir(): + msg = f"Path is not a directory: {directory_path}" + raise ValueError(msg) + + results: ScanResults = { + "scanned": 0, + "added": 0, + "updated": 0, + "deleted": 0, + "skipped": 0, + "errors": 0, + "files": [], + } + + logger.info("Starting sync of directory: %s", directory_path) + + # Get all existing sounds of this type from database + existing_sounds = await self.sound_repo.get_by_type(sound_type) + sounds_by_filename = {sound.filename: sound for sound in existing_sounds} + + # Get all audio files from directory + audio_files = [ + f for f in scan_path.iterdir() + if f.is_file() and f.suffix.lower() in self.supported_extensions + ] + + # Process each file in directory + processed_filenames = set() + for file_path in audio_files: + results["scanned"] += 1 + filename = file_path.name + processed_filenames.add(filename) + + try: + await self._sync_audio_file( + file_path, + sound_type, + sounds_by_filename.get(filename), + results, + ) + except Exception as e: + logger.exception("Error processing file %s", file_path) + results["errors"] += 1 + results["files"].append({ + "filename": filename, + "status": "error", + "reason": None, + "name": None, + "duration": None, + "size": None, + "id": None, + "error": str(e), + "changes": None, + }) + + # Delete sounds that no longer exist in directory + for filename, sound in sounds_by_filename.items(): + if filename not in processed_filenames: + try: + await self.sound_repo.delete(sound) + logger.info("Deleted sound no longer in directory: %s", filename) + results["deleted"] += 1 + results["files"].append({ + "filename": filename, + "status": "deleted", + "reason": "file no longer exists", + "name": sound.name, + "duration": sound.duration, + "size": sound.size, + "id": sound.id, + "error": None, + "changes": None, + }) + except Exception as e: + logger.exception("Error deleting sound %s", filename) + results["errors"] += 1 + results["files"].append({ + "filename": filename, + "status": "error", + "reason": "failed to delete", + "name": sound.name, + "duration": sound.duration, + "size": sound.size, + "id": sound.id, + "error": str(e), + "changes": None, + }) + + logger.info("Sync completed: %s", results) + return results + + async def _sync_audio_file( + self, + file_path: Path, + sound_type: str, + existing_sound: Sound | None, + results: ScanResults, + ) -> None: + """Sync a single audio file (add new or update existing).""" + filename = file_path.name + file_hash = self.get_file_hash(file_path) + duration = self.get_audio_duration(file_path) + size = self.get_file_size(file_path) + name = self.extract_name_from_filename(filename) + + if existing_sound is None: + # Add new sound + sound_data = { + "type": sound_type, + "name": name, + "filename": filename, + "duration": duration, + "size": size, + "hash": file_hash, + "is_deletable": False, + "is_music": False, + "is_normalized": False, + "play_count": 0, + } + + sound = await self.sound_repo.create(sound_data) + logger.info("Added new sound: %s (ID: %s)", sound.name, sound.id) + + results["added"] += 1 + results["files"].append({ + "filename": filename, + "status": "added", + "reason": None, + "name": name, + "duration": duration, + "size": size, + "id": sound.id, + "error": None, + "changes": None, + }) + + elif existing_sound.hash != file_hash: + # Update existing sound (file was modified) + update_data = { + "name": name, + "duration": duration, + "size": size, + "hash": file_hash, + } + + await self.sound_repo.update(existing_sound, update_data) + logger.info("Updated modified sound: %s (ID: %s)", name, existing_sound.id) + + results["updated"] += 1 + results["files"].append({ + "filename": filename, + "status": "updated", + "reason": "file was modified", + "name": name, + "duration": duration, + "size": size, + "id": existing_sound.id, + "error": None, + "changes": ["hash", "duration", "size", "name"], + }) + + else: + # File unchanged, skip + logger.debug("Sound unchanged: %s", filename) + results["skipped"] += 1 + results["files"].append({ + "filename": filename, + "status": "skipped", + "reason": "file unchanged", + "name": existing_sound.name, + "duration": existing_sound.duration, + "size": existing_sound.size, + "id": existing_sound.id, + "error": None, + "changes": None, + }) + + async def scan_soundboard_directory(self) -> ScanResults: + """Sync the default soundboard directory.""" + soundboard_path = "sounds/originals/soundboard" + return await self.scan_directory(soundboard_path, "SDB") diff --git a/pyproject.toml b/pyproject.toml index 36de736..704f470 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,6 +9,7 @@ dependencies = [ "bcrypt==4.3.0", "email-validator==2.2.0", "fastapi[standard]==0.116.1", + "ffmpeg-python==0.2.0", "httpx==0.28.1", "pydantic-settings==2.10.1", "pyjwt==2.10.1", diff --git a/tests/api/v1/test_sound_endpoints.py b/tests/api/v1/test_sound_endpoints.py new file mode 100644 index 0000000..2c37cc3 --- /dev/null +++ b/tests/api/v1/test_sound_endpoints.py @@ -0,0 +1,514 @@ +"""Tests for sound API endpoints.""" + +from unittest.mock import patch + +import pytest +from httpx import ASGITransport, AsyncClient + +from app.models.user import User +from app.services.sound_scanner import ScanResults + + +class TestSoundEndpoints: + """Test sound API endpoints.""" + + @pytest.mark.asyncio + async def test_scan_sounds_success( + self, + authenticated_admin_client: AsyncClient, + admin_user: User, + ): + """Test successful sound scanning.""" + # Mock the scanner service to return successful results + mock_results: ScanResults = { + "scanned": 5, + "added": 3, + "updated": 1, + "deleted": 1, + "skipped": 0, + "errors": 0, + "files": [ + { + "filename": "test1.mp3", + "status": "added", + "reason": None, + "name": "Test1", + "duration": 5000, + "size": 1024, + "id": 1, + "error": None, + "changes": None, + }, + { + "filename": "test2.mp3", + "status": "updated", + "reason": "file was modified", + "name": "Test2", + "duration": 7500, + "size": 2048, + "id": 2, + "error": None, + "changes": ["hash", "duration", "size"], + }, + { + "filename": "test3.mp3", + "status": "deleted", + "reason": "file no longer exists", + "name": "Test3", + "duration": 3000, + "size": 512, + "id": 3, + "error": None, + "changes": None, + }, + ], + } + + with patch( + "app.services.sound_scanner.SoundScannerService.scan_soundboard_directory" + ) as mock_scan: + mock_scan.return_value = mock_results + + response = await authenticated_admin_client.post("/api/v1/sounds/scan") + + assert response.status_code == 200 + data = response.json() + + assert "message" in data + assert "Sound sync completed" in data["message"] + assert "results" in data + + results = data["results"] + assert results["scanned"] == 5 + assert results["added"] == 3 + assert results["updated"] == 1 + assert results["deleted"] == 1 + assert results["skipped"] == 0 + assert results["errors"] == 0 + assert len(results["files"]) == 3 + + # Check file details + assert results["files"][0]["filename"] == "test1.mp3" + assert results["files"][0]["status"] == "added" + assert results["files"][1]["status"] == "updated" + assert results["files"][2]["status"] == "deleted" + + @pytest.mark.asyncio + async def test_scan_sounds_unauthenticated(self, client: AsyncClient): + """Test scanning sounds without authentication.""" + response = await client.post("/api/v1/sounds/scan") + + assert response.status_code == 401 + data = response.json() + assert "Could not validate credentials" in data["detail"] + + @pytest.mark.asyncio + async def test_scan_sounds_non_admin( + self, + test_app, + test_user: User, + ): + """Test scanning sounds with non-admin user.""" + from app.core.dependencies import get_current_active_user_flexible + + # Override the dependency to return regular user + async def override_get_current_user(): + test_user.role = "user" + return test_user + + test_app.dependency_overrides[get_current_active_user_flexible] = ( + override_get_current_user + ) + + # Create API token for regular user + headers = {"API-TOKEN": "test_api_token"} + + async with AsyncClient( + transport=ASGITransport(app=test_app), + base_url="http://test", + ) as client: + response = await client.post("/api/v1/sounds/scan", headers=headers) + + assert response.status_code == 403 + data = response.json() + assert "Only administrators can sync sounds" in data["detail"] + + # Clean up override + test_app.dependency_overrides.pop(get_current_active_user_flexible, None) + + @pytest.mark.asyncio + async def test_scan_sounds_admin_user( + self, + test_app, + admin_user: User, + ): + """Test scanning sounds with admin user.""" + from app.core.dependencies import get_current_active_user_flexible + + mock_results: ScanResults = { + "scanned": 1, + "added": 1, + "updated": 0, + "deleted": 0, + "skipped": 0, + "errors": 0, + "files": [], + } + + # Override the dependency to return admin user + async def override_get_current_user(): + return admin_user + + test_app.dependency_overrides[get_current_active_user_flexible] = ( + override_get_current_user + ) + + headers = {"API-TOKEN": "admin_api_token"} + + with patch( + "app.services.sound_scanner.SoundScannerService.scan_soundboard_directory" + ) as mock_scan: + mock_scan.return_value = mock_results + + async with AsyncClient( + transport=ASGITransport(app=test_app), + base_url="http://test", + ) as client: + response = await client.post("/api/v1/sounds/scan", headers=headers) + + assert response.status_code == 200 + data = response.json() + assert "Sound sync completed" in data["message"] + + # Clean up override + test_app.dependency_overrides.pop(get_current_active_user_flexible, None) + + @pytest.mark.asyncio + async def test_scan_sounds_service_error( + self, + authenticated_admin_client: AsyncClient, + admin_user: User, + ): + """Test scanning sounds when service raises an error.""" + with patch( + "app.services.sound_scanner.SoundScannerService.scan_soundboard_directory" + ) as mock_scan: + mock_scan.side_effect = Exception("Directory not found") + + response = await authenticated_admin_client.post("/api/v1/sounds/scan") + + assert response.status_code == 500 + data = response.json() + assert "Failed to sync sounds" in data["detail"] + assert "Directory not found" in data["detail"] + + @pytest.mark.asyncio + async def test_scan_custom_directory_success( + self, + authenticated_admin_client: AsyncClient, + admin_user: User, + ): + """Test successful custom directory scanning.""" + mock_results: ScanResults = { + "scanned": 2, + "added": 2, + "updated": 0, + "deleted": 0, + "skipped": 0, + "errors": 0, + "files": [ + { + "filename": "custom1.wav", + "status": "added", + "reason": None, + "name": "Custom1", + "duration": 4000, + "size": 800, + "id": 10, + "error": None, + "changes": None, + }, + { + "filename": "custom2.wav", + "status": "added", + "reason": None, + "name": "Custom2", + "duration": 6000, + "size": 1200, + "id": 11, + "error": None, + "changes": None, + }, + ], + } + + with patch( + "app.services.sound_scanner.SoundScannerService.scan_directory" + ) as mock_scan: + mock_scan.return_value = mock_results + + response = await authenticated_admin_client.post( + "/api/v1/sounds/scan/custom", + params={"directory": "/custom/path", "sound_type": "CUSTOM"}, + ) + + assert response.status_code == 200 + data = response.json() + + assert "Sync of directory '/custom/path' completed" in data["message"] + assert "results" in data + + results = data["results"] + assert results["scanned"] == 2 + assert results["added"] == 2 + assert len(results["files"]) == 2 + + # Verify the service was called with correct parameters + mock_scan.assert_called_once_with("/custom/path", "CUSTOM") + + @pytest.mark.asyncio + async def test_scan_custom_directory_default_type( + self, + authenticated_admin_client: AsyncClient, + admin_user: User, + ): + """Test custom directory scanning with default sound type.""" + mock_results: ScanResults = { + "scanned": 1, + "added": 1, + "updated": 0, + "deleted": 0, + "skipped": 0, + "errors": 0, + "files": [], + } + + with patch( + "app.services.sound_scanner.SoundScannerService.scan_directory" + ) as mock_scan: + mock_scan.return_value = mock_results + + response = await authenticated_admin_client.post( + "/api/v1/sounds/scan/custom", + params={"directory": "/another/path"}, # No sound_type param + ) + + assert response.status_code == 200 + + # Verify the service was called with default SDB type + mock_scan.assert_called_once_with("/another/path", "SDB") + + @pytest.mark.asyncio + async def test_scan_custom_directory_invalid_path( + self, + authenticated_admin_client: AsyncClient, + admin_user: User, + ): + """Test custom directory scanning with invalid path.""" + with patch( + "app.services.sound_scanner.SoundScannerService.scan_directory" + ) as mock_scan: + mock_scan.side_effect = ValueError( + "Directory does not exist: /invalid/path" + ) + + response = await authenticated_admin_client.post( + "/api/v1/sounds/scan/custom", params={"directory": "/invalid/path"} + ) + + assert response.status_code == 400 + data = response.json() + assert "Directory does not exist" in data["detail"] + + @pytest.mark.asyncio + async def test_scan_custom_directory_unauthenticated(self, client: AsyncClient): + """Test custom directory scanning without authentication.""" + response = await client.post( + "/api/v1/sounds/scan/custom", params={"directory": "/some/path"} + ) + + assert response.status_code == 401 + data = response.json() + assert "Could not validate credentials" in data["detail"] + + @pytest.mark.asyncio + async def test_scan_custom_directory_non_admin( + self, + test_app, + test_user: User, + ): + """Test custom directory scanning with non-admin user.""" + from app.core.dependencies import get_current_active_user_flexible + + # Override the dependency to return regular user + async def override_get_current_user(): + test_user.role = "user" + return test_user + + test_app.dependency_overrides[get_current_active_user_flexible] = ( + override_get_current_user + ) + + headers = {"API-TOKEN": "test_api_token"} + + async with AsyncClient( + transport=ASGITransport(app=test_app), + base_url="http://test", + ) as client: + response = await client.post( + "/api/v1/sounds/scan/custom", + headers=headers, + params={"directory": "/some/path"}, + ) + + assert response.status_code == 403 + data = response.json() + assert "Only administrators can sync sounds" in data["detail"] + + # Clean up override + test_app.dependency_overrides.pop(get_current_active_user_flexible, None) + + @pytest.mark.asyncio + async def test_scan_custom_directory_service_error( + self, + authenticated_admin_client: AsyncClient, + admin_user: User, + ): + """Test custom directory scanning when service raises an error.""" + with patch( + "app.services.sound_scanner.SoundScannerService.scan_directory" + ) as mock_scan: + mock_scan.side_effect = Exception("Permission denied") + + response = await authenticated_admin_client.post( + "/api/v1/sounds/scan/custom", params={"directory": "/restricted/path"} + ) + + assert response.status_code == 500 + data = response.json() + assert "Failed to sync directory" in data["detail"] + assert "Permission denied" in data["detail"] + + @pytest.mark.asyncio + async def test_scan_results_with_errors( + self, + authenticated_admin_client: AsyncClient, + admin_user: User, + ): + """Test scanning with some errors in results.""" + mock_results: ScanResults = { + "scanned": 3, + "added": 1, + "updated": 0, + "deleted": 0, + "skipped": 1, + "errors": 1, + "files": [ + { + "filename": "good.mp3", + "status": "added", + "reason": None, + "name": "Good", + "duration": 5000, + "size": 1024, + "id": 1, + "error": None, + "changes": None, + }, + { + "filename": "unchanged.mp3", + "status": "skipped", + "reason": "file unchanged", + "name": "Unchanged", + "duration": 3000, + "size": 512, + "id": 2, + "error": None, + "changes": None, + }, + { + "filename": "bad.mp3", + "status": "error", + "reason": None, + "name": None, + "duration": None, + "size": None, + "id": None, + "error": "Invalid audio format", + "changes": None, + }, + ], + } + + with patch( + "app.services.sound_scanner.SoundScannerService.scan_soundboard_directory" + ) as mock_scan: + mock_scan.return_value = mock_results + + response = await authenticated_admin_client.post("/api/v1/sounds/scan") + + assert response.status_code == 200 + data = response.json() + + results = data["results"] + assert results["errors"] == 1 + assert results["added"] == 1 + assert results["skipped"] == 1 + + # Check error file details + error_file = next(f for f in results["files"] if f["status"] == "error") + assert error_file["filename"] == "bad.mp3" + assert error_file["error"] == "Invalid audio format" + assert error_file["id"] is None + + @pytest.mark.asyncio + async def test_endpoint_response_structure( + self, + authenticated_admin_client: AsyncClient, + admin_user: User, + ): + """Test that endpoint response has correct structure.""" + mock_results: ScanResults = { + "scanned": 0, + "added": 0, + "updated": 0, + "deleted": 0, + "skipped": 0, + "errors": 0, + "files": [], + } + + with patch( + "app.services.sound_scanner.SoundScannerService.scan_soundboard_directory" + ) as mock_scan: + mock_scan.return_value = mock_results + + response = await authenticated_admin_client.post("/api/v1/sounds/scan") + + assert response.status_code == 200 + data = response.json() + + # Check top-level structure + assert isinstance(data, dict) + assert "message" in data + assert "results" in data + assert isinstance(data["message"], str) + assert isinstance(data["results"], dict) + + # Check results structure + results = data["results"] + required_fields = [ + "scanned", + "added", + "updated", + "deleted", + "skipped", + "errors", + "files", + ] + for field in required_fields: + assert field in results + if field == "files": + assert isinstance(results[field], list) + else: + assert isinstance(results[field], int) diff --git a/tests/conftest.py b/tests/conftest.py index babdae6..2f72836 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -114,6 +114,19 @@ async def authenticated_client( yield client +@pytest_asyncio.fixture +async def authenticated_admin_client( + test_app: FastAPI, admin_cookies: dict[str, str], +) -> AsyncGenerator[AsyncClient, None]: + """Create a test HTTP client with admin authentication cookies.""" + async with AsyncClient( + transport=ASGITransport(app=test_app), + base_url="http://test", + cookies=admin_cookies, + ) as client: + yield client + + @pytest_asyncio.fixture async def test_plan(test_session: AsyncSession) -> Plan: """Create a test plan.""" @@ -307,3 +320,17 @@ async def auth_cookies(test_user: User) -> dict[str, str]: access_token = JWTUtils.create_access_token(token_data) return {"access_token": access_token} + + +@pytest_asyncio.fixture +async def admin_cookies(admin_user: User) -> dict[str, str]: + """Create admin authentication cookies with JWT token.""" + token_data = { + "sub": str(admin_user.id), + "email": admin_user.email, + "role": admin_user.role, + } + + access_token = JWTUtils.create_access_token(token_data) + + return {"access_token": access_token} diff --git a/tests/services/test_sound_scanner.py b/tests/services/test_sound_scanner.py new file mode 100644 index 0000000..c65c253 --- /dev/null +++ b/tests/services/test_sound_scanner.py @@ -0,0 +1,298 @@ +"""Tests for sound scanner service.""" + +import tempfile +from pathlib import Path +from unittest.mock import AsyncMock, Mock, patch + +import pytest +from sqlmodel.ext.asyncio.session import AsyncSession + +from app.models.sound import Sound +from app.services.sound_scanner import SoundScannerService + + +class TestSoundScannerService: + """Test sound scanner service.""" + + @pytest.fixture + def mock_session(self): + """Create a mock session.""" + return Mock(spec=AsyncSession) + + @pytest.fixture + def scanner_service(self, mock_session): + """Create a scanner service with mock session.""" + return SoundScannerService(mock_session) + + def test_init(self, scanner_service): + """Test scanner service initialization.""" + assert scanner_service.session is not None + assert scanner_service.sound_repo is not None + assert len(scanner_service.supported_extensions) > 0 + assert ".mp3" in scanner_service.supported_extensions + assert ".wav" in scanner_service.supported_extensions + + def test_get_file_hash(self, scanner_service): + """Test file hash calculation.""" + # Create a temporary file + with tempfile.NamedTemporaryFile(mode="w", delete=False) as f: + f.write("test content") + temp_path = Path(f.name) + + try: + hash_value = scanner_service.get_file_hash(temp_path) + assert len(hash_value) == 32 # MD5 hash length + assert isinstance(hash_value, str) + finally: + temp_path.unlink() + + def test_get_file_size(self, scanner_service): + """Test file size calculation.""" + # Create a temporary file + with tempfile.NamedTemporaryFile(mode="w", delete=False) as f: + f.write("test content for size calculation") + temp_path = Path(f.name) + + try: + size = scanner_service.get_file_size(temp_path) + assert size > 0 + assert isinstance(size, int) + finally: + temp_path.unlink() + + def test_extract_name_from_filename(self, scanner_service): + """Test name extraction from filename.""" + test_cases = [ + ("hello_world.mp3", "Hello World"), + ("my-awesome-sound.wav", "My Awesome Sound"), + ("TEST_FILE_NAME.opus", "Test File Name"), + ("single.mp3", "Single"), + ("multiple_words_here.flac", "Multiple Words Here"), + ] + + for filename, expected_name in test_cases: + result = scanner_service.extract_name_from_filename(filename) + assert result == expected_name + + @patch("app.services.sound_scanner.ffmpeg.probe") + def test_get_audio_duration_success(self, mock_probe, scanner_service): + """Test successful audio duration extraction.""" + mock_probe.return_value = { + "format": {"duration": "123.456"} + } + + temp_path = Path("/fake/path/test.mp3") + duration = scanner_service.get_audio_duration(temp_path) + + assert duration == 123456 # 123.456 seconds * 1000 = 123456 ms + mock_probe.assert_called_once_with(str(temp_path)) + + @patch("app.services.sound_scanner.ffmpeg.probe") + def test_get_audio_duration_failure(self, mock_probe, scanner_service): + """Test audio duration extraction failure.""" + mock_probe.side_effect = Exception("FFmpeg error") + + temp_path = Path("/fake/path/test.mp3") + duration = scanner_service.get_audio_duration(temp_path) + + assert duration == 0 + mock_probe.assert_called_once_with(str(temp_path)) + + @pytest.mark.asyncio + async def test_scan_directory_nonexistent(self, scanner_service): + """Test scanning a non-existent directory.""" + with pytest.raises(ValueError, match="Directory does not exist"): + await scanner_service.scan_directory("/non/existent/path") + + @pytest.mark.asyncio + async def test_scan_directory_not_directory(self, scanner_service): + """Test scanning a path that is not a directory.""" + # Create a temporary file + with tempfile.NamedTemporaryFile() as f: + with pytest.raises(ValueError, match="Path is not a directory"): + await scanner_service.scan_directory(f.name) + + @pytest.mark.asyncio + async def test_sync_audio_file_unchanged(self, scanner_service): + """Test syncing file that is unchanged.""" + # Existing sound with same hash as file + existing_sound = Sound( + id=1, + type="SDB", + name="Test Sound", + filename="test.mp3", + duration=120000, # 120 seconds = 120000 ms + size=1024, + hash="same_hash", + ) + + # Mock file operations to return same hash + scanner_service.get_file_hash = Mock(return_value="same_hash") + scanner_service.get_audio_duration = Mock(return_value=120000) + scanner_service.get_file_size = Mock(return_value=1024) + + # Create a temporary file + with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as f: + temp_path = Path(f.name) + + try: + results = { + "scanned": 0, "added": 0, "updated": 0, "deleted": 0, + "skipped": 0, "errors": 0, "files": [] + } + await scanner_service._sync_audio_file( + temp_path, "SDB", existing_sound, results + ) + + assert results["skipped"] == 1 + assert results["added"] == 0 + assert results["updated"] == 0 + assert len(results["files"]) == 1 + assert results["files"][0]["status"] == "skipped" + assert results["files"][0]["reason"] == "file unchanged" + finally: + temp_path.unlink() + + @pytest.mark.asyncio + async def test_sync_audio_file_new(self, scanner_service): + """Test syncing a new audio file.""" + created_sound = Sound( + id=1, + type="SDB", + name="Test Sound", + filename="test.mp3", + duration=120000, # 120 seconds = 120000 ms + size=1024, + hash="test_hash", + ) + scanner_service.sound_repo.create = AsyncMock(return_value=created_sound) + + # Mock file operations + scanner_service.get_file_hash = Mock(return_value="test_hash") + scanner_service.get_audio_duration = Mock(return_value=120000) # Duration in ms + scanner_service.get_file_size = Mock(return_value=1024) + + # Create a temporary file + with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as f: + temp_path = Path(f.name) + + try: + results = { + "scanned": 0, "added": 0, "updated": 0, "deleted": 0, + "skipped": 0, "errors": 0, "files": [] + } + await scanner_service._sync_audio_file(temp_path, "SDB", None, results) + + assert results["added"] == 1 + assert results["skipped"] == 0 + assert results["updated"] == 0 + assert len(results["files"]) == 1 + assert results["files"][0]["status"] == "added" + + # Verify sound_repo.create was called with correct data + call_args = scanner_service.sound_repo.create.call_args[0][0] + assert call_args["type"] == "SDB" + assert call_args["filename"] == temp_path.name + assert call_args["duration"] == 120000 # Duration in ms + assert call_args["size"] == 1024 + assert call_args["hash"] == "test_hash" + assert call_args["is_deletable"] is False # SDB sounds are not deletable + finally: + temp_path.unlink() + + @pytest.mark.asyncio + async def test_sync_audio_file_updated(self, scanner_service): + """Test syncing a file that was modified (different hash).""" + # Existing sound with different hash than file + existing_sound = Sound( + id=1, + type="SDB", + name="Old Sound", + filename="test.mp3", + duration=60000, # Old duration + size=512, # Old size + hash="old_hash", # Old hash + ) + + scanner_service.sound_repo.update = AsyncMock(return_value=existing_sound) + + # Mock file operations to return new values + scanner_service.get_file_hash = Mock(return_value="new_hash") + scanner_service.get_audio_duration = Mock(return_value=120000) # New duration + scanner_service.get_file_size = Mock(return_value=1024) # New size + + # Create a temporary file + with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as f: + temp_path = Path(f.name) + + try: + results = { + "scanned": 0, "added": 0, "updated": 0, "deleted": 0, + "skipped": 0, "errors": 0, "files": [] + } + await scanner_service._sync_audio_file( + temp_path, "SDB", existing_sound, results + ) + + assert results["updated"] == 1 + assert results["added"] == 0 + assert results["skipped"] == 0 + assert len(results["files"]) == 1 + assert results["files"][0]["status"] == "updated" + assert results["files"][0]["reason"] == "file was modified" + + # Verify sound_repo.update was called with correct data + call_args = scanner_service.sound_repo.update.call_args[0][1] # update_data + assert call_args["duration"] == 120000 + assert call_args["size"] == 1024 + assert call_args["hash"] == "new_hash" + # Name is extracted from temp filename, should be capitalized + assert call_args["name"].endswith("mp3") is False # Should be cleaned + finally: + temp_path.unlink() + + @pytest.mark.asyncio + async def test_sync_audio_file_custom_type(self, scanner_service): + """Test syncing file with custom type.""" + created_sound = Sound( + id=1, + type="CUSTOM", + name="Test Sound", + filename="test.mp3", + duration=60000, # 60 seconds = 60000 ms + size=2048, + hash="custom_hash", + ) + scanner_service.sound_repo.create = AsyncMock(return_value=created_sound) + + # Mock file operations + scanner_service.get_file_hash = Mock(return_value="custom_hash") + scanner_service.get_audio_duration = Mock(return_value=60000) # Duration in ms + scanner_service.get_file_size = Mock(return_value=2048) + + # Create a temporary file + with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f: + temp_path = Path(f.name) + + try: + results = { + "scanned": 0, "added": 0, "updated": 0, "deleted": 0, + "skipped": 0, "errors": 0, "files": [] + } + await scanner_service._sync_audio_file(temp_path, "CUSTOM", None, results) + + assert results["added"] == 1 + assert results["skipped"] == 0 + assert len(results["files"]) == 1 + assert results["files"][0]["status"] == "added" + + # Verify sound_repo.create was called with correct data for custom type + call_args = scanner_service.sound_repo.create.call_args[0][0] + assert call_args["type"] == "CUSTOM" + assert call_args["filename"] == temp_path.name + assert call_args["duration"] == 60000 # Duration in ms + assert call_args["size"] == 2048 + assert call_args["hash"] == "custom_hash" + assert call_args["is_deletable"] is False # All sounds are set to not deletable + finally: + temp_path.unlink() \ No newline at end of file diff --git a/uv.lock b/uv.lock index 0b18dbf..d421b5d 100644 --- a/uv.lock +++ b/uv.lock @@ -46,6 +46,7 @@ dependencies = [ { name = "bcrypt" }, { name = "email-validator" }, { name = "fastapi", extra = ["standard"] }, + { name = "ffmpeg-python" }, { name = "httpx" }, { name = "pydantic-settings" }, { name = "pyjwt" }, @@ -71,6 +72,7 @@ requires-dist = [ { name = "bcrypt", specifier = "==4.3.0" }, { name = "email-validator", specifier = "==2.2.0" }, { name = "fastapi", extras = ["standard"], specifier = "==0.116.1" }, + { name = "ffmpeg-python", specifier = "==0.2.0" }, { name = "httpx", specifier = "==0.28.1" }, { name = "pydantic-settings", specifier = "==2.10.1" }, { name = "pyjwt", specifier = "==2.10.1" }, @@ -339,6 +341,27 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/42/cf/8635cd778b7d89714325b967a28c05865a2b6cab4c0b4b30561df4704f24/fastapi_cloud_cli-0.1.4-py3-none-any.whl", hash = "sha256:1db1ba757aa46a16a5e5dacf7cddc137ca0a3c42f65dba2b1cc6a8f24c41be42", size = 18957 }, ] +[[package]] +name = "ffmpeg-python" +version = "0.2.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "future" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/dd/5e/d5f9105d59c1325759d838af4e973695081fbbc97182baf73afc78dec266/ffmpeg-python-0.2.0.tar.gz", hash = "sha256:65225db34627c578ef0e11c8b1eb528bb35e024752f6f10b78c011f6f64c4127", size = 21543 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/d7/0c/56be52741f75bad4dc6555991fabd2e07b432d333da82c11ad701123888a/ffmpeg_python-0.2.0-py3-none-any.whl", hash = "sha256:ac441a0404e053f8b6a1113a77c0f452f1cfc62f6344a769475ffdc0f56c23c5", size = 25024 }, +] + +[[package]] +name = "future" +version = "1.0.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/a7/b2/4140c69c6a66432916b26158687e821ba631a4c9273c474343badf84d3ba/future-1.0.0.tar.gz", hash = "sha256:bd2968309307861edae1458a4f8a4f3598c03be43b97521076aebf5d94c07b05", size = 1228490 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/da/71/ae30dadffc90b9006d77af76b393cb9dfbfc9629f339fc1574a1c52e6806/future-1.0.0-py3-none-any.whl", hash = "sha256:929292d34f5872e70396626ef385ec22355a1fae8ad29e1a734c3e43f9fbc216", size = 491326 }, +] + [[package]] name = "greenlet" version = "3.2.3"