feat: Add VLC player API endpoints and associated tests

- Implemented VLC player API endpoints for playing and stopping sounds.
- Added tests for successful playback, error handling, and authentication scenarios.
- Created utility function to get sound file paths based on sound properties.
- Refactored player service to utilize shared sound path utility.
- Enhanced test coverage for sound file path utility with various sound types.
- Introduced tests for VLC player service, including subprocess handling and play count tracking.
This commit is contained in:
JSC
2025-07-30 20:46:49 +02:00
parent 1b0d291ad3
commit dd10ef5d41
9 changed files with 1413 additions and 134 deletions

View File

@@ -8,10 +8,12 @@ from sqlmodel.ext.asyncio.session import AsyncSession
from app.core.database import get_db from app.core.database import get_db
from app.core.dependencies import get_current_active_user_flexible from app.core.dependencies import get_current_active_user_flexible
from app.models.user import User from app.models.user import User
from app.repositories.sound import SoundRepository
from app.services.extraction import ExtractionInfo, ExtractionService from app.services.extraction import ExtractionInfo, ExtractionService
from app.services.extraction_processor import extraction_processor from app.services.extraction_processor import extraction_processor
from app.services.sound_normalizer import NormalizationResults, SoundNormalizerService from app.services.sound_normalizer import NormalizationResults, SoundNormalizerService
from app.services.sound_scanner import ScanResults, SoundScannerService from app.services.sound_scanner import ScanResults, SoundScannerService
from app.services.vlc_player import get_vlc_player_service, VLCPlayerService
router = APIRouter(prefix="/sounds", tags=["sounds"]) router = APIRouter(prefix="/sounds", tags=["sounds"])
@@ -37,6 +39,19 @@ async def get_extraction_service(
return ExtractionService(session) return ExtractionService(session)
def get_vlc_player() -> VLCPlayerService:
"""Get the VLC player service."""
from app.core.database import get_session_factory
return get_vlc_player_service(get_session_factory())
async def get_sound_repository(
session: Annotated[AsyncSession, Depends(get_db)],
) -> SoundRepository:
"""Get the sound repository."""
return SoundRepository(session)
# SCAN # SCAN
@router.post("/scan") @router.post("/scan")
async def scan_sounds( async def scan_sounds(
@@ -349,3 +364,64 @@ async def get_user_extractions(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to get extractions: {e!s}", detail=f"Failed to get extractions: {e!s}",
) from e ) from e
# VLC PLAYER
@router.post("/vlc/play/{sound_id}")
async def play_sound_with_vlc(
sound_id: int,
current_user: Annotated[User, Depends(get_current_active_user_flexible)],
vlc_player: Annotated[VLCPlayerService, Depends(get_vlc_player)],
sound_repo: Annotated[SoundRepository, Depends(get_sound_repository)],
) -> dict[str, str | int | bool]:
"""Play a sound using VLC subprocess."""
try:
# Get the sound
sound = await sound_repo.get_by_id(sound_id)
if not sound:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Sound with ID {sound_id} not found",
)
# Play the sound using VLC
success = await vlc_player.play_sound(sound)
if not success:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to launch VLC for sound playback",
)
return {
"message": f"Sound '{sound.name}' is now playing via VLC",
"sound_id": sound_id,
"sound_name": sound.name,
"success": True,
}
except HTTPException:
raise
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to play sound: {e!s}",
) from e
@router.post("/vlc/stop-all")
async def stop_all_vlc_instances(
current_user: Annotated[User, Depends(get_current_active_user_flexible)],
vlc_player: Annotated[VLCPlayerService, Depends(get_vlc_player)],
) -> dict:
"""Stop all running VLC instances."""
try:
result = await vlc_player.stop_all_vlc_instances()
return result
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to stop VLC instances: {e!s}",
) from e

View File

@@ -1,6 +1,6 @@
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from sqlmodel import Field, Relationship, UniqueConstraint from sqlmodel import Field, Relationship
from app.models.base import BaseModel from app.models.base import BaseModel
@@ -14,18 +14,9 @@ class SoundPlayed(BaseModel, table=True):
__tablename__ = "sound_played" # pyright: ignore[reportAssignmentType] __tablename__ = "sound_played" # pyright: ignore[reportAssignmentType]
user_id: int = Field(foreign_key="user.id", nullable=False) user_id: int | None = Field(foreign_key="user.id", nullable=True)
sound_id: int = Field(foreign_key="sound.id", nullable=False) sound_id: int = Field(foreign_key="sound.id", nullable=False)
# constraints
__table_args__ = (
UniqueConstraint(
"user_id",
"sound_id",
name="uq_sound_played_user_sound",
),
)
# relationships # relationships
user: "User" = Relationship(back_populates="sounds_played") user: "User" = Relationship(back_populates="sounds_played")
sound: "Sound" = Relationship(back_populates="play_history") sound: "Sound" = Relationship(back_populates="play_history")

View File

