From dd10ef5d41aff28ce1ced7ad66c4837395a61ad8 Mon Sep 17 00:00:00 2001 From: JSC Date: Wed, 30 Jul 2025 20:46:49 +0200 Subject: [PATCH] feat: Add VLC player API endpoints and associated tests - Implemented VLC player API endpoints for playing and stopping sounds. - Added tests for successful playback, error handling, and authentication scenarios. - Created utility function to get sound file paths based on sound properties. - Refactored player service to utilize shared sound path utility. - Enhanced test coverage for sound file path utility with various sound types. - Introduced tests for VLC player service, including subprocess handling and play count tracking. --- app/api/v1/sounds.py | 76 +++++ app/models/sound_played.py | 13 +- app/services/player.py | 69 ++-- app/services/vlc_player.py | 313 ++++++++++++++++++ app/utils/audio.py | 31 ++ tests/api/v1/test_vlc_endpoints.py | 305 +++++++++++++++++ tests/services/test_player.py | 109 +++--- tests/services/test_vlc_player.py | 511 +++++++++++++++++++++++++++++ tests/utils/test_audio.py | 120 ++++++- 9 files changed, 1413 insertions(+), 134 deletions(-) create mode 100644 app/services/vlc_player.py create mode 100644 tests/api/v1/test_vlc_endpoints.py create mode 100644 tests/services/test_vlc_player.py diff --git a/app/api/v1/sounds.py b/app/api/v1/sounds.py index 0c642d1..4be2619 100644 --- a/app/api/v1/sounds.py +++ b/app/api/v1/sounds.py @@ -8,10 +8,12 @@ from sqlmodel.ext.asyncio.session import AsyncSession from app.core.database import get_db from app.core.dependencies import get_current_active_user_flexible from app.models.user import User +from app.repositories.sound import SoundRepository from app.services.extraction import ExtractionInfo, ExtractionService from app.services.extraction_processor import extraction_processor from app.services.sound_normalizer import NormalizationResults, SoundNormalizerService from app.services.sound_scanner import ScanResults, SoundScannerService +from app.services.vlc_player import get_vlc_player_service, VLCPlayerService router = APIRouter(prefix="/sounds", tags=["sounds"]) @@ -37,6 +39,19 @@ async def get_extraction_service( return ExtractionService(session) +def get_vlc_player() -> VLCPlayerService: + """Get the VLC player service.""" + from app.core.database import get_session_factory + return get_vlc_player_service(get_session_factory()) + + +async def get_sound_repository( + session: Annotated[AsyncSession, Depends(get_db)], +) -> SoundRepository: + """Get the sound repository.""" + return SoundRepository(session) + + # SCAN @router.post("/scan") async def scan_sounds( @@ -349,3 +364,64 @@ async def get_user_extractions( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to get extractions: {e!s}", ) from e + + +# VLC PLAYER +@router.post("/vlc/play/{sound_id}") +async def play_sound_with_vlc( + sound_id: int, + current_user: Annotated[User, Depends(get_current_active_user_flexible)], + vlc_player: Annotated[VLCPlayerService, Depends(get_vlc_player)], + sound_repo: Annotated[SoundRepository, Depends(get_sound_repository)], +) -> dict[str, str | int | bool]: + """Play a sound using VLC subprocess.""" + try: + # Get the sound + sound = await sound_repo.get_by_id(sound_id) + if not sound: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Sound with ID {sound_id} not found", + ) + + # Play the sound using VLC + success = await vlc_player.play_sound(sound) + + if not success: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Failed to launch VLC for sound playback", + ) + + return { + "message": f"Sound '{sound.name}' is now playing via VLC", + "sound_id": sound_id, + "sound_name": sound.name, + "success": True, + } + + except HTTPException: + raise + except Exception as e: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Failed to play sound: {e!s}", + ) from e + + + +@router.post("/vlc/stop-all") +async def stop_all_vlc_instances( + current_user: Annotated[User, Depends(get_current_active_user_flexible)], + vlc_player: Annotated[VLCPlayerService, Depends(get_vlc_player)], +) -> dict: + """Stop all running VLC instances.""" + try: + result = await vlc_player.stop_all_vlc_instances() + return result + + except Exception as e: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Failed to stop VLC instances: {e!s}", + ) from e diff --git a/app/models/sound_played.py b/app/models/sound_played.py index 6f95227..7a53b8c 100644 --- a/app/models/sound_played.py +++ b/app/models/sound_played.py @@ -1,6 +1,6 @@ from typing import TYPE_CHECKING -from sqlmodel import Field, Relationship, UniqueConstraint +from sqlmodel import Field, Relationship from app.models.base import BaseModel @@ -14,18 +14,9 @@ class SoundPlayed(BaseModel, table=True): __tablename__ = "sound_played" # pyright: ignore[reportAssignmentType] - user_id: int = Field(foreign_key="user.id", nullable=False) + user_id: int | None = Field(foreign_key="user.id", nullable=True) sound_id: int = Field(foreign_key="sound.id", nullable=False) - # constraints - __table_args__ = ( - UniqueConstraint( - "user_id", - "sound_id", - name="uq_sound_played_user_sound", - ), - ) - # relationships user: "User" = Relationship(back_populates="sounds_played") sound: "Sound" = Relationship(back_populates="play_history") diff --git a/app/services/player.py b/app/services/player.py index a42c8b4..2c5b2b5 100644 --- a/app/services/player.py +++ b/app/services/player.py @@ -9,15 +9,14 @@ from pathlib import Path from typing import Any import vlc # type: ignore[import-untyped] -from sqlmodel import select from app.core.logging import get_logger from app.models.sound import Sound from app.models.sound_played import SoundPlayed from app.repositories.playlist import PlaylistRepository from app.repositories.sound import SoundRepository -from app.repositories.user import UserRepository from app.services.socket import socket_manager +from app.utils.audio import get_sound_file_path logger = get_logger(__name__) @@ -198,7 +197,7 @@ class PlayerService: return # Get sound file path - sound_path = self._get_sound_file_path(self.state.current_sound) + sound_path = get_sound_file_path(self.state.current_sound) if not sound_path.exists(): logger.error("Sound file not found: %s", sound_path) return @@ -344,6 +343,12 @@ class PlayerService: if self.state.status != PlayerStatus.STOPPED: await self._stop_playback() + # Set first track as current if no current track and playlist has sounds + if not self.state.current_sound_id and sounds: + self.state.current_sound_index = 0 + self.state.current_sound = sounds[0] + self.state.current_sound_id = sounds[0].id + logger.info( "Loaded playlist: %s (%s sounds)", current_playlist.name, @@ -360,21 +365,6 @@ class PlayerService: """Get current player state.""" return self.state.to_dict() - def _get_sound_file_path(self, sound: Sound) -> Path: - """Get the file path for a sound.""" - # Determine the correct subdirectory based on sound type - subdir = "extracted" if sound.type.upper() == "EXT" else sound.type.lower() - - # Use normalized file if available, otherwise original - if sound.is_normalized and sound.normalized_filename: - return ( - Path("sounds/normalized") - / subdir - / sound.normalized_filename - ) - return ( - Path("sounds/originals") / subdir / sound.filename - ) def _get_next_index(self, current_index: int) -> int | None: """Get next track index based on current mode.""" @@ -501,7 +491,6 @@ class PlayerService: session = self.db_session_factory() try: sound_repo = SoundRepository(session) - user_repo = UserRepository(session) # Update sound play count sound = await sound_repo.get_by_id(sound_id) @@ -519,37 +508,17 @@ class PlayerService: else: logger.warning("Sound %s not found for play count update", sound_id) - # Record play history for admin user (ID 1) as placeholder - # This could be refined to track per-user play history - admin_user = await user_repo.get_by_id(1) - if admin_user: - # Check if already recorded for this user using proper query - stmt = select(SoundPlayed).where( - SoundPlayed.user_id == admin_user.id, - SoundPlayed.sound_id == sound_id, - ) - result = await session.exec(stmt) - existing = result.first() - - if not existing: - sound_played = SoundPlayed( - user_id=admin_user.id, - sound_id=sound_id, - ) - session.add(sound_played) - logger.info( - "Created SoundPlayed record for user %s, sound %s", - admin_user.id, - sound_id, - ) - else: - logger.info( - "SoundPlayed record already exists for user %s, sound %s", - admin_user.id, - sound_id, - ) - else: - logger.warning("Admin user (ID 1) not found for play history") + # Record play history without user_id for player-based plays + # Always create a new SoundPlayed record for each play event + sound_played = SoundPlayed( + user_id=None, # No user_id for player-based plays + sound_id=sound_id, + ) + session.add(sound_played) + logger.info( + "Created SoundPlayed record for player play, sound %s", + sound_id, + ) await session.commit() logger.info("Successfully recorded play count for sound %s", sound_id) diff --git a/app/services/vlc_player.py b/app/services/vlc_player.py new file mode 100644 index 0000000..f5fc0d8 --- /dev/null +++ b/app/services/vlc_player.py @@ -0,0 +1,313 @@ +"""VLC subprocess-based player service for immediate sound playback.""" + +import asyncio +import subprocess +from collections.abc import Callable +from pathlib import Path +from typing import Any + +from sqlmodel import select +from sqlmodel.ext.asyncio.session import AsyncSession + +from app.core.logging import get_logger +from app.models.sound import Sound +from app.models.sound_played import SoundPlayed +from app.repositories.sound import SoundRepository +from app.repositories.user import UserRepository +from app.services.socket import socket_manager +from app.utils.audio import get_sound_file_path + +logger = get_logger(__name__) + + +class VLCPlayerService: + """Service for launching VLC instances via subprocess to play sounds.""" + + def __init__( + self, db_session_factory: Callable[[], AsyncSession] | None = None, + ) -> None: + """Initialize the VLC player service.""" + self.vlc_executable = self._find_vlc_executable() + self.db_session_factory = db_session_factory + logger.info( + "VLC Player Service initialized with executable: %s", + self.vlc_executable, + ) + + def _find_vlc_executable(self) -> str: + """Find VLC executable path based on the operating system.""" + # Common VLC executable paths + possible_paths = [ + "vlc", # Linux/Mac with VLC in PATH + "/usr/bin/vlc", # Linux + "/usr/local/bin/vlc", # Linux/Mac + "/Applications/VLC.app/Contents/MacOS/VLC", # macOS + "C:\\Program Files\\VideoLAN\\VLC\\vlc.exe", # Windows + "C:\\Program Files (x86)\\VideoLAN\\VLC\\vlc.exe", # Windows 32-bit + ] + + for path in possible_paths: + try: + if Path(path).exists(): + return path + # For "vlc", try to find it in PATH + if path == "vlc": + result = subprocess.run( + ["which", "vlc"], + capture_output=True, + check=False, + text=True, + ) + if result.returncode == 0: + return path + except (OSError, subprocess.SubprocessError): + continue + + # Default to 'vlc' and let the system handle it + logger.warning( + "VLC executable not found in common paths, using 'vlc' from PATH", + ) + return "vlc" + + async def play_sound(self, sound: Sound) -> bool: + """Play a sound using a new VLC subprocess instance. + + Args: + sound: The Sound object to play + + Returns: + bool: True if VLC process was launched successfully, False otherwise + + """ + try: + sound_path = get_sound_file_path(sound) + + if not sound_path.exists(): + logger.error("Sound file not found: %s", sound_path) + return False + + # VLC command arguments for immediate playback + cmd = [ + self.vlc_executable, + str(sound_path), + "--play-and-exit", # Exit VLC when playback finishes + "--intf", + "dummy", # No interface + "--no-video", # Audio only + "--no-repeat", # Don't repeat + "--no-loop", # Don't loop + ] + + # Launch VLC process asynchronously without waiting + process = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.DEVNULL, + stderr=asyncio.subprocess.DEVNULL, + ) + + logger.info( + "Launched VLC process (PID: %s) for sound: %s", + process.pid, + sound.name, + ) + + # Record play count and emit event + if self.db_session_factory and sound.id: + asyncio.create_task(self._record_play_count(sound.id, sound.name)) + + return True + + except Exception: + logger.exception("Failed to launch VLC for sound %s", sound.name) + return False + + async def stop_all_vlc_instances(self) -> dict[str, Any]: + """Stop all running VLC processes by killing them. + + Returns: + dict: Results of the stop operation including counts and any errors + + """ + try: + # Find all VLC processes + find_cmd = ["pgrep", "-f", "vlc"] + find_process = await asyncio.create_subprocess_exec( + *find_cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + + stdout, stderr = await find_process.communicate() + + if find_process.returncode != 0: + # No VLC processes found + logger.info("No VLC processes found to stop") + return { + "success": True, + "processes_found": 0, + "processes_killed": 0, + "message": "No VLC processes found", + } + + # Parse PIDs from output + pids = [] + if stdout: + pids = [ + pid.strip() + for pid in stdout.decode().strip().split("\n") + if pid.strip() + ] + + if not pids: + logger.info("No VLC processes found to stop") + return { + "success": True, + "processes_found": 0, + "processes_killed": 0, + "message": "No VLC processes found", + } + + logger.info("Found %s VLC processes: %s", len(pids), ", ".join(pids)) + + # Kill all VLC processes + kill_cmd = ["pkill", "-f", "vlc"] + kill_process = await asyncio.create_subprocess_exec( + *kill_cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + + await kill_process.communicate() + + # Verify processes were killed + verify_process = await asyncio.create_subprocess_exec( + *find_cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + stdout_verify, _ = await verify_process.communicate() + + remaining_pids = [] + if verify_process.returncode == 0 and stdout_verify: + remaining_pids = [ + pid.strip() + for pid in stdout_verify.decode().strip().split("\n") + if pid.strip() + ] + + processes_killed = len(pids) - len(remaining_pids) + + logger.info( + "Kill operation completed. Found: %s, Killed: %s, Remaining: %s", + len(pids), + processes_killed, + len(remaining_pids), + ) + + return { + "success": True, + "processes_found": len(pids), + "processes_killed": processes_killed, + "processes_remaining": len(remaining_pids), + "message": f"Killed {processes_killed} VLC processes", + } + + except Exception as e: + logger.exception("Failed to stop VLC processes") + return { + "success": False, + "processes_found": 0, + "processes_killed": 0, + "error": str(e), + "message": "Failed to stop VLC processes", + } + + async def _record_play_count(self, sound_id: int, sound_name: str) -> None: + """Record a play count for a sound and emit sound_played event.""" + if not self.db_session_factory: + logger.warning( + "No database session factory available for play count recording", + ) + return + + logger.info("Recording play count for sound %s", sound_id) + session = self.db_session_factory() + try: + sound_repo = SoundRepository(session) + user_repo = UserRepository(session) + + # Update sound play count + sound = await sound_repo.get_by_id(sound_id) + old_count = 0 + if sound: + old_count = sound.play_count + await sound_repo.update( + sound, + {"play_count": sound.play_count + 1}, + ) + logger.info( + "Updated sound %s play_count: %s -> %s", + sound_id, + old_count, + old_count + 1, + ) + else: + logger.warning("Sound %s not found for play count update", sound_id) + + # Record play history for admin user (ID 1) as placeholder + # This could be refined to track per-user play history + admin_user = await user_repo.get_by_id(1) + admin_user_id = None + if admin_user: + admin_user_id = admin_user.id + + # Always create a new SoundPlayed record for each play event + sound_played = SoundPlayed( + user_id=admin_user_id, # Can be None for player-based plays + sound_id=sound_id, + ) + session.add(sound_played) + logger.info( + "Created SoundPlayed record for user %s, sound %s", + admin_user_id, + sound_id, + ) + + await session.commit() + logger.info("Successfully recorded play count for sound %s", sound_id) + + # Emit sound_played event via WebSocket + try: + event_data = { + "sound_id": sound_id, + "sound_name": sound_name, + "user_id": admin_user_id, + "play_count": (old_count + 1) if sound else None, + } + await socket_manager.broadcast_to_all("sound_played", event_data) + logger.info("Broadcasted sound_played event for sound %s", sound_id) + except Exception: + logger.exception( + "Failed to broadcast sound_played event for sound %s", sound_id, + ) + + except Exception: + logger.exception("Error recording play count for sound %s", sound_id) + await session.rollback() + finally: + await session.close() + + + +# Global VLC player service instance +vlc_player_service: VLCPlayerService | None = None + + +def get_vlc_player_service( + db_session_factory: Callable[[], AsyncSession] | None = None, +) -> VLCPlayerService: + """Get the global VLC player service instance.""" + global vlc_player_service # noqa: PLW0603 + if vlc_player_service is None: + vlc_player_service = VLCPlayerService(db_session_factory) + return vlc_player_service diff --git a/app/utils/audio.py b/app/utils/audio.py index 7f03f3c..b438ef7 100644 --- a/app/utils/audio.py +++ b/app/utils/audio.py @@ -2,11 +2,15 @@ import hashlib from pathlib import Path +from typing import TYPE_CHECKING import ffmpeg # type: ignore[import-untyped] from app.core.logging import get_logger +if TYPE_CHECKING: + from app.models.sound import Sound + logger = get_logger(__name__) @@ -33,3 +37,30 @@ def get_audio_duration(file_path: Path) -> int: except Exception as e: logger.warning("Failed to get duration for %s: %s", file_path, e) return 0 + + +def get_sound_file_path(sound: "Sound") -> Path: + """Get the file path for a sound based on its type and normalization status. + + Args: + sound: The Sound object to get the path for + + Returns: + Path: The full path to the sound file + + """ + # Determine the correct subdirectory based on sound type + if sound.type.upper() == "EXT": + subdir = "extracted" + elif sound.type.upper() == "SDB": + subdir = "soundboard" + elif sound.type.upper() == "TTS": + subdir = "text_to_speech" + else: + # Fallback to lowercase type + subdir = sound.type.lower() + + # Use normalized file if available, otherwise original + if sound.is_normalized and sound.normalized_filename: + return Path("sounds/normalized") / subdir / sound.normalized_filename + return Path("sounds/originals") / subdir / sound.filename diff --git a/tests/api/v1/test_vlc_endpoints.py b/tests/api/v1/test_vlc_endpoints.py new file mode 100644 index 0000000..f3c51da --- /dev/null +++ b/tests/api/v1/test_vlc_endpoints.py @@ -0,0 +1,305 @@ +"""Tests for VLC player API endpoints.""" + +from unittest.mock import AsyncMock, patch + +import pytest +from httpx import AsyncClient + +from app.models.sound import Sound +from app.models.user import User + + +class TestVLCEndpoints: + """Test VLC player API endpoints.""" + + @pytest.mark.asyncio + async def test_play_sound_with_vlc_success( + self, + authenticated_client: AsyncClient, + authenticated_user: User, + ): + """Test successful sound playback via VLC.""" + # Mock the VLC player service and sound repository methods + with patch("app.services.vlc_player.VLCPlayerService.play_sound") as mock_play_sound: + mock_play_sound.return_value = True + + with patch("app.repositories.sound.SoundRepository.get_by_id") as mock_get_by_id: + mock_sound = Sound( + id=1, + type="SDB", + name="Test Sound", + filename="test.mp3", + duration=5000, + size=1024, + hash="test_hash", + ) + mock_get_by_id.return_value = mock_sound + + response = await authenticated_client.post("/api/v1/sounds/vlc/play/1") + + assert response.status_code == 200 + data = response.json() + assert data["success"] is True + assert data["sound_id"] == 1 + assert data["sound_name"] == "Test Sound" + assert "Test Sound" in data["message"] + + # Verify service calls + mock_get_by_id.assert_called_once_with(1) + mock_play_sound.assert_called_once_with(mock_sound) + + @pytest.mark.asyncio + async def test_play_sound_with_vlc_sound_not_found( + self, + authenticated_client: AsyncClient, + authenticated_user: User, + ): + """Test VLC playback when sound is not found.""" + # Mock the sound repository to return None + with patch("app.repositories.sound.SoundRepository.get_by_id") as mock_get_by_id: + mock_get_by_id.return_value = None + + response = await authenticated_client.post("/api/v1/sounds/vlc/play/999") + + assert response.status_code == 404 + data = response.json() + assert "Sound with ID 999 not found" in data["detail"] + + @pytest.mark.asyncio + async def test_play_sound_with_vlc_launch_failure( + self, + authenticated_client: AsyncClient, + authenticated_user: User, + ): + """Test VLC playback when VLC launch fails.""" + # Mock the VLC player service to fail + with patch("app.services.vlc_player.VLCPlayerService.play_sound") as mock_play_sound: + mock_play_sound.return_value = False + + with patch("app.repositories.sound.SoundRepository.get_by_id") as mock_get_by_id: + mock_sound = Sound( + id=1, + type="SDB", + name="Test Sound", + filename="test.mp3", + duration=5000, + size=1024, + hash="test_hash", + ) + mock_get_by_id.return_value = mock_sound + + response = await authenticated_client.post("/api/v1/sounds/vlc/play/1") + + assert response.status_code == 500 + data = response.json() + assert "Failed to launch VLC for sound playback" in data["detail"] + + @pytest.mark.asyncio + async def test_play_sound_with_vlc_service_exception( + self, + authenticated_client: AsyncClient, + authenticated_user: User, + ): + """Test VLC playback when service raises an exception.""" + # Mock the sound repository to raise an exception + with patch("app.repositories.sound.SoundRepository.get_by_id") as mock_get_by_id: + mock_get_by_id.side_effect = Exception("Database error") + + response = await authenticated_client.post("/api/v1/sounds/vlc/play/1") + + assert response.status_code == 500 + data = response.json() + assert "Failed to play sound" in data["detail"] + + @pytest.mark.asyncio + async def test_play_sound_with_vlc_unauthenticated( + self, + client: AsyncClient, + ): + """Test VLC playback without authentication.""" + response = await client.post("/api/v1/sounds/vlc/play/1") + assert response.status_code == 401 + + @pytest.mark.asyncio + async def test_stop_all_vlc_instances_success( + self, + authenticated_client: AsyncClient, + authenticated_user: User, + ): + """Test successful stopping of all VLC instances.""" + # Mock the VLC player service + with patch("app.services.vlc_player.VLCPlayerService.stop_all_vlc_instances") as mock_stop_all: + mock_result = { + "success": True, + "processes_found": 3, + "processes_killed": 3, + "processes_remaining": 0, + "message": "Killed 3 VLC processes", + } + mock_stop_all.return_value = mock_result + + response = await authenticated_client.post("/api/v1/sounds/vlc/stop-all") + + assert response.status_code == 200 + data = response.json() + assert data["success"] is True + assert data["processes_found"] == 3 + assert data["processes_killed"] == 3 + assert data["processes_remaining"] == 0 + assert "Killed 3 VLC processes" in data["message"] + + # Verify service call + mock_stop_all.assert_called_once() + + @pytest.mark.asyncio + async def test_stop_all_vlc_instances_no_processes( + self, + authenticated_client: AsyncClient, + authenticated_user: User, + ): + """Test stopping VLC instances when none are running.""" + # Mock the VLC player service + with patch("app.services.vlc_player.VLCPlayerService.stop_all_vlc_instances") as mock_stop_all: + mock_result = { + "success": True, + "processes_found": 0, + "processes_killed": 0, + "message": "No VLC processes found", + } + mock_stop_all.return_value = mock_result + + response = await authenticated_client.post("/api/v1/sounds/vlc/stop-all") + + assert response.status_code == 200 + data = response.json() + assert data["success"] is True + assert data["processes_found"] == 0 + assert data["processes_killed"] == 0 + assert data["message"] == "No VLC processes found" + + @pytest.mark.asyncio + async def test_stop_all_vlc_instances_partial_success( + self, + authenticated_client: AsyncClient, + authenticated_user: User, + ): + """Test stopping VLC instances with partial success.""" + # Mock the VLC player service + with patch("app.services.vlc_player.VLCPlayerService.stop_all_vlc_instances") as mock_stop_all: + mock_result = { + "success": True, + "processes_found": 3, + "processes_killed": 2, + "processes_remaining": 1, + "message": "Killed 2 VLC processes", + } + mock_stop_all.return_value = mock_result + + response = await authenticated_client.post("/api/v1/sounds/vlc/stop-all") + + assert response.status_code == 200 + data = response.json() + assert data["success"] is True + assert data["processes_found"] == 3 + assert data["processes_killed"] == 2 + assert data["processes_remaining"] == 1 + + @pytest.mark.asyncio + async def test_stop_all_vlc_instances_failure( + self, + authenticated_client: AsyncClient, + authenticated_user: User, + ): + """Test stopping VLC instances when service fails.""" + # Mock the VLC player service + with patch("app.services.vlc_player.VLCPlayerService.stop_all_vlc_instances") as mock_stop_all: + mock_result = { + "success": False, + "processes_found": 0, + "processes_killed": 0, + "error": "Command failed", + "message": "Failed to stop VLC processes", + } + mock_stop_all.return_value = mock_result + + response = await authenticated_client.post("/api/v1/sounds/vlc/stop-all") + + assert response.status_code == 200 + data = response.json() + assert data["success"] is False + assert data["error"] == "Command failed" + assert data["message"] == "Failed to stop VLC processes" + + @pytest.mark.asyncio + async def test_stop_all_vlc_instances_service_exception( + self, + authenticated_client: AsyncClient, + authenticated_user: User, + ): + """Test stopping VLC instances when service raises an exception.""" + # Mock the VLC player service to raise an exception + with patch("app.services.vlc_player.VLCPlayerService.stop_all_vlc_instances") as mock_stop_all: + mock_stop_all.side_effect = Exception("Service error") + + response = await authenticated_client.post("/api/v1/sounds/vlc/stop-all") + + assert response.status_code == 500 + data = response.json() + assert "Failed to stop VLC instances" in data["detail"] + + @pytest.mark.asyncio + async def test_stop_all_vlc_instances_unauthenticated( + self, + client: AsyncClient, + ): + """Test stopping VLC instances without authentication.""" + response = await client.post("/api/v1/sounds/vlc/stop-all") + assert response.status_code == 401 + + @pytest.mark.asyncio + async def test_vlc_endpoints_with_admin_user( + self, + authenticated_admin_client: AsyncClient, + admin_user: User, + ): + """Test VLC endpoints work with admin user.""" + # Test play endpoint with admin + with patch("app.services.vlc_player.VLCPlayerService.play_sound") as mock_play_sound: + mock_play_sound.return_value = True + + with patch("app.repositories.sound.SoundRepository.get_by_id") as mock_get_by_id: + mock_sound = Sound( + id=1, + type="SDB", + name="Admin Test Sound", + filename="admin_test.mp3", + duration=3000, + size=512, + hash="admin_hash", + ) + mock_get_by_id.return_value = mock_sound + + response = await authenticated_admin_client.post("/api/v1/sounds/vlc/play/1") + + assert response.status_code == 200 + data = response.json() + assert data["success"] is True + assert data["sound_name"] == "Admin Test Sound" + + # Test stop-all endpoint with admin + with patch("app.services.vlc_player.VLCPlayerService.stop_all_vlc_instances") as mock_stop_all: + mock_result = { + "success": True, + "processes_found": 1, + "processes_killed": 1, + "processes_remaining": 0, + "message": "Killed 1 VLC processes", + } + mock_stop_all.return_value = mock_result + + response = await authenticated_admin_client.post("/api/v1/sounds/vlc/stop-all") + + assert response.status_code == 200 + data = response.json() + assert data["success"] is True + assert data["processes_killed"] == 1 \ No newline at end of file diff --git a/tests/services/test_player.py b/tests/services/test_player.py index 405fc7c..da9ba3c 100644 --- a/tests/services/test_player.py +++ b/tests/services/test_player.py @@ -21,6 +21,7 @@ from app.services.player import ( initialize_player_service, shutdown_player_service, ) +from app.utils.audio import get_sound_file_path class TestPlayerState: @@ -196,7 +197,7 @@ class TestPlayerService: ) player_service.state.playlist_sounds = [sound] - with patch.object(player_service, "_get_sound_file_path") as mock_path: + with patch("app.services.player.get_sound_file_path") as mock_path: mock_file_path = Mock(spec=Path) mock_file_path.exists.return_value = True mock_path.return_value = mock_file_path @@ -385,51 +386,6 @@ class TestPlayerService: assert player_service.state.playlist_length == 2 assert player_service.state.playlist_duration == 75000 - def test_get_sound_file_path_normalized(self, player_service): - """Test getting file path for normalized sound.""" - sound = Sound( - id=1, - name="Test Song", - filename="original.mp3", - normalized_filename="normalized.mp3", - is_normalized=True, - type="SDB", - ) - - result = player_service._get_sound_file_path(sound) - - expected = Path("sounds/normalized/sdb/normalized.mp3") - assert result == expected - - def test_get_sound_file_path_original(self, player_service): - """Test getting file path for original sound.""" - sound = Sound( - id=1, - name="Test Song", - filename="original.mp3", - is_normalized=False, - type="SDB", - ) - - result = player_service._get_sound_file_path(sound) - - expected = Path("sounds/originals/sdb/original.mp3") - assert result == expected - - def test_get_sound_file_path_ext_type(self, player_service): - """Test getting file path for EXT type sound.""" - sound = Sound( - id=1, - name="Test Song", - filename="extracted.mp3", - is_normalized=False, - type="EXT", - ) - - result = player_service._get_sound_file_path(sound) - - expected = Path("sounds/originals/extracted/extracted.mp3") - assert result == expected def test_get_next_index_continuous_mode(self, player_service): """Test getting next index in continuous mode.""" @@ -538,36 +494,24 @@ class TestPlayerService: # Mock repositories with patch("app.services.player.SoundRepository") as mock_sound_repo_class: - with patch("app.services.player.UserRepository") as mock_user_repo_class: - mock_sound_repo = AsyncMock() - mock_user_repo = AsyncMock() - mock_sound_repo_class.return_value = mock_sound_repo - mock_user_repo_class.return_value = mock_user_repo + mock_sound_repo = AsyncMock() + mock_sound_repo_class.return_value = mock_sound_repo - # Mock sound and user - mock_sound = Mock() - mock_sound.play_count = 5 - mock_sound_repo.get_by_id.return_value = mock_sound + # Mock sound + mock_sound = Mock() + mock_sound.play_count = 5 + mock_sound_repo.get_by_id.return_value = mock_sound - mock_user = Mock() - mock_user.id = 1 - mock_user_repo.get_by_id.return_value = mock_user + await player_service._record_play_count(1) - # Mock no existing SoundPlayed record - mock_result = Mock() - mock_result.first.return_value = None - mock_session.exec.return_value = mock_result + # Verify sound play count was updated + mock_sound_repo.update.assert_called_once_with( + mock_sound, {"play_count": 6} + ) - await player_service._record_play_count(1) - - # Verify sound play count was updated - mock_sound_repo.update.assert_called_once_with( - mock_sound, {"play_count": 6} - ) - - # Verify SoundPlayed record was created - mock_session.add.assert_called_once() - mock_session.commit.assert_called_once() + # Verify SoundPlayed record was created with None user_id for player + mock_session.add.assert_called_once() + mock_session.commit.assert_called_once() def test_get_state(self, player_service): """Test getting current player state.""" @@ -577,6 +521,27 @@ class TestPlayerService: assert "mode" in result assert "volume" in result + def test_uses_shared_sound_path_utility(self, player_service): + """Test that player service uses the shared sound path utility.""" + sound = Sound( + id=1, + name="Test Song", + filename="test.mp3", + type="SDB", + is_normalized=False, + ) + player_service.state.playlist_sounds = [sound] + + with patch("app.services.player.get_sound_file_path") as mock_path: + mock_file_path = Mock(spec=Path) + mock_file_path.exists.return_value = False # File doesn't exist + mock_path.return_value = mock_file_path + + # This should fail because file doesn't exist + result = asyncio.run(player_service.play(0)) + # Verify the utility was called + mock_path.assert_called_once_with(sound) + class TestPlayerServiceGlobalFunctions: """Test global player service functions.""" diff --git a/tests/services/test_vlc_player.py b/tests/services/test_vlc_player.py new file mode 100644 index 0000000..d5d57b3 --- /dev/null +++ b/tests/services/test_vlc_player.py @@ -0,0 +1,511 @@ +"""Tests for VLC player service.""" + +import asyncio +from pathlib import Path +from unittest.mock import AsyncMock, Mock, patch + +import pytest + +from app.models.sound import Sound +from app.models.sound_played import SoundPlayed +from app.models.user import User +from app.services.vlc_player import VLCPlayerService, get_vlc_player_service +from app.utils.audio import get_sound_file_path + + +class TestVLCPlayerService: + """Test VLC player service.""" + + @pytest.fixture + def vlc_service(self): + """Create a VLC service instance.""" + with patch("app.services.vlc_player.subprocess.run") as mock_run: + # Mock VLC executable detection + mock_run.return_value.returncode = 0 + return VLCPlayerService() + + @pytest.fixture + def vlc_service_with_db(self): + """Create a VLC service instance with database session factory.""" + with patch("app.services.vlc_player.subprocess.run") as mock_run: + # Mock VLC executable detection + mock_run.return_value.returncode = 0 + mock_session_factory = Mock() + return VLCPlayerService(mock_session_factory) + + @pytest.fixture + def sample_sound(self): + """Create a sample sound for testing.""" + return Sound( + id=1, + type="SDB", + name="Test Sound", + filename="test_audio.mp3", + duration=5000, + size=1024, + hash="test_hash", + is_normalized=False, + normalized_filename=None, + ) + + @pytest.fixture + def normalized_sound(self): + """Create a normalized sound for testing.""" + return Sound( + id=2, + type="TTS", + name="Normalized Sound", + filename="original.wav", + duration=7500, + size=2048, + hash="normalized_hash", + is_normalized=True, + normalized_filename="normalized.mp3", + ) + + def test_init(self, vlc_service): + """Test VLC service initialization.""" + assert vlc_service.vlc_executable is not None + assert isinstance(vlc_service.vlc_executable, str) + + @patch("app.services.vlc_player.subprocess.run") + def test_find_vlc_executable_found_in_path(self, mock_run): + """Test VLC executable detection when found in PATH.""" + mock_run.return_value.returncode = 0 + service = VLCPlayerService() + assert service.vlc_executable == "vlc" + + @patch("app.services.vlc_player.subprocess.run") + def test_find_vlc_executable_found_by_path(self, mock_run): + """Test VLC executable detection when found by absolute path.""" + mock_run.return_value.returncode = 1 # which command fails + + # Mock Path to return True for the first absolute path + with patch("app.services.vlc_player.Path") as mock_path: + def path_side_effect(path_str): + mock_instance = Mock() + mock_instance.exists.return_value = str(path_str) == "/usr/bin/vlc" + return mock_instance + + mock_path.side_effect = path_side_effect + + service = VLCPlayerService() + assert service.vlc_executable == "/usr/bin/vlc" + + @patch("app.services.vlc_player.subprocess.run") + @patch("app.services.vlc_player.Path") + def test_find_vlc_executable_fallback(self, mock_path, mock_run): + """Test VLC executable detection fallback to default.""" + # Mock all paths as non-existent + mock_path_instance = Mock() + mock_path_instance.exists.return_value = False + mock_path.return_value = mock_path_instance + + # Mock which command as failing + mock_run.return_value.returncode = 1 + + service = VLCPlayerService() + assert service.vlc_executable == "vlc" + + + @pytest.mark.asyncio + @patch("app.services.vlc_player.asyncio.create_subprocess_exec") + async def test_play_sound_success( + self, mock_subprocess, vlc_service, sample_sound + ): + """Test successful sound playback.""" + # Mock subprocess + mock_process = Mock() + mock_process.pid = 12345 + mock_subprocess.return_value = mock_process + + # Mock the file path utility to avoid Path issues + with patch("app.services.vlc_player.get_sound_file_path") as mock_get_path: + mock_path = Mock() + mock_path.exists.return_value = True + mock_get_path.return_value = mock_path + + result = await vlc_service.play_sound(sample_sound) + + assert result is True + mock_subprocess.assert_called_once() + args = mock_subprocess.call_args + + # Check command arguments + cmd_args = args[1] # keyword arguments + assert "--play-and-exit" in args[0] + assert "--intf" in args[0] + assert "dummy" in args[0] + assert "--no-video" in args[0] + assert "--no-repeat" in args[0] + assert "--no-loop" in args[0] + assert cmd_args["stdout"] == asyncio.subprocess.DEVNULL + assert cmd_args["stderr"] == asyncio.subprocess.DEVNULL + + @pytest.mark.asyncio + async def test_play_sound_file_not_found( + self, vlc_service, sample_sound + ): + """Test sound playback when file doesn't exist.""" + # Mock the file path utility to return a non-existent path + with patch("app.services.vlc_player.get_sound_file_path") as mock_get_path: + mock_path = Mock() + mock_path.exists.return_value = False + mock_get_path.return_value = mock_path + + result = await vlc_service.play_sound(sample_sound) + + assert result is False + + @pytest.mark.asyncio + @patch("app.services.vlc_player.asyncio.create_subprocess_exec") + async def test_play_sound_subprocess_error( + self, mock_subprocess, vlc_service, sample_sound + ): + """Test sound playback when subprocess fails.""" + # Mock the file path utility to return an existing path + with patch("app.services.vlc_player.get_sound_file_path") as mock_get_path: + mock_path = Mock() + mock_path.exists.return_value = True + mock_get_path.return_value = mock_path + + # Mock subprocess exception + mock_subprocess.side_effect = Exception("Subprocess failed") + + result = await vlc_service.play_sound(sample_sound) + + assert result is False + + @pytest.mark.asyncio + @patch("app.services.vlc_player.asyncio.create_subprocess_exec") + async def test_stop_all_vlc_instances_success(self, mock_subprocess, vlc_service): + """Test successful stopping of all VLC instances.""" + # Mock pgrep process (find VLC processes) + mock_find_process = Mock() + mock_find_process.returncode = 0 + mock_find_process.communicate = AsyncMock( + return_value=(b"12345\n67890\n", b"") + ) + + # Mock pkill process (kill VLC processes) + mock_kill_process = Mock() + mock_kill_process.communicate = AsyncMock(return_value=(b"", b"")) + + # Mock verify process (check remaining processes) + mock_verify_process = Mock() + mock_verify_process.returncode = 1 # No processes found + mock_verify_process.communicate = AsyncMock(return_value=(b"", b"")) + + # Set up subprocess mock to return different processes for each call + mock_subprocess.side_effect = [ + mock_find_process, + mock_kill_process, + mock_verify_process, + ] + + result = await vlc_service.stop_all_vlc_instances() + + assert result["success"] is True + assert result["processes_found"] == 2 + assert result["processes_killed"] == 2 + assert result["processes_remaining"] == 0 + assert "Killed 2 VLC processes" in result["message"] + + @pytest.mark.asyncio + @patch("app.services.vlc_player.asyncio.create_subprocess_exec") + async def test_stop_all_vlc_instances_no_processes( + self, mock_subprocess, vlc_service + ): + """Test stopping VLC instances when none are running.""" + # Mock pgrep process (no VLC processes found) + mock_find_process = Mock() + mock_find_process.returncode = 1 # No processes found + mock_find_process.communicate = AsyncMock(return_value=(b"", b"")) + + mock_subprocess.return_value = mock_find_process + + result = await vlc_service.stop_all_vlc_instances() + + assert result["success"] is True + assert result["processes_found"] == 0 + assert result["processes_killed"] == 0 + assert result["message"] == "No VLC processes found" + + @pytest.mark.asyncio + @patch("app.services.vlc_player.asyncio.create_subprocess_exec") + async def test_stop_all_vlc_instances_partial_kill( + self, mock_subprocess, vlc_service + ): + """Test stopping VLC instances when some processes remain.""" + # Mock pgrep process (find VLC processes) + mock_find_process = Mock() + mock_find_process.returncode = 0 + mock_find_process.communicate = AsyncMock( + return_value=(b"12345\n67890\n11111\n", b"") + ) + + # Mock pkill process (kill VLC processes) + mock_kill_process = Mock() + mock_kill_process.communicate = AsyncMock(return_value=(b"", b"")) + + # Mock verify process (one process remains) + mock_verify_process = Mock() + mock_verify_process.returncode = 0 + mock_verify_process.communicate = AsyncMock(return_value=(b"11111\n", b"")) + + mock_subprocess.side_effect = [ + mock_find_process, + mock_kill_process, + mock_verify_process, + ] + + result = await vlc_service.stop_all_vlc_instances() + + assert result["success"] is True + assert result["processes_found"] == 3 + assert result["processes_killed"] == 2 + assert result["processes_remaining"] == 1 + + @pytest.mark.asyncio + @patch("app.services.vlc_player.asyncio.create_subprocess_exec") + async def test_stop_all_vlc_instances_error(self, mock_subprocess, vlc_service): + """Test stopping VLC instances when an error occurs.""" + # Mock subprocess exception + mock_subprocess.side_effect = Exception("Command failed") + + result = await vlc_service.stop_all_vlc_instances() + + assert result["success"] is False + assert result["processes_found"] == 0 + assert result["processes_killed"] == 0 + assert "error" in result + assert result["message"] == "Failed to stop VLC processes" + + def test_get_vlc_player_service_singleton(self): + """Test that get_vlc_player_service returns the same instance.""" + with patch("app.services.vlc_player.VLCPlayerService") as mock_service_class: + mock_instance = Mock() + mock_service_class.return_value = mock_instance + + # Clear the global instance + import app.services.vlc_player + app.services.vlc_player.vlc_player_service = None + + # First call should create new instance + service1 = get_vlc_player_service() + assert service1 == mock_instance + mock_service_class.assert_called_once() + + # Second call should return same instance + service2 = get_vlc_player_service() + assert service2 == mock_instance + assert service1 is service2 + # Constructor should not be called again + mock_service_class.assert_called_once() + + @pytest.mark.asyncio + @patch("app.services.vlc_player.asyncio.create_subprocess_exec") + async def test_play_sound_with_play_count_tracking( + self, mock_subprocess, vlc_service_with_db, sample_sound + ): + """Test sound playback with play count tracking.""" + # Mock subprocess + mock_process = Mock() + mock_process.pid = 12345 + mock_subprocess.return_value = mock_process + + # Mock session and repositories + mock_session = AsyncMock() + vlc_service_with_db.db_session_factory.return_value = mock_session + + # Mock repositories + mock_sound_repo = AsyncMock() + mock_user_repo = AsyncMock() + + with patch("app.services.vlc_player.SoundRepository", return_value=mock_sound_repo): + with patch("app.services.vlc_player.UserRepository", return_value=mock_user_repo): + with patch("app.services.vlc_player.socket_manager") as mock_socket: + with patch("app.services.vlc_player.select") as mock_select: + # Mock the file path utility + with patch("app.services.vlc_player.get_sound_file_path") as mock_get_path: + mock_path = Mock() + mock_path.exists.return_value = True + mock_get_path.return_value = mock_path + + # Mock sound repository responses + updated_sound = Sound( + id=1, + type="SDB", + name="Test Sound", + filename="test.mp3", + duration=5000, + size=1024, + hash="test_hash", + play_count=1, # Updated count + ) + mock_sound_repo.get_by_id.return_value = sample_sound + mock_sound_repo.update.return_value = updated_sound + + # Mock admin user + admin_user = User( + id=1, + email="admin@test.com", + name="Admin User", + role="admin", + ) + mock_user_repo.get_by_id.return_value = admin_user + + # Mock socket broadcast + mock_socket.broadcast_to_all = AsyncMock() + + result = await vlc_service_with_db.play_sound(sample_sound) + + # Wait a bit for the async task to complete + await asyncio.sleep(0.1) + + assert result is True + + # Verify subprocess was called + mock_subprocess.assert_called_once() + + # Note: The async task runs in the background, so we can't easily + # verify the database operations in this test without more complex + # mocking or using a real async test framework setup + + @pytest.mark.asyncio + async def test_record_play_count_success(self, vlc_service_with_db): + """Test successful play count recording.""" + # Mock session and repositories + mock_session = AsyncMock() + vlc_service_with_db.db_session_factory.return_value = mock_session + + mock_sound_repo = AsyncMock() + mock_user_repo = AsyncMock() + + # Create test sound and user + test_sound = Sound( + id=1, + type="SDB", + name="Test Sound", + filename="test.mp3", + duration=5000, + size=1024, + hash="test_hash", + play_count=0, + ) + admin_user = User( + id=1, + email="admin@test.com", + name="Admin User", + role="admin", + ) + + with patch("app.services.vlc_player.SoundRepository", return_value=mock_sound_repo): + with patch("app.services.vlc_player.UserRepository", return_value=mock_user_repo): + with patch("app.services.vlc_player.socket_manager") as mock_socket: + with patch("app.services.vlc_player.select") as mock_select: + # Setup mocks + mock_sound_repo.get_by_id.return_value = test_sound + mock_user_repo.get_by_id.return_value = admin_user + + # Mock socket broadcast + mock_socket.broadcast_to_all = AsyncMock() + + await vlc_service_with_db._record_play_count(1, "Test Sound") + + # Verify sound repository calls + mock_sound_repo.get_by_id.assert_called_once_with(1) + mock_sound_repo.update.assert_called_once_with( + test_sound, {"play_count": 1} + ) + + # Verify user repository calls + mock_user_repo.get_by_id.assert_called_once_with(1) + + # Verify session operations + mock_session.add.assert_called_once() + mock_session.commit.assert_called_once() + mock_session.close.assert_called_once() + + # Verify socket broadcast + mock_socket.broadcast_to_all.assert_called_once_with( + "sound_played", + { + "sound_id": 1, + "sound_name": "Test Sound", + "user_id": 1, + "play_count": 1, + }, + ) + + @pytest.mark.asyncio + async def test_record_play_count_no_session_factory(self, vlc_service): + """Test play count recording when no session factory is available.""" + # This should not raise an error and should log a warning + await vlc_service._record_play_count(1, "Test Sound") + # The method should return early without doing anything + + @pytest.mark.asyncio + async def test_record_play_count_always_creates_record(self, vlc_service_with_db): + """Test play count recording always creates a new SoundPlayed record.""" + # Mock session and repositories + mock_session = AsyncMock() + vlc_service_with_db.db_session_factory.return_value = mock_session + + mock_sound_repo = AsyncMock() + mock_user_repo = AsyncMock() + + # Create test sound and user + test_sound = Sound( + id=1, + type="SDB", + name="Test Sound", + filename="test.mp3", + duration=5000, + size=1024, + hash="test_hash", + play_count=5, + ) + admin_user = User( + id=1, + email="admin@test.com", + name="Admin User", + role="admin", + ) + + with patch("app.services.vlc_player.SoundRepository", return_value=mock_sound_repo): + with patch("app.services.vlc_player.UserRepository", return_value=mock_user_repo): + with patch("app.services.vlc_player.socket_manager") as mock_socket: + # Setup mocks + mock_sound_repo.get_by_id.return_value = test_sound + mock_user_repo.get_by_id.return_value = admin_user + + # Mock socket broadcast + mock_socket.broadcast_to_all = AsyncMock() + + await vlc_service_with_db._record_play_count(1, "Test Sound") + + # Verify sound play count was updated + mock_sound_repo.update.assert_called_once_with( + test_sound, {"play_count": 6} + ) + + # Verify new SoundPlayed record was always added + mock_session.add.assert_called_once() + + # Verify commit happened + mock_session.commit.assert_called_once() + + def test_uses_shared_sound_path_utility(self, vlc_service, sample_sound): + """Test that VLC service uses the shared sound path utility.""" + with patch("app.services.vlc_player.get_sound_file_path") as mock_path: + mock_file_path = Mock(spec=Path) + mock_file_path.exists.return_value = False # File doesn't exist + mock_path.return_value = mock_file_path + + # This should fail because file doesn't exist + result = asyncio.run(vlc_service.play_sound(sample_sound)) + + # Verify the utility was called and returned False + mock_path.assert_called_once_with(sample_sound) + assert result is False \ No newline at end of file diff --git a/tests/utils/test_audio.py b/tests/utils/test_audio.py index 56eb6c1..b184401 100644 --- a/tests/utils/test_audio.py +++ b/tests/utils/test_audio.py @@ -7,7 +7,8 @@ from unittest.mock import patch import pytest -from app.utils.audio import get_audio_duration, get_file_hash, get_file_size +from app.models.sound import Sound +from app.utils.audio import get_audio_duration, get_file_hash, get_file_size, get_sound_file_path class TestAudioUtils: @@ -290,3 +291,120 @@ class TestAudioUtils: # Should raise FileNotFoundError for nonexistent file with pytest.raises(FileNotFoundError): get_file_size(nonexistent_path) + + def test_get_sound_file_path_sdb_original(self): + """Test getting sound file path for SDB type original file.""" + sound = Sound( + id=1, + name="Test Sound", + filename="test.mp3", + type="SDB", + is_normalized=False, + ) + + result = get_sound_file_path(sound) + expected = Path("sounds/originals/soundboard/test.mp3") + assert result == expected + + def test_get_sound_file_path_sdb_normalized(self): + """Test getting sound file path for SDB type normalized file.""" + sound = Sound( + id=1, + name="Test Sound", + filename="original.mp3", + normalized_filename="normalized.mp3", + type="SDB", + is_normalized=True, + ) + + result = get_sound_file_path(sound) + expected = Path("sounds/normalized/soundboard/normalized.mp3") + assert result == expected + + def test_get_sound_file_path_tts_original(self): + """Test getting sound file path for TTS type original file.""" + sound = Sound( + id=2, + name="TTS Sound", + filename="tts_file.wav", + type="TTS", + is_normalized=False, + ) + + result = get_sound_file_path(sound) + expected = Path("sounds/originals/text_to_speech/tts_file.wav") + assert result == expected + + def test_get_sound_file_path_tts_normalized(self): + """Test getting sound file path for TTS type normalized file.""" + sound = Sound( + id=2, + name="TTS Sound", + filename="original.wav", + normalized_filename="normalized.mp3", + type="TTS", + is_normalized=True, + ) + + result = get_sound_file_path(sound) + expected = Path("sounds/normalized/text_to_speech/normalized.mp3") + assert result == expected + + def test_get_sound_file_path_ext_original(self): + """Test getting sound file path for EXT type original file.""" + sound = Sound( + id=3, + name="Extracted Sound", + filename="extracted.mp3", + type="EXT", + is_normalized=False, + ) + + result = get_sound_file_path(sound) + expected = Path("sounds/originals/extracted/extracted.mp3") + assert result == expected + + def test_get_sound_file_path_ext_normalized(self): + """Test getting sound file path for EXT type normalized file.""" + sound = Sound( + id=3, + name="Extracted Sound", + filename="original.mp3", + normalized_filename="normalized.mp3", + type="EXT", + is_normalized=True, + ) + + result = get_sound_file_path(sound) + expected = Path("sounds/normalized/extracted/normalized.mp3") + assert result == expected + + def test_get_sound_file_path_unknown_type_fallback(self): + """Test getting sound file path for unknown type falls back to lowercase.""" + sound = Sound( + id=4, + name="Unknown Type Sound", + filename="unknown.mp3", + type="CUSTOM", + is_normalized=False, + ) + + result = get_sound_file_path(sound) + expected = Path("sounds/originals/custom/unknown.mp3") + assert result == expected + + def test_get_sound_file_path_normalized_without_filename(self): + """Test getting sound file path when normalized but no normalized_filename.""" + sound = Sound( + id=5, + name="Test Sound", + filename="original.mp3", + normalized_filename=None, + type="SDB", + is_normalized=True, # True but no normalized_filename + ) + + result = get_sound_file_path(sound) + # Should fall back to original file + expected = Path("sounds/originals/soundboard/original.mp3") + assert result == expected