@@ -9,15 +9,14 @@ from pathlib import Path
from typing import Any from typing import Any
import vlc # type: ignore[import-untyped] import vlc # type: ignore[import-untyped]
from sqlmodel import select
from app.core.logging import get_logger from app.core.logging import get_logger
from app.models.sound import Sound from app.models.sound import Sound
from app.models.sound_played import SoundPlayed from app.models.sound_played import SoundPlayed
from app.repositories.playlist import PlaylistRepository from app.repositories.playlist import PlaylistRepository
from app.repositories.sound import SoundRepository from app.repositories.sound import SoundRepository
from app.repositories.user import UserRepository
from app.services.socket import socket_manager from app.services.socket import socket_manager
from app.utils.audio import get_sound_file_path
logger = get_logger(__name__) logger = get_logger(__name__)
@@ -198,7 +197,7 @@ class PlayerService:
return return
# Get sound file path # Get sound file path
sound_path = self._get_sound_file_path(self.state.current_sound) sound_path = get_sound_file_path(self.state.current_sound)
if not sound_path.exists(): if not sound_path.exists():
logger.error("Sound file not found: %s", sound_path) logger.error("Sound file not found: %s", sound_path)
return return
@@ -344,6 +343,12 @@ class PlayerService:
if self.state.status != PlayerStatus.STOPPED: if self.state.status != PlayerStatus.STOPPED:
await self._stop_playback() await self._stop_playback()
# Set first track as current if no current track and playlist has sounds
if not self.state.current_sound_id and sounds:
self.state.current_sound_index = 0
self.state.current_sound = sounds[0]
self.state.current_sound_id = sounds[0].id
logger.info( logger.info(
"Loaded playlist: %s (%s sounds)", "Loaded playlist: %s (%s sounds)",
current_playlist.name, current_playlist.name,
@@ -360,21 +365,6 @@ class PlayerService:
"""Get current player state.""" """Get current player state."""
return self.state.to_dict() return self.state.to_dict()
def _get_sound_file_path(self, sound: Sound) -> Path:
"""Get the file path for a sound."""
# Determine the correct subdirectory based on sound type
subdir = "extracted" if sound.type.upper() == "EXT" else sound.type.lower()
# Use normalized file if available, otherwise original
if sound.is_normalized and sound.normalized_filename:
return (
Path("sounds/normalized")
/ subdir
/ sound.normalized_filename
)
return (
Path("sounds/originals") / subdir / sound.filename
)
def _get_next_index(self, current_index: int) -> int | None: def _get_next_index(self, current_index: int) -> int | None:
"""Get next track index based on current mode.""" """Get next track index based on current mode."""
@@ -501,7 +491,6 @@ class PlayerService:
session = self.db_session_factory() session = self.db_session_factory()
try: try:
sound_repo = SoundRepository(session) sound_repo = SoundRepository(session)
user_repo = UserRepository(session)
# Update sound play count # Update sound play count
sound = await sound_repo.get_by_id(sound_id) sound = await sound_repo.get_by_id(sound_id)
@@ -519,37 +508,17 @@ class PlayerService:
else: else:
logger.warning("Sound %s not found for play count update", sound_id) logger.warning("Sound %s not found for play count update", sound_id)
# Record play history for admin user (ID 1) as placeholder # Record play history without user_id for player-based plays
# This could be refined to track per-user play history # Always create a new SoundPlayed record for each play event
admin_user = await user_repo.get_by_id(1) sound_played = SoundPlayed(
if admin_user: user_id=None, # No user_id for player-based plays
# Check if already recorded for this user using proper query sound_id=sound_id,
stmt = select(SoundPlayed).where( )
SoundPlayed.user_id == admin_user.id, session.add(sound_played)
SoundPlayed.sound_id == sound_id, logger.info(
) "Created SoundPlayed record for player play, sound %s",
result = await session.exec(stmt) sound_id,
existing = result.first() )
if not existing:
sound_played = SoundPlayed(
user_id=admin_user.id,
sound_id=sound_id,
)
session.add(sound_played)
logger.info(
"Created SoundPlayed record for user %s, sound %s",
admin_user.id,
sound_id,
)
else:
logger.info(
"SoundPlayed record already exists for user %s, sound %s",
admin_user.id,
sound_id,
)
else:
logger.warning("Admin user (ID 1) not found for play history")
await session.commit() await session.commit()
logger.info("Successfully recorded play count for sound %s", sound_id) logger.info("Successfully recorded play count for sound %s", sound_id)

313
app/services/vlc_player.py Normal file
View File

@@ -0,0 +1,313 @@
"""VLC subprocess-based player service for immediate sound playback."""
import asyncio
import subprocess
from collections.abc import Callable
from pathlib import Path
from typing import Any
from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession
from app.core.logging import get_logger
from app.models.sound import Sound
from app.models.sound_played import SoundPlayed
from app.repositories.sound import SoundRepository
from app.repositories.user import UserRepository
from app.services.socket import socket_manager
from app.utils.audio import get_sound_file_path
logger = get_logger(__name__)
class VLCPlayerService:
"""Service for launching VLC instances via subprocess to play sounds."""
def __init__(
self, db_session_factory: Callable[[], AsyncSession] | None = None,
) -> None:
"""Initialize the VLC player service."""
self.vlc_executable = self._find_vlc_executable()
self.db_session_factory = db_session_factory
logger.info(
"VLC Player Service initialized with executable: %s",
self.vlc_executable,
)
def _find_vlc_executable(self) -> str:
"""Find VLC executable path based on the operating system."""
# Common VLC executable paths
possible_paths = [
"vlc", # Linux/Mac with VLC in PATH
"/usr/bin/vlc", # Linux
"/usr/local/bin/vlc", # Linux/Mac
"/Applications/VLC.app/Contents/MacOS/VLC", # macOS
"C:\\Program Files\\VideoLAN\\VLC\\vlc.exe", # Windows
"C:\\Program Files (x86)\\VideoLAN\\VLC\\vlc.exe", # Windows 32-bit
]
for path in possible_paths:
try:
if Path(path).exists():
return path
# For "vlc", try to find it in PATH
if path == "vlc":
result = subprocess.run(
["which", "vlc"],
capture_output=True,
check=False,
text=True,
)
if result.returncode == 0:
return path
except (OSError, subprocess.SubprocessError):
continue
# Default to 'vlc' and let the system handle it
logger.warning(
"VLC executable not found in common paths, using 'vlc' from PATH",
)
return "vlc"
async def play_sound(self, sound: Sound) -> bool:
"""Play a sound using a new VLC subprocess instance.
Args:
sound: The Sound object to play
Returns:
bool: True if VLC process was launched successfully, False otherwise
"""
try:
sound_path = get_sound_file_path(sound)
if not sound_path.exists():
logger.error("Sound file not found: %s", sound_path)
return False
# VLC command arguments for immediate playback
cmd = [
self.vlc_executable,
str(sound_path),
"--play-and-exit", # Exit VLC when playback finishes
"--intf",
"dummy", # No interface
"--no-video", # Audio only
"--no-repeat", # Don't repeat
"--no-loop", # Don't loop
]
# Launch VLC process asynchronously without waiting
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.DEVNULL,
stderr=asyncio.subprocess.DEVNULL,
)
logger.info(
"Launched VLC process (PID: %s) for sound: %s",
process.pid,
sound.name,
)
# Record play count and emit event
if self.db_session_factory and sound.id:
asyncio.create_task(self._record_play_count(sound.id, sound.name))
return True
except Exception:
logger.exception("Failed to launch VLC for sound %s", sound.name)
return False
async def stop_all_vlc_instances(self) -> dict[str, Any]:
"""Stop all running VLC processes by killing them.
Returns:
dict: Results of the stop operation including counts and any errors
"""
try:
# Find all VLC processes
find_cmd = ["pgrep", "-f", "vlc"]
find_process = await asyncio.create_subprocess_exec(
*find_cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await find_process.communicate()
if find_process.returncode != 0:
# No VLC processes found
logger.info("No VLC processes found to stop")
return {
"success": True,
"processes_found": 0,
"processes_killed": 0,
"message": "No VLC processes found",
}
# Parse PIDs from output
pids = []
if stdout:
pids = [
pid.strip()
for pid in stdout.decode().strip().split("\n")
if pid.strip()
]
if not pids:
logger.info("No VLC processes found to stop")
return {
"success": True,
"processes_found": 0,
"processes_killed": 0,
"message": "No VLC processes found",
}
logger.info("Found %s VLC processes: %s", len(pids), ", ".join(pids))
# Kill all VLC processes
kill_cmd = ["pkill", "-f", "vlc"]
kill_process = await asyncio.create_subprocess_exec(
*kill_cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
await kill_process.communicate()
# Verify processes were killed
verify_process = await asyncio.create_subprocess_exec(
*find_cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout_verify, _ = await verify_process.communicate()
remaining_pids = []
if verify_process.returncode == 0 and stdout_verify:
remaining_pids = [
pid.strip()
for pid in stdout_verify.decode().strip().split("\n")
if pid.strip()
]
processes_killed = len(pids) - len(remaining_pids)
logger.info(
"Kill operation completed. Found: %s, Killed: %s, Remaining: %s",
len(pids),
processes_killed,
len(remaining_pids),
)
return {
"success": True,
"processes_found": len(pids),
"processes_killed": processes_killed,
"processes_remaining": len(remaining_pids),
"message": f"Killed {processes_killed} VLC processes",
}
except Exception as e:
logger.exception("Failed to stop VLC processes")
return {
"success": False,
"processes_found": 0,
"processes_killed": 0,
"error": str(e),
"message": "Failed to stop VLC processes",
}
async def _record_play_count(self, sound_id: int, sound_name: str) -> None:
"""Record a play count for a sound and emit sound_played event."""
if not self.db_session_factory:
logger.warning(
"No database session factory available for play count recording",
)
return
logger.info("Recording play count for sound %s", sound_id)
session = self.db_session_factory()
try:
sound_repo = SoundRepository(session)
user_repo = UserRepository(session)
# Update sound play count
sound = await sound_repo.get_by_id(sound_id)
old_count = 0
if sound:
old_count = sound.play_count
await sound_repo.update(
sound,
{"play_count": sound.play_count + 1},
)
logger.info(
"Updated sound %s play_count: %s -> %s",
sound_id,
old_count,
old_count + 1,
)
else:
logger.warning("Sound %s not found for play count update", sound_id)
# Record play history for admin user (ID 1) as placeholder
# This could be refined to track per-user play history
admin_user = await user_repo.get_by_id(1)
admin_user_id = None
if admin_user:
admin_user_id = admin_user.id
# Always create a new SoundPlayed record for each play event
sound_played = SoundPlayed(
user_id=admin_user_id, # Can be None for player-based plays
sound_id=sound_id,
)
session.add(sound_played)
logger.info(
"Created SoundPlayed record for user %s, sound %s",
admin_user_id,
sound_id,
)
await session.commit()
logger.info("Successfully recorded play count for sound %s", sound_id)
# Emit sound_played event via WebSocket
try:
event_data = {
"sound_id": sound_id,
"sound_name": sound_name,
"user_id": admin_user_id,
"play_count": (old_count + 1) if sound else None,
}
await socket_manager.broadcast_to_all("sound_played", event_data)
logger.info("Broadcasted sound_played event for sound %s", sound_id)
except Exception:
logger.exception(
"Failed to broadcast sound_played event for sound %s", sound_id,
)
except Exception:
logger.exception("Error recording play count for sound %s", sound_id)
await session.rollback()
finally:
await session.close()
# Global VLC player service instance
vlc_player_service: VLCPlayerService | None = None
def get_vlc_player_service(
db_session_factory: Callable[[], AsyncSession] | None = None,
) -> VLCPlayerService:
"""Get the global VLC player service instance."""
global vlc_player_service # noqa: PLW0603
if vlc_player_service is None:
vlc_player_service = VLCPlayerService(db_session_factory)
return vlc_player_service

View File

@@ -2,11 +2,15 @@
import hashlib import hashlib
from pathlib import Path from pathlib import Path
from typing import TYPE_CHECKING
import ffmpeg # type: ignore[import-untyped] import ffmpeg # type: ignore[import-untyped]
from app.core.logging import get_logger from app.core.logging import get_logger
if TYPE_CHECKING:
from app.models.sound import Sound
logger = get_logger(__name__) logger = get_logger(__name__)
@@ -33,3 +37,30 @@ def get_audio_duration(file_path: Path) -> int:
except Exception as e: except Exception as e:
logger.warning("Failed to get duration for %s: %s", file_path, e) logger.warning("Failed to get duration for %s: %s", file_path, e)
return 0 return 0
def get_sound_file_path(sound: "Sound") -> Path:
"""Get the file path for a sound based on its type and normalization status.
Args:
sound: The Sound object to get the path for
Returns:
Path: The full path to the sound file
"""
# Determine the correct subdirectory based on sound type
if sound.type.upper() == "EXT":
subdir = "extracted"
elif sound.type.upper() == "SDB":
subdir = "soundboard"
elif sound.type.upper() == "TTS":
subdir = "text_to_speech"
else:
# Fallback to lowercase type
subdir = sound.type.lower()
# Use normalized file if available, otherwise original
if sound.is_normalized and sound.normalized_filename:
return Path("sounds/normalized") / subdir / sound.normalized_filename
return Path("sounds/originals") / subdir / sound.filename

View File

@@ -0,0 +1,305 @@
"""Tests for VLC player API endpoints."""
from unittest.mock import AsyncMock, patch
import pytest
from httpx import AsyncClient
from app.models.sound import Sound
from app.models.user import User
class TestVLCEndpoints:
"""Test VLC player API endpoints."""
@pytest.mark.asyncio
async def test_play_sound_with_vlc_success(
self,
authenticated_client: AsyncClient,
authenticated_user: User,
):
"""Test successful sound playback via VLC."""
# Mock the VLC player service and sound repository methods
with patch("app.services.vlc_player.VLCPlayerService.play_sound") as mock_play_sound:
mock_play_sound.return_value = True
with patch("app.repositories.sound.SoundRepository.get_by_id") as mock_get_by_id:
mock_sound = Sound(
id=1,
type="SDB",
name="Test Sound",
filename="test.mp3",
duration=5000,
size=1024,
hash="test_hash",
)
mock_get_by_id.return_value = mock_sound
response = await authenticated_client.post("/api/v1/sounds/vlc/play/1")
assert response.status_code == 200
data = response.json()
assert data["success"] is True
assert data["sound_id"] == 1
assert data["sound_name"] == "Test Sound"
assert "Test Sound" in data["message"]
# Verify service calls
mock_get_by_id.assert_called_once_with(1)
mock_play_sound.assert_called_once_with(mock_sound)
@pytest.mark.asyncio
async def test_play_sound_with_vlc_sound_not_found(
self,
authenticated_client: AsyncClient,
authenticated_user: User,
):
"""Test VLC playback when sound is not found."""
# Mock the sound repository to return None
with patch("app.repositories.sound.SoundRepository.get_by_id") as mock_get_by_id:
mock_get_by_id.return_value = None
response = await authenticated_client.post("/api/v1/sounds/vlc/play/999")
assert response.status_code == 404
data = response.json()
assert "Sound with ID 999 not found" in data["detail"]
@pytest.mark.asyncio
async def test_play_sound_with_vlc_launch_failure(
self,
authenticated_client: AsyncClient,
authenticated_user: User,
):
"""Test VLC playback when VLC launch fails."""
# Mock the VLC player service to fail
with patch("app.services.vlc_player.VLCPlayerService.play_sound") as mock_play_sound:
mock_play_sound.return_value = False
with patch("app.repositories.sound.SoundRepository.get_by_id") as mock_get_by_id:
mock_sound = Sound(
id=1,
type="SDB",
name="Test Sound",
filename="test.mp3",
duration=5000,
size=1024,
hash="test_hash",
)
mock_get_by_id.return_value = mock_sound
response = await authenticated_client.post("/api/v1/sounds/vlc/play/1")
assert response.status_code == 500
data = response.json()
assert "Failed to launch VLC for sound playback" in data["detail"]
@pytest.mark.asyncio
async def test_play_sound_with_vlc_service_exception(
self,
authenticated_client: AsyncClient,
authenticated_user: User,
):
"""Test VLC playback when service raises an exception."""
# Mock the sound repository to raise an exception
with patch("app.repositories.sound.SoundRepository.get_by_id") as mock_get_by_id:
mock_get_by_id.side_effect = Exception("Database error")
response = await authenticated_client.post("/api/v1/sounds/vlc/play/1")
assert response.status_code == 500
data = response.json()
assert "Failed to play sound" in data["detail"]
@pytest.mark.asyncio
async def test_play_sound_with_vlc_unauthenticated(
self,
client: AsyncClient,
):
"""Test VLC playback without authentication."""
response = await client.post("/api/v1/sounds/vlc/play/1")
assert response.status_code == 401
@pytest.mark.asyncio
async def test_stop_all_vlc_instances_success(
self,
authenticated_client: AsyncClient,
authenticated_user: User,
):
"""Test successful stopping of all VLC instances."""
# Mock the VLC player service
with patch("app.services.vlc_player.VLCPlayerService.stop_all_vlc_instances") as mock_stop_all:
mock_result = {
"success": True,
"processes_found": 3,
"processes_killed": 3,
"processes_remaining": 0,
"message": "Killed 3 VLC processes",
}
mock_stop_all.return_value = mock_result
response = await authenticated_client.post("/api/v1/sounds/vlc/stop-all")
assert response.status_code == 200
data = response.json()
assert data["success"] is True
assert data["processes_found"] == 3
assert data["processes_killed"] == 3
assert data["processes_remaining"] == 0
assert "Killed 3 VLC processes" in data["message"]
# Verify service call
mock_stop_all.assert_called_once()
@pytest.mark.asyncio
async def test_stop_all_vlc_instances_no_processes(
self,
authenticated_client: AsyncClient,
authenticated_user: User,
):
"""Test stopping VLC instances when none are running."""
# Mock the VLC player service
with patch("app.services.vlc_player.VLCPlayerService.stop_all_vlc_instances") as mock_stop_all:
mock_result = {
"success": True,
"processes_found": 0,
"processes_killed": 0,
"message": "No VLC processes found",
}
mock_stop_all.return_value = mock_result
response = await authenticated_client.post("/api/v1/sounds/vlc/stop-all")
assert response.status_code == 200
data = response.json()
assert data["success"] is True
assert data["processes_found"] == 0
assert data["processes_killed"] == 0
assert data["message"] == "No VLC processes found"
@pytest.mark.asyncio
async def test_stop_all_vlc_instances_partial_success(
self,
authenticated_client: AsyncClient,
authenticated_user: User,
):
"""Test stopping VLC instances with partial success."""
# Mock the VLC player service
with patch("app.services.vlc_player.VLCPlayerService.stop_all_vlc_instances") as mock_stop_all:
mock_result = {
"success": True,
"processes_found": 3,
"processes_killed": 2,
"processes_remaining": 1,
"message": "Killed 2 VLC processes",
}
mock_stop_all.return_value = mock_result
response = await authenticated_client.post("/api/v1/sounds/vlc/stop-all")
assert response.status_code == 200
data = response.json()
assert data["success"] is True
assert data["processes_found"] == 3
assert data["processes_killed"] == 2
assert data["processes_remaining"] == 1
@pytest.mark.asyncio
async def test_stop_all_vlc_instances_failure(
self,
authenticated_client: AsyncClient,
authenticated_user: User,
):
"""Test stopping VLC instances when service fails."""
# Mock the VLC player service
with patch("app.services.vlc_player.VLCPlayerService.stop_all_vlc_instances") as mock_stop_all:
mock_result = {
"success": False,
"processes_found": 0,
"processes_killed": 0,
"error": "Command failed",
"message": "Failed to stop VLC processes",
}
mock_stop_all.return_value = mock_result
response = await authenticated_client.post("/api/v1/sounds/vlc/stop-all")
assert response.status_code == 200
data = response.json()
assert data["success"] is False
assert data["error"] == "Command failed"
assert data["message"] == "Failed to stop VLC processes"
@pytest.mark.asyncio
async def test_stop_all_vlc_instances_service_exception(
self,
authenticated_client: AsyncClient,
authenticated_user: User,
):
"""Test stopping VLC instances when service raises an exception."""
# Mock the VLC player service to raise an exception
with patch("app.services.vlc_player.VLCPlayerService.stop_all_vlc_instances") as mock_stop_all:
mock_stop_all.side_effect = Exception("Service error")
response = await authenticated_client.post("/api/v1/sounds/vlc/stop-all")
assert response.status_code == 500
data = response.json()
assert "Failed to stop VLC instances" in data["detail"]
@pytest.mark.asyncio
async def test_stop_all_vlc_instances_unauthenticated(
self,
client: AsyncClient,
):
"""Test stopping VLC instances without authentication."""
response = await client.post("/api/v1/sounds/vlc/stop-all")
assert response.status_code == 401
@pytest.mark.asyncio
async def test_vlc_endpoints_with_admin_user(
self,
authenticated_admin_client: AsyncClient,
admin_user: User,
):
"""Test VLC endpoints work with admin user."""
# Test play endpoint with admin
with patch("app.services.vlc_player.VLCPlayerService.play_sound") as mock_play_sound:
mock_play_sound.return_value = True
with patch("app.repositories.sound.SoundRepository.get_by_id") as mock_get_by_id:
mock_sound = Sound(
id=1,
type="SDB",
name="Admin Test Sound",
filename="admin_test.mp3",
duration=3000,
size=512,
hash="admin_hash",
)
mock_get_by_id.return_value = mock_sound
response = await authenticated_admin_client.post("/api/v1/sounds/vlc/play/1")
assert response.status_code == 200
data = response.json()
assert data["success"] is True
assert data["sound_name"] == "Admin Test Sound"
# Test stop-all endpoint with admin
with patch("app.services.vlc_player.VLCPlayerService.stop_all_vlc_instances") as mock_stop_all:
mock_result = {
"success": True,
"processes_found": 1,
"processes_killed": 1,
"processes_remaining": 0,
"message": "Killed 1 VLC processes",
}
mock_stop_all.return_value = mock_result
response = await authenticated_admin_client.post("/api/v1/sounds/vlc/stop-all")
assert response.status_code == 200
data = response.json()
assert data["success"] is True
assert data["processes_killed"] == 1

View File

@@ -21,6 +21,7 @@ from app.services.player import (
initialize_player_service, initialize_player_service,
shutdown_player_service, shutdown_player_service,
) )
from app.utils.audio import get_sound_file_path
class TestPlayerState: class TestPlayerState:
@@ -196,7 +197,7 @@ class TestPlayerService:
) )
player_service.state.playlist_sounds = [sound] player_service.state.playlist_sounds = [sound]
with patch.object(player_service, "_get_sound_file_path") as mock_path: with patch("app.services.player.get_sound_file_path") as mock_path:
mock_file_path = Mock(spec=Path) mock_file_path = Mock(spec=Path)
mock_file_path.exists.return_value = True mock_file_path.exists.return_value = True
mock_path.return_value = mock_file_path mock_path.return_value = mock_file_path
@@ -385,51 +386,6 @@ class TestPlayerService:
assert player_service.state.playlist_length == 2 assert player_service.state.playlist_length == 2
assert player_service.state.playlist_duration == 75000 assert player_service.state.playlist_duration == 75000
def test_get_sound_file_path_normalized(self, player_service):
"""Test getting file path for normalized sound."""
sound = Sound(
id=1,
name="Test Song",
filename="original.mp3",
normalized_filename="normalized.mp3",
is_normalized=True,
type="SDB",
)
result = player_service._get_sound_file_path(sound)
expected = Path("sounds/normalized/sdb/normalized.mp3")
assert result == expected
def test_get_sound_file_path_original(self, player_service):
"""Test getting file path for original sound."""
sound = Sound(
id=1,
name="Test Song",
filename="original.mp3",
is_normalized=False,
type="SDB",
)
result = player_service._get_sound_file_path(sound)
expected = Path("sounds/originals/sdb/original.mp3")
assert result == expected
def test_get_sound_file_path_ext_type(self, player_service):
"""Test getting file path for EXT type sound."""
sound = Sound(
id=1,
name="Test Song",
filename="extracted.mp3",
is_normalized=False,
type="EXT",
)
result = player_service._get_sound_file_path(sound)
expected = Path("sounds/originals/extracted/extracted.mp3")
assert result == expected
def test_get_next_index_continuous_mode(self, player_service): def test_get_next_index_continuous_mode(self, player_service):
"""Test getting next index in continuous mode.""" """Test getting next index in continuous mode."""
@@ -538,36 +494,24 @@ class TestPlayerService:
# Mock repositories # Mock repositories
with patch("app.services.player.SoundRepository") as mock_sound_repo_class: with patch("app.services.player.SoundRepository") as mock_sound_repo_class:
with patch("app.services.player.UserRepository") as mock_user_repo_class: mock_sound_repo = AsyncMock()
mock_sound_repo = AsyncMock() mock_sound_repo_class.return_value = mock_sound_repo
mock_user_repo = AsyncMock()
mock_sound_repo_class.return_value = mock_sound_repo
mock_user_repo_class.return_value = mock_user_repo
# Mock sound and user # Mock sound
mock_sound = Mock() mock_sound = Mock()
mock_sound.play_count = 5 mock_sound.play_count = 5
mock_sound_repo.get_by_id.return_value = mock_sound mock_sound_repo.get_by_id.return_value = mock_sound
mock_user = Mock() await player_service._record_play_count(1)
mock_user.id = 1
mock_user_repo.get_by_id.return_value = mock_user
# Mock no existing SoundPlayed record # Verify sound play count was updated
mock_result = Mock() mock_sound_repo.update.assert_called_once_with(
mock_result.first.return_value = None mock_sound, {"play_count": 6}
mock_session.exec.return_value = mock_result )
await player_service._record_play_count(1) # Verify SoundPlayed record was created with None user_id for player
mock_session.add.assert_called_once()
# Verify sound play count was updated mock_session.commit.assert_called_once()
mock_sound_repo.update.assert_called_once_with(
mock_sound, {"play_count": 6}
)
# Verify SoundPlayed record was created
mock_session.add.assert_called_once()
mock_session.commit.assert_called_once()
def test_get_state(self, player_service): def test_get_state(self, player_service):
"""Test getting current player state.""" """Test getting current player state."""
@@ -577,6 +521,27 @@ class TestPlayerService:
assert "mode" in result assert "mode" in result
assert "volume" in result assert "volume" in result
def test_uses_shared_sound_path_utility(self, player_service):
"""Test that player service uses the shared sound path utility."""
sound = Sound(
id=1,
name="Test Song",
filename="test.mp3",
type="SDB",
is_normalized=False,
)
player_service.state.playlist_sounds = [sound]
with patch("app.services.player.get_sound_file_path") as mock_path:
mock_file_path = Mock(spec=Path)
mock_file_path.exists.return_value = False # File doesn't exist
mock_path.return_value = mock_file_path
# This should fail because file doesn't exist
result = asyncio.run(player_service.play(0))
# Verify the utility was called
mock_path.assert_called_once_with(sound)
class TestPlayerServiceGlobalFunctions: class TestPlayerServiceGlobalFunctions:
"""Test global player service functions.""" """Test global player service functions."""

View File

@@ -0,0 +1,511 @@
"""Tests for VLC player service."""
import asyncio
from pathlib import Path
from unittest.mock import AsyncMock, Mock, patch
import pytest
from app.models.sound import Sound
from app.models.sound_played import SoundPlayed
from app.models.user import User
from app.services.vlc_player import VLCPlayerService, get_vlc_player_service
from app.utils.audio import get_sound_file_path
class TestVLCPlayerService:
"""Test VLC player service."""
@pytest.fixture
def vlc_service(self):
"""Create a VLC service instance."""
with patch("app.services.vlc_player.subprocess.run") as mock_run:
# Mock VLC executable detection
mock_run.return_value.returncode = 0
return VLCPlayerService()
@pytest.fixture
def vlc_service_with_db(self):
"""Create a VLC service instance with database session factory."""
with patch("app.services.vlc_player.subprocess.run") as mock_run:
# Mock VLC executable detection
mock_run.return_value.returncode = 0
mock_session_factory = Mock()
return VLCPlayerService(mock_session_factory)
@pytest.fixture
def sample_sound(self):
"""Create a sample sound for testing."""
return Sound(
id=1,
type="SDB",
name="Test Sound",
filename="test_audio.mp3",
duration=5000,
size=1024,
hash="test_hash",
is_normalized=False,
normalized_filename=None,
)
@pytest.fixture
def normalized_sound(self):
"""Create a normalized sound for testing."""
return Sound(
id=2,
type="TTS",
name="Normalized Sound",
filename="original.wav",
duration=7500,
size=2048,
hash="normalized_hash",
is_normalized=True,
normalized_filename="normalized.mp3",
)
def test_init(self, vlc_service):
"""Test VLC service initialization."""
assert vlc_service.vlc_executable is not None
assert isinstance(vlc_service.vlc_executable, str)
@patch("app.services.vlc_player.subprocess.run")
def test_find_vlc_executable_found_in_path(self, mock_run):
"""Test VLC executable detection when found in PATH."""
mock_run.return_value.returncode = 0
service = VLCPlayerService()
assert service.vlc_executable == "vlc"
@patch("app.services.vlc_player.subprocess.run")
def test_find_vlc_executable_found_by_path(self, mock_run):
"""Test VLC executable detection when found by absolute path."""
mock_run.return_value.returncode = 1 # which command fails
# Mock Path to return True for the first absolute path
with patch("app.services.vlc_player.Path") as mock_path:
def path_side_effect(path_str):
mock_instance = Mock()
mock_instance.exists.return_value = str(path_str) == "/usr/bin/vlc"
return mock_instance
mock_path.side_effect = path_side_effect
service = VLCPlayerService()
assert service.vlc_executable == "/usr/bin/vlc"
@patch("app.services.vlc_player.subprocess.run")
@patch("app.services.vlc_player.Path")
def test_find_vlc_executable_fallback(self, mock_path, mock_run):
"""Test VLC executable detection fallback to default."""
# Mock all paths as non-existent
mock_path_instance = Mock()
mock_path_instance.exists.return_value = False
mock_path.return_value = mock_path_instance
# Mock which command as failing
mock_run.return_value.returncode = 1
service = VLCPlayerService()
assert service.vlc_executable == "vlc"
@pytest.mark.asyncio
@patch("app.services.vlc_player.asyncio.create_subprocess_exec")
async def test_play_sound_success(
self, mock_subprocess, vlc_service, sample_sound
):
"""Test successful sound playback."""
# Mock subprocess
mock_process = Mock()
mock_process.pid = 12345
mock_subprocess.return_value = mock_process
# Mock the file path utility to avoid Path issues
with patch("app.services.vlc_player.get_sound_file_path") as mock_get_path:
mock_path = Mock()
mock_path.exists.return_value = True
mock_get_path.return_value = mock_path
result = await vlc_service.play_sound(sample_sound)
assert result is True
mock_subprocess.assert_called_once()
args = mock_subprocess.call_args
# Check command arguments
cmd_args = args[1] # keyword arguments
assert "--play-and-exit" in args[0]
assert "--intf" in args[0]
assert "dummy" in args[0]
assert "--no-video" in args[0]
assert "--no-repeat" in args[0]
assert "--no-loop" in args[0]
assert cmd_args["stdout"] == asyncio.subprocess.DEVNULL
assert cmd_args["stderr"] == asyncio.subprocess.DEVNULL
@pytest.mark.asyncio
async def test_play_sound_file_not_found(
self, vlc_service, sample_sound
):
"""Test sound playback when file doesn't exist."""
# Mock the file path utility to return a non-existent path
with patch("app.services.vlc_player.get_sound_file_path") as mock_get_path:
mock_path = Mock()
mock_path.exists.return_value = False
mock_get_path.return_value = mock_path
result = await vlc_service.play_sound(sample_sound)
assert result is False
@pytest.mark.asyncio
@patch("app.services.vlc_player.asyncio.create_subprocess_exec")
async def test_play_sound_subprocess_error(
self, mock_subprocess, vlc_service, sample_sound
):
"""Test sound playback when subprocess fails."""
# Mock the file path utility to return an existing path
with patch("app.services.vlc_player.get_sound_file_path") as mock_get_path:
mock_path = Mock()
mock_path.exists.return_value = True
mock_get_path.return_value = mock_path
# Mock subprocess exception
mock_subprocess.side_effect = Exception("Subprocess failed")
result = await vlc_service.play_sound(sample_sound)
assert result is False
@pytest.mark.asyncio
@patch("app.services.vlc_player.asyncio.create_subprocess_exec")
async def test_stop_all_vlc_instances_success(self, mock_subprocess, vlc_service):
"""Test successful stopping of all VLC instances."""
# Mock pgrep process (find VLC processes)
mock_find_process = Mock()
mock_find_process.returncode = 0
mock_find_process.communicate = AsyncMock(
return_value=(b"12345\n67890\n", b"")
)
# Mock pkill process (kill VLC processes)
mock_kill_process = Mock()
mock_kill_process.communicate = AsyncMock(return_value=(b"", b""))
# Mock verify process (check remaining processes)
mock_verify_process = Mock()
mock_verify_process.returncode = 1 # No processes found
mock_verify_process.communicate = AsyncMock(return_value=(b"", b""))
# Set up subprocess mock to return different processes for each call
mock_subprocess.side_effect = [
mock_find_process,
mock_kill_process,
mock_verify_process,
]
result = await vlc_service.stop_all_vlc_instances()
assert result["success"] is True
assert result["processes_found"] == 2
assert result["processes_killed"] == 2
assert result["processes_remaining"] == 0
assert "Killed 2 VLC processes" in result["message"]
@pytest.mark.asyncio
@patch("app.services.vlc_player.asyncio.create_subprocess_exec")
async def test_stop_all_vlc_instances_no_processes(
self, mock_subprocess, vlc_service
):
"""Test stopping VLC instances when none are running."""
# Mock pgrep process (no VLC processes found)
mock_find_process = Mock()
mock_find_process.returncode = 1 # No processes found
mock_find_process.communicate = AsyncMock(return_value=(b"", b""))
mock_subprocess.return_value = mock_find_process
result = await vlc_service.stop_all_vlc_instances()
assert result["success"] is True
assert result["processes_found"] == 0
assert result["processes_killed"] == 0
assert result["message"] == "No VLC processes found"
@pytest.mark.asyncio
@patch("app.services.vlc_player.asyncio.create_subprocess_exec")
async def test_stop_all_vlc_instances_partial_kill(
self, mock_subprocess, vlc_service
):
"""Test stopping VLC instances when some processes remain."""
# Mock pgrep process (find VLC processes)
mock_find_process = Mock()
mock_find_process.returncode = 0
mock_find_process.communicate = AsyncMock(
return_value=(b"12345\n67890\n11111\n", b"")
)
# Mock pkill process (kill VLC processes)
mock_kill_process = Mock()
mock_kill_process.communicate = AsyncMock(return_value=(b"", b""))
# Mock verify process (one process remains)
mock_verify_process = Mock()
mock_verify_process.returncode = 0
mock_verify_process.communicate = AsyncMock(return_value=(b"11111\n", b""))
mock_subprocess.side_effect = [
mock_find_process,
mock_kill_process,
mock_verify_process,
]
result = await vlc_service.stop_all_vlc_instances()
assert result["success"] is True
assert result["processes_found"] == 3
assert result["processes_killed"] == 2
assert result["processes_remaining"] == 1
@pytest.mark.asyncio
@patch("app.services.vlc_player.asyncio.create_subprocess_exec")
async def test_stop_all_vlc_instances_error(self, mock_subprocess, vlc_service):
"""Test stopping VLC instances when an error occurs."""
# Mock subprocess exception
mock_subprocess.side_effect = Exception("Command failed")
result = await vlc_service.stop_all_vlc_instances()
assert result["success"] is False
assert result["processes_found"] == 0
assert result["processes_killed"] == 0
assert "error" in result
assert result["message"] == "Failed to stop VLC processes"
def test_get_vlc_player_service_singleton(self):
"""Test that get_vlc_player_service returns the same instance."""
with patch("app.services.vlc_player.VLCPlayerService") as mock_service_class:
mock_instance = Mock()
mock_service_class.return_value = mock_instance
# Clear the global instance
import app.services.vlc_player
app.services.vlc_player.vlc_player_service = None
# First call should create new instance
service1 = get_vlc_player_service()
assert service1 == mock_instance
mock_service_class.assert_called_once()
# Second call should return same instance
service2 = get_vlc_player_service()
assert service2 == mock_instance
assert service1 is service2
# Constructor should not be called again
mock_service_class.assert_called_once()
@pytest.mark.asyncio
@patch("app.services.vlc_player.asyncio.create_subprocess_exec")
async def test_play_sound_with_play_count_tracking(
self, mock_subprocess, vlc_service_with_db, sample_sound
):
"""Test sound playback with play count tracking."""
# Mock subprocess
mock_process = Mock()
mock_process.pid = 12345
mock_subprocess.return_value = mock_process
# Mock session and repositories
mock_session = AsyncMock()
vlc_service_with_db.db_session_factory.return_value = mock_session
# Mock repositories
mock_sound_repo = AsyncMock()
mock_user_repo = AsyncMock()
with patch("app.services.vlc_player.SoundRepository", return_value=mock_sound_repo):
with patch("app.services.vlc_player.UserRepository", return_value=mock_user_repo):
with patch("app.services.vlc_player.socket_manager") as mock_socket:
with patch("app.services.vlc_player.select") as mock_select:
# Mock the file path utility
with patch("app.services.vlc_player.get_sound_file_path") as mock_get_path:
mock_path = Mock()
mock_path.exists.return_value = True
mock_get_path.return_value = mock_path
# Mock sound repository responses
updated_sound = Sound(
id=1,
type="SDB",
name="Test Sound",
filename="test.mp3",
duration=5000,
size=1024,
hash="test_hash",
play_count=1, # Updated count
)
mock_sound_repo.get_by_id.return_value = sample_sound
mock_sound_repo.update.return_value = updated_sound
# Mock admin user
admin_user = User(
id=1,
email="admin@test.com",
name="Admin User",
role="admin",
)
mock_user_repo.get_by_id.return_value = admin_user
# Mock socket broadcast
mock_socket.broadcast_to_all = AsyncMock()
result = await vlc_service_with_db.play_sound(sample_sound)
# Wait a bit for the async task to complete
await asyncio.sleep(0.1)
assert result is True
# Verify subprocess was called
mock_subprocess.assert_called_once()
# Note: The async task runs in the background, so we can't easily
# verify the database operations in this test without more complex
# mocking or using a real async test framework setup
@pytest.mark.asyncio
async def test_record_play_count_success(self, vlc_service_with_db):
"""Test successful play count recording."""
# Mock session and repositories
mock_session = AsyncMock()
vlc_service_with_db.db_session_factory.return_value = mock_session
mock_sound_repo = AsyncMock()
mock_user_repo = AsyncMock()
# Create test sound and user
test_sound = Sound(
id=1,
type="SDB",
name="Test Sound",
filename="test.mp3",
duration=5000,
size=1024,
hash="test_hash",
play_count=0,
)
admin_user = User(
id=1,
email="admin@test.com",
name="Admin User",
role="admin",
)
with patch("app.services.vlc_player.SoundRepository", return_value=mock_sound_repo):
with patch("app.services.vlc_player.UserRepository", return_value=mock_user_repo):
with patch("app.services.vlc_player.socket_manager") as mock_socket:
with patch("app.services.vlc_player.select") as mock_select:
# Setup mocks
mock_sound_repo.get_by_id.return_value = test_sound
mock_user_repo.get_by_id.return_value = admin_user
# Mock socket broadcast
mock_socket.broadcast_to_all = AsyncMock()
await vlc_service_with_db._record_play_count(1, "Test Sound")
# Verify sound repository calls
mock_sound_repo.get_by_id.assert_called_once_with(1)
mock_sound_repo.update.assert_called_once_with(
test_sound, {"play_count": 1}
)
# Verify user repository calls
mock_user_repo.get_by_id.assert_called_once_with(1)
# Verify session operations
mock_session.add.assert_called_once()
mock_session.commit.assert_called_once()
mock_session.close.assert_called_once()
# Verify socket broadcast
mock_socket.broadcast_to_all.assert_called_once_with(
"sound_played",
{
"sound_id": 1,
"sound_name": "Test Sound",
"user_id": 1,
"play_count": 1,
},
)
@pytest.mark.asyncio
async def test_record_play_count_no_session_factory(self, vlc_service):
"""Test play count recording when no session factory is available."""
# This should not raise an error and should log a warning
await vlc_service._record_play_count(1, "Test Sound")
# The method should return early without doing anything
@pytest.mark.asyncio
async def test_record_play_count_always_creates_record(self, vlc_service_with_db):
"""Test play count recording always creates a new SoundPlayed record."""
# Mock session and repositories
mock_session = AsyncMock()
vlc_service_with_db.db_session_factory.return_value = mock_session
mock_sound_repo = AsyncMock()
mock_user_repo = AsyncMock()
# Create test sound and user
test_sound = Sound(
id=1,
type="SDB",
name="Test Sound",
filename="test.mp3",
duration=5000,
size=1024,
hash="test_hash",
play_count=5,
)
admin_user = User(
id=1,
email="admin@test.com",
name="Admin User",
role="admin",
)
with patch("app.services.vlc_player.SoundRepository", return_value=mock_sound_repo):
with patch("app.services.vlc_player.UserRepository", return_value=mock_user_repo):
with patch("app.services.vlc_player.socket_manager") as mock_socket:
# Setup mocks
mock_sound_repo.get_by_id.return_value = test_sound
mock_user_repo.get_by_id.return_value = admin_user
# Mock socket broadcast
mock_socket.broadcast_to_all = AsyncMock()
await vlc_service_with_db._record_play_count(1, "Test Sound")
# Verify sound play count was updated
mock_sound_repo.update.assert_called_once_with(
test_sound, {"play_count": 6}
)
# Verify new SoundPlayed record was always added
mock_session.add.assert_called_once()
# Verify commit happened
mock_session.commit.assert_called_once()
def test_uses_shared_sound_path_utility(self, vlc_service, sample_sound):
"""Test that VLC service uses the shared sound path utility."""
with patch("app.services.vlc_player.get_sound_file_path") as mock_path:
mock_file_path = Mock(spec=Path)
mock_file_path.exists.return_value = False # File doesn't exist
mock_path.return_value = mock_file_path
# This should fail because file doesn't exist
result = asyncio.run(vlc_service.play_sound(sample_sound))
# Verify the utility was called and returned False
mock_path.assert_called_once_with(sample_sound)
assert result is False

View File

@@ -7,7 +7,8 @@ from unittest.mock import patch
import pytest import pytest
from app.utils.audio import get_audio_duration, get_file_hash, get_file_size from app.models.sound import Sound
from app.utils.audio import get_audio_duration, get_file_hash, get_file_size, get_sound_file_path
class TestAudioUtils: class TestAudioUtils:
@@ -290,3 +291,120 @@ class TestAudioUtils:
# Should raise FileNotFoundError for nonexistent file # Should raise FileNotFoundError for nonexistent file
with pytest.raises(FileNotFoundError): with pytest.raises(FileNotFoundError):
get_file_size(nonexistent_path) get_file_size(nonexistent_path)
def test_get_sound_file_path_sdb_original(self):
"""Test getting sound file path for SDB type original file."""
sound = Sound(
id=1,
name="Test Sound",
filename="test.mp3",
type="SDB",
is_normalized=False,
)
result = get_sound_file_path(sound)
expected = Path("sounds/originals/soundboard/test.mp3")
assert result == expected
def test_get_sound_file_path_sdb_normalized(self):
"""Test getting sound file path for SDB type normalized file."""
sound = Sound(
id=1,
name="Test Sound",
filename="original.mp3",
normalized_filename="normalized.mp3",
type="SDB",
is_normalized=True,
)
result = get_sound_file_path(sound)
expected = Path("sounds/normalized/soundboard/normalized.mp3")
assert result == expected
def test_get_sound_file_path_tts_original(self):
"""Test getting sound file path for TTS type original file."""
sound = Sound(
id=2,
name="TTS Sound",
filename="tts_file.wav",
type="TTS",
is_normalized=False,
)
result = get_sound_file_path(sound)
expected = Path("sounds/originals/text_to_speech/tts_file.wav")
assert result == expected
def test_get_sound_file_path_tts_normalized(self):
"""Test getting sound file path for TTS type normalized file."""
sound = Sound(
id=2,
name="TTS Sound",
filename="original.wav",
normalized_filename="normalized.mp3",
type="TTS",
is_normalized=True,
)
result = get_sound_file_path(sound)
expected = Path("sounds/normalized/text_to_speech/normalized.mp3")
assert result == expected
def test_get_sound_file_path_ext_original(self):
"""Test getting sound file path for EXT type original file."""
sound = Sound(
id=3,
name="Extracted Sound",
filename="extracted.mp3",
type="EXT",
is_normalized=False,
)
result = get_sound_file_path(sound)
expected = Path("sounds/originals/extracted/extracted.mp3")
assert result == expected
def test_get_sound_file_path_ext_normalized(self):
"""Test getting sound file path for EXT type normalized file."""
sound = Sound(
id=3,
name="Extracted Sound",
filename="original.mp3",
normalized_filename="normalized.mp3",
type="EXT",
is_normalized=True,
)
result = get_sound_file_path(sound)
expected = Path("sounds/normalized/extracted/normalized.mp3")
assert result == expected
def test_get_sound_file_path_unknown_type_fallback(self):
"""Test getting sound file path for unknown type falls back to lowercase."""
sound = Sound(
id=4,
name="Unknown Type Sound",
filename="unknown.mp3",
type="CUSTOM",
is_normalized=False,
)
result = get_sound_file_path(sound)
expected = Path("sounds/originals/custom/unknown.mp3")
assert result == expected
def test_get_sound_file_path_normalized_without_filename(self):
"""Test getting sound file path when normalized but no normalized_filename."""
sound = Sound(
id=5,
name="Test Sound",
filename="original.mp3",
normalized_filename=None,
type="SDB",
is_normalized=True, # True but no normalized_filename
)
result = get_sound_file_path(sound)
# Should fall back to original file
expected = Path("sounds/originals/soundboard/original.mp3")
assert result == expected