Files
sdb2-backend/tests/services/test_player.py

821 lines
34 KiB
Python

"""Tests for player service."""
import asyncio
import threading
import time
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, Mock, patch
import pytest
from sqlmodel.ext.asyncio.session import AsyncSession
from app.models.sound import Sound
from app.models.sound_played import SoundPlayed
from app.models.user import User
from app.services.player import (
PlayerMode,
PlayerService,
PlayerState,
PlayerStatus,
get_player_service,
initialize_player_service,
shutdown_player_service,
)
from app.utils.audio import get_sound_file_path
class TestPlayerState:
"""Test player state data structure."""
def test_init_creates_default_state(self):
"""Test that player state initializes with default values."""
state = PlayerState()
assert state.status == PlayerStatus.STOPPED
assert state.mode == PlayerMode.CONTINUOUS
assert state.volume == 50
assert state.current_sound_id is None
assert state.current_sound_index is None
assert state.current_sound_position == 0
assert state.current_sound_duration == 0
assert state.current_sound is None
assert state.playlist_id is None
assert state.playlist_name == ""
assert state.playlist_length == 0
assert state.playlist_duration == 0
assert state.playlist_sounds == []
def test_to_dict_serializes_correctly(self):
"""Test that player state serializes to dict correctly."""
state = PlayerState()
state.status = PlayerStatus.PLAYING
state.mode = PlayerMode.LOOP
state.volume = 75
state.current_sound_id = 1
state.current_sound_index = 0
state.current_sound_position = 5000
state.current_sound_duration = 30000
state.playlist_id = 1
state.playlist_name = "Test Playlist"
state.playlist_length = 5
state.playlist_duration = 150000
result = state.to_dict()
assert result["status"] == "playing"
assert result["mode"] == "loop"
assert result["volume"] == 75
assert result["current_sound_id"] == 1
assert result["current_sound_index"] == 0
assert result["current_sound_position"] == 5000
assert result["current_sound_duration"] == 30000
assert result["playlist_id"] == 1
assert result["playlist_name"] == "Test Playlist"
assert result["playlist_length"] == 5
assert result["playlist_duration"] == 150000
def test_serialize_sound_with_sound_object(self):
"""Test serializing a sound object."""
state = PlayerState()
sound = Sound(
id=1,
name="Test Song",
filename="test.mp3",
duration=30000,
size=1024,
type="SDB",
thumbnail="test.jpg",
play_count=5,
)
result = state._serialize_sound(sound)
assert result["id"] == 1
assert result["name"] == "Test Song"
assert result["filename"] == "test.mp3"
assert result["duration"] == 30000
assert result["size"] == 1024
assert result["type"] == "SDB"
assert result["thumbnail"] == "test.jpg"
assert result["play_count"] == 5
def test_serialize_sound_with_none(self):
"""Test serializing None sound."""
state = PlayerState()
result = state._serialize_sound(None)
assert result is None
class TestPlayerService:
"""Test player service functionality."""
@pytest.fixture
def mock_db_session_factory(self):
"""Create a mock database session factory."""
session = AsyncMock(spec=AsyncSession)
return lambda: session
@pytest.fixture
def mock_vlc_instance(self):
"""Create a mock VLC instance."""
with patch("app.services.player.vlc") as mock_vlc:
mock_instance = Mock()
mock_player = Mock()
mock_vlc.Instance.return_value = mock_instance
mock_instance.media_player_new.return_value = mock_player
mock_vlc.State = Mock()
mock_vlc.State.Ended = "ended"
yield mock_vlc
@pytest.fixture
def mock_socket_manager(self):
"""Create a mock socket manager."""
with patch("app.services.player.socket_manager") as mock:
mock.broadcast_to_all = AsyncMock()
yield mock
@pytest.fixture
def player_service(self, mock_db_session_factory, mock_vlc_instance, mock_socket_manager):
"""Create a player service instance for testing."""
service = PlayerService(mock_db_session_factory)
return service
def test_init_creates_player_service(self, mock_db_session_factory, mock_vlc_instance):
"""Test that player service initializes correctly."""
with patch("app.services.player.socket_manager"):
service = PlayerService(mock_db_session_factory)
assert service.db_session_factory is mock_db_session_factory
assert isinstance(service.state, PlayerState)
assert service._vlc_instance is not None
assert service._player is not None
assert service._is_running is False
assert service._position_thread is None
assert service._play_time_tracking == {}
assert isinstance(service._lock, type(threading.Lock()))
assert service._background_tasks == set()
assert service._loop is None
@pytest.mark.asyncio
async def test_start_initializes_service(self, player_service, mock_vlc_instance):
"""Test that start method initializes the service."""
with patch.object(player_service, "reload_playlist", new_callable=AsyncMock):
await player_service.start()
assert player_service._is_running is True
assert player_service._loop is not None
assert player_service._position_thread is not None
assert player_service._position_thread.daemon is True
player_service._player.audio_set_volume.assert_called_once_with(50)
@pytest.mark.asyncio
async def test_stop_cleans_up_service(self, player_service):
"""Test that stop method cleans up the service."""
# Setup initial state
player_service._is_running = True
player_service._position_thread = Mock()
player_service._position_thread.is_alive.return_value = True
with patch.object(player_service, "_stop_playback", new_callable=AsyncMock):
await player_service.stop()
assert player_service._is_running is False
player_service._position_thread.join.assert_called_once_with(timeout=2.0)
player_service._player.release.assert_called_once()
@pytest.mark.asyncio
async def test_play_new_track(self, player_service, mock_vlc_instance):
"""Test playing a new track."""
# Setup test sound
sound = Sound(
id=1,
name="Test Song",
filename="test.mp3",
duration=30000,
size=1024,
type="SDB",
)
player_service.state.playlist_sounds = [sound]
with patch("app.services.player.get_sound_file_path") as mock_path:
mock_file_path = Mock(spec=Path)
mock_file_path.exists.return_value = True
mock_path.return_value = mock_file_path
with patch.object(player_service, "_broadcast_state", new_callable=AsyncMock):
mock_media = Mock()
player_service._vlc_instance.media_new.return_value = mock_media
player_service._player.play.return_value = 0 # Success
await player_service.play(0)
assert player_service.state.status == PlayerStatus.PLAYING
assert player_service.state.current_sound == sound
assert player_service.state.current_sound_id == 1
assert player_service.state.current_sound_index == 0
assert 1 in player_service._play_time_tracking
@pytest.mark.asyncio
async def test_play_resume_from_pause(self, player_service):
"""Test resuming playback from pause."""
# Setup paused state
sound = Sound(id=1, name="Test Song", filename="test.mp3", duration=30000)
player_service.state.status = PlayerStatus.PAUSED
player_service.state.current_sound = sound
player_service.state.current_sound_id = 1
player_service.state.current_sound_position = 5000
with patch.object(player_service, "_broadcast_state", new_callable=AsyncMock):
player_service._player.play.return_value = 0 # Success
await player_service.play()
assert player_service.state.status == PlayerStatus.PLAYING
player_service._player.play.assert_called_once()
@pytest.mark.asyncio
async def test_play_invalid_index(self, player_service):
"""Test playing with invalid index raises ValueError."""
player_service.state.playlist_sounds = []
with pytest.raises(ValueError, match="Invalid sound index"):
await player_service.play(0)
@pytest.mark.asyncio
async def test_pause_when_playing(self, player_service):
"""Test pausing when currently playing."""
player_service.state.status = PlayerStatus.PLAYING
with patch.object(player_service, "_broadcast_state", new_callable=AsyncMock):
await player_service.pause()
assert player_service.state.status == PlayerStatus.PAUSED
player_service._player.pause.assert_called_once()
@pytest.mark.asyncio
async def test_pause_when_not_playing(self, player_service):
"""Test pausing when not playing does nothing."""
player_service.state.status = PlayerStatus.STOPPED
with patch.object(player_service, "_broadcast_state", new_callable=AsyncMock) as mock_broadcast:
await player_service.pause()
assert player_service.state.status == PlayerStatus.STOPPED
mock_broadcast.assert_not_called()
@pytest.mark.asyncio
async def test_stop_playback(self, player_service):
"""Test stopping playback."""
player_service.state.status = PlayerStatus.PLAYING
player_service.state.current_sound_position = 5000
with patch.object(player_service, "_process_play_count", new_callable=AsyncMock):
with patch.object(player_service, "_broadcast_state", new_callable=AsyncMock):
await player_service.stop_playback()
assert player_service.state.status == PlayerStatus.STOPPED
assert player_service.state.current_sound_position == 0
player_service._player.stop.assert_called_once()
@pytest.mark.asyncio
async def test_next_track(self, player_service):
"""Test skipping to next track."""
sound1 = Sound(id=1, name="Song 1", filename="song1.mp3")
sound2 = Sound(id=2, name="Song 2", filename="song2.mp3")
player_service.state.playlist_sounds = [sound1, sound2]
player_service.state.current_sound_index = 0
with patch.object(player_service, "play", new_callable=AsyncMock) as mock_play:
await player_service.next()
mock_play.assert_called_once_with(1)
@pytest.mark.asyncio
async def test_previous_track(self, player_service):
"""Test going to previous track."""
sound1 = Sound(id=1, name="Song 1", filename="song1.mp3")
sound2 = Sound(id=2, name="Song 2", filename="song2.mp3")
player_service.state.playlist_sounds = [sound1, sound2]
player_service.state.current_sound_index = 1
with patch.object(player_service, "play", new_callable=AsyncMock) as mock_play:
await player_service.previous()
mock_play.assert_called_once_with(0)
@pytest.mark.asyncio
async def test_seek_position(self, player_service):
"""Test seeking to specific position."""
player_service.state.status = PlayerStatus.PLAYING
player_service.state.current_sound_duration = 30000
with patch.object(player_service, "_broadcast_state", new_callable=AsyncMock):
await player_service.seek(15000)
# Position should be 0.5 (50% of track)
player_service._player.set_position.assert_called_once_with(0.5)
assert player_service.state.current_sound_position == 15000
@pytest.mark.asyncio
async def test_seek_when_stopped(self, player_service):
"""Test seeking when stopped does nothing."""
player_service.state.status = PlayerStatus.STOPPED
with patch.object(player_service, "_broadcast_state", new_callable=AsyncMock) as mock_broadcast:
await player_service.seek(15000)
player_service._player.set_position.assert_not_called()
mock_broadcast.assert_not_called()
@pytest.mark.asyncio
async def test_set_volume(self, player_service):
"""Test setting volume."""
with patch.object(player_service, "_broadcast_state", new_callable=AsyncMock):
await player_service.set_volume(75)
assert player_service.state.volume == 75
player_service._player.audio_set_volume.assert_called_once_with(75)
@pytest.mark.asyncio
async def test_set_volume_clamping(self, player_service):
"""Test volume clamping to valid range."""
with patch.object(player_service, "_broadcast_state", new_callable=AsyncMock):
# Test upper bound
await player_service.set_volume(150)
assert player_service.state.volume == 100
# Test lower bound
await player_service.set_volume(-10)
assert player_service.state.volume == 0
@pytest.mark.asyncio
async def test_set_mode(self, player_service):
"""Test setting playback mode."""
with patch.object(player_service, "_broadcast_state", new_callable=AsyncMock):
await player_service.set_mode(PlayerMode.LOOP)
assert player_service.state.mode == PlayerMode.LOOP
@pytest.mark.asyncio
async def test_reload_playlist(self, player_service):
"""Test reloading playlist from database."""
mock_session = AsyncMock()
player_service.db_session_factory = lambda: mock_session
# Mock playlist repository
with patch("app.services.player.PlaylistRepository") as mock_repo_class:
mock_repo = AsyncMock()
mock_repo_class.return_value = mock_repo
# Mock playlist data
mock_playlist = Mock()
mock_playlist.id = 1
mock_playlist.name = "Test Playlist"
mock_repo.get_main_playlist.return_value = mock_playlist
# Mock sounds
sound1 = Sound(id=1, name="Song 1", filename="song1.mp3", duration=30000)
sound2 = Sound(id=2, name="Song 2", filename="song2.mp3", duration=45000)
mock_sounds = [sound1, sound2]
mock_repo.get_playlist_sounds.return_value = mock_sounds
with patch.object(player_service, "_broadcast_state", new_callable=AsyncMock):
await player_service.reload_playlist()
assert player_service.state.playlist_id == 1
assert player_service.state.playlist_name == "Test Playlist"
assert player_service.state.playlist_sounds == mock_sounds
assert player_service.state.playlist_length == 2
assert player_service.state.playlist_duration == 75000
@pytest.mark.asyncio
async def test_handle_playlist_id_changed(self, player_service):
"""Test handling when playlist ID changes."""
# Setup initial state
player_service.state.status = PlayerStatus.PLAYING
player_service.state.current_sound_id = 1
player_service.state.current_sound_index = 0
# Create test sounds
sound1 = Sound(id=1, name="Song 1", filename="song1.mp3", duration=30000)
sound2 = Sound(id=2, name="Song 2", filename="song2.mp3", duration=45000)
sounds = [sound1, sound2]
with patch.object(player_service, "_stop_playback", new_callable=AsyncMock) as mock_stop:
await player_service._handle_playlist_id_changed(1, 2, sounds)
# Should stop playback and set first track as current
mock_stop.assert_called_once()
assert player_service.state.current_sound_index == 0
assert player_service.state.current_sound == sound1
assert player_service.state.current_sound_id == 1
@pytest.mark.asyncio
async def test_handle_playlist_id_changed_empty_playlist(self, player_service):
"""Test handling playlist ID change with empty playlist."""
player_service.state.status = PlayerStatus.PLAYING
with patch.object(player_service, "_stop_playback", new_callable=AsyncMock) as mock_stop:
await player_service._handle_playlist_id_changed(1, 2, [])
mock_stop.assert_called_once()
assert player_service.state.current_sound_index is None
assert player_service.state.current_sound is None
assert player_service.state.current_sound_id is None
@pytest.mark.asyncio
async def test_handle_same_playlist_track_exists_same_index(self, player_service):
"""Test handling same playlist when track exists at same index."""
sound1 = Sound(id=1, name="Song 1", filename="song1.mp3", duration=30000)
sound2 = Sound(id=2, name="Song 2", filename="song2.mp3", duration=45000)
sounds = [sound1, sound2]
await player_service._handle_same_playlist_track_check(1, 0, sounds)
# Should update sound object reference but keep same index
assert player_service.state.current_sound_index == 0 # Should be set to 0 from new_index
assert player_service.state.current_sound == sound1
@pytest.mark.asyncio
async def test_handle_same_playlist_track_exists_different_index(self, player_service):
"""Test handling same playlist when track exists at different index."""
sound1 = Sound(id=2, name="Song 2", filename="song2.mp3", duration=45000)
sound2 = Sound(id=1, name="Song 1", filename="song1.mp3", duration=30000)
sounds = [sound1, sound2] # Track with ID 1 is now at index 1
await player_service._handle_same_playlist_track_check(1, 0, sounds)
# Should update index and sound reference
assert player_service.state.current_sound_index == 1
assert player_service.state.current_sound == sound2
@pytest.mark.asyncio
async def test_handle_same_playlist_track_not_found(self, player_service):
"""Test handling same playlist when current track no longer exists."""
player_service.state.status = PlayerStatus.PLAYING
sound1 = Sound(id=2, name="Song 2", filename="song2.mp3", duration=45000)
sound2 = Sound(id=3, name="Song 3", filename="song3.mp3", duration=60000)
sounds = [sound1, sound2] # Track with ID 1 is missing
with patch.object(player_service, "_handle_track_removed", new_callable=AsyncMock) as mock_removed:
await player_service._handle_same_playlist_track_check(1, 0, sounds)
mock_removed.assert_called_once_with(1, sounds)
@pytest.mark.asyncio
async def test_handle_track_removed_with_sounds(self, player_service):
"""Test handling when current track is removed with sounds available."""
player_service.state.status = PlayerStatus.PLAYING
sound1 = Sound(id=2, name="Song 2", filename="song2.mp3", duration=45000)
sounds = [sound1]
with patch.object(player_service, "_stop_playback", new_callable=AsyncMock) as mock_stop:
await player_service._handle_track_removed(1, sounds)
mock_stop.assert_called_once()
assert player_service.state.current_sound_index == 0
assert player_service.state.current_sound == sound1
assert player_service.state.current_sound_id == 2
@pytest.mark.asyncio
async def test_handle_track_removed_empty_playlist(self, player_service):
"""Test handling when current track is removed with empty playlist."""
player_service.state.status = PlayerStatus.PLAYING
with patch.object(player_service, "_stop_playback", new_callable=AsyncMock) as mock_stop:
await player_service._handle_track_removed(1, [])
mock_stop.assert_called_once()
assert player_service.state.current_sound_index is None
assert player_service.state.current_sound is None
assert player_service.state.current_sound_id is None
def test_update_playlist_state(self, player_service):
"""Test updating playlist state information."""
mock_playlist = Mock()
mock_playlist.id = 5
mock_playlist.name = "New Playlist"
sound1 = Sound(id=1, name="Song 1", filename="song1.mp3", duration=30000)
sound2 = Sound(id=2, name="Song 2", filename="song2.mp3", duration=45000)
sounds = [sound1, sound2]
player_service._update_playlist_state(mock_playlist, sounds)
assert player_service.state.playlist_id == 5
assert player_service.state.playlist_name == "New Playlist"
assert player_service.state.playlist_sounds == sounds
assert player_service.state.playlist_length == 2
assert player_service.state.playlist_duration == 75000
def test_find_sound_index_found(self, player_service):
"""Test finding sound index when sound exists."""
sound1 = Sound(id=1, name="Song 1", filename="song1.mp3", duration=30000)
sound2 = Sound(id=2, name="Song 2", filename="song2.mp3", duration=45000)
sounds = [sound1, sound2]
index = player_service._find_sound_index(2, sounds)
assert index == 1
def test_find_sound_index_not_found(self, player_service):
"""Test finding sound index when sound doesn't exist."""
sound1 = Sound(id=1, name="Song 1", filename="song1.mp3", duration=30000)
sounds = [sound1]
index = player_service._find_sound_index(999, sounds)
assert index is None
def test_set_first_track_as_current(self, player_service):
"""Test setting first track as current."""
sound1 = Sound(id=1, name="Song 1", filename="song1.mp3", duration=30000)
sound2 = Sound(id=2, name="Song 2", filename="song2.mp3", duration=45000)
sounds = [sound1, sound2]
player_service._set_first_track_as_current(sounds)
assert player_service.state.current_sound_index == 0
assert player_service.state.current_sound == sound1
assert player_service.state.current_sound_id == 1
def test_clear_current_track(self, player_service):
"""Test clearing current track state."""
# Set some initial state
player_service.state.current_sound_index = 2
player_service.state.current_sound = Mock()
player_service.state.current_sound_id = 5
player_service._clear_current_track()
assert player_service.state.current_sound_index is None
assert player_service.state.current_sound is None
assert player_service.state.current_sound_id is None
@pytest.mark.asyncio
async def test_reload_playlist_different_id_scenario(self, player_service):
"""Test complete reload scenario when playlist ID changes."""
# Setup current state
player_service.state.playlist_id = 1
player_service.state.current_sound_id = 1
player_service.state.current_sound_index = 0
player_service.state.status = PlayerStatus.PLAYING
mock_session = AsyncMock()
player_service.db_session_factory = lambda: mock_session
with patch("app.services.player.PlaylistRepository") as mock_repo_class:
mock_repo = AsyncMock()
mock_repo_class.return_value = mock_repo
# Mock new playlist with different ID
mock_playlist = Mock()
mock_playlist.id = 2 # Different ID
mock_playlist.name = "New Playlist"
mock_repo.get_main_playlist.return_value = mock_playlist
sound1 = Sound(id=1, name="Song 1", filename="song1.mp3", duration=30000)
mock_sounds = [sound1]
mock_repo.get_playlist_sounds.return_value = mock_sounds
with patch.object(player_service, "_stop_playback", new_callable=AsyncMock) as mock_stop:
with patch.object(player_service, "_broadcast_state", new_callable=AsyncMock):
await player_service.reload_playlist()
# Should stop and reset to first track
mock_stop.assert_called_once()
assert player_service.state.playlist_id == 2
assert player_service.state.current_sound_index == 0
assert player_service.state.current_sound_id == 1
@pytest.mark.asyncio
async def test_reload_playlist_same_id_track_moved(self, player_service):
"""Test reload when playlist ID same but track moved to different index."""
# Setup current state
player_service.state.playlist_id = 1
player_service.state.current_sound_id = 2 # Currently playing track 2
player_service.state.current_sound_index = 1 # At index 1
mock_session = AsyncMock()
player_service.db_session_factory = lambda: mock_session
with patch("app.services.player.PlaylistRepository") as mock_repo_class:
mock_repo = AsyncMock()
mock_repo_class.return_value = mock_repo
# Same playlist ID
mock_playlist = Mock()
mock_playlist.id = 1
mock_playlist.name = "Same Playlist"
mock_repo.get_main_playlist.return_value = mock_playlist
# Track 2 moved to index 0
sound1 = Sound(id=2, name="Song 2", filename="song2.mp3", duration=45000)
sound2 = Sound(id=1, name="Song 1", filename="song1.mp3", duration=30000)
mock_sounds = [sound1, sound2] # Track 2 now at index 0
mock_repo.get_playlist_sounds.return_value = mock_sounds
with patch.object(player_service, "_broadcast_state", new_callable=AsyncMock):
await player_service.reload_playlist()
# Should update index but keep same track
assert player_service.state.playlist_id == 1
assert player_service.state.current_sound_index == 0 # Updated index
assert player_service.state.current_sound_id == 2 # Same track
assert player_service.state.current_sound == sound1
def test_get_next_index_continuous_mode(self, player_service):
"""Test getting next index in continuous mode."""
player_service.state.mode = PlayerMode.CONTINUOUS
player_service.state.playlist_sounds = [Mock(), Mock(), Mock()]
# Test normal progression
assert player_service._get_next_index(0) == 1
assert player_service._get_next_index(1) == 2
# Test end of playlist
assert player_service._get_next_index(2) is None
def test_get_next_index_loop_mode(self, player_service):
"""Test getting next index in loop mode."""
player_service.state.mode = PlayerMode.LOOP
player_service.state.playlist_sounds = [Mock(), Mock(), Mock()]
# Test normal progression
assert player_service._get_next_index(0) == 1
assert player_service._get_next_index(1) == 2
# Test wrapping to beginning
assert player_service._get_next_index(2) == 0
def test_get_next_index_loop_one_mode(self, player_service):
"""Test getting next index in loop one mode."""
player_service.state.mode = PlayerMode.LOOP_ONE
player_service.state.playlist_sounds = [Mock(), Mock(), Mock()]
# Should always return same index
assert player_service._get_next_index(0) == 0
assert player_service._get_next_index(1) == 1
assert player_service._get_next_index(2) == 2
def test_get_next_index_single_mode(self, player_service):
"""Test getting next index in single mode."""
player_service.state.mode = PlayerMode.SINGLE
player_service.state.playlist_sounds = [Mock(), Mock(), Mock()]
# Should always return None
assert player_service._get_next_index(0) is None
assert player_service._get_next_index(1) is None
assert player_service._get_next_index(2) is None
def test_get_next_index_random_mode(self, player_service):
"""Test getting next index in random mode."""
player_service.state.mode = PlayerMode.RANDOM
player_service.state.playlist_sounds = [Mock(), Mock(), Mock()]
with patch("random.choice") as mock_choice:
mock_choice.return_value = 2
result = player_service._get_next_index(0)
assert result == 2
# Should exclude current index
mock_choice.assert_called_once_with([1, 2])
def test_get_previous_index(self, player_service):
"""Test getting previous index."""
player_service.state.playlist_sounds = [Mock(), Mock(), Mock()]
# Test normal progression
assert player_service._get_previous_index(2) == 1
assert player_service._get_previous_index(1) == 0
# Test beginning without loop
player_service.state.mode = PlayerMode.CONTINUOUS
assert player_service._get_previous_index(0) is None
# Test beginning with loop
player_service.state.mode = PlayerMode.LOOP
assert player_service._get_previous_index(0) == 2
def test_update_play_time(self, player_service):
"""Test updating play time tracking."""
# Setup state
player_service.state.status = PlayerStatus.PLAYING
player_service.state.current_sound_id = 1
player_service.state.current_sound_position = 5000
player_service.state.current_sound_duration = 30000
# Initialize tracking
current_time = time.time()
player_service._play_time_tracking[1] = {
"total_time": 1000,
"last_position": 4000,
"last_update": current_time - 1.0, # 1 second ago
"threshold_reached": False,
}
with patch("app.services.player.time.time", return_value=current_time):
player_service._update_play_time()
tracking = player_service._play_time_tracking[1]
assert tracking["total_time"] == 2000 # Added 1 second (1000ms)
assert tracking["last_position"] == 5000
assert tracking["last_update"] == current_time
@pytest.mark.asyncio
async def test_record_play_count(self, player_service):
"""Test recording play count for a sound."""
mock_session = AsyncMock()
player_service.db_session_factory = lambda: mock_session
# Mock repositories
with patch("app.services.player.SoundRepository") as mock_sound_repo_class:
mock_sound_repo = AsyncMock()
mock_sound_repo_class.return_value = mock_sound_repo
# Mock sound
mock_sound = Mock()
mock_sound.play_count = 5
mock_sound_repo.get_by_id.return_value = mock_sound
await player_service._record_play_count(1)
# Verify sound play count was updated
mock_sound_repo.update.assert_called_once_with(
mock_sound, {"play_count": 6}
)
# Verify SoundPlayed record was created with None user_id for player
mock_session.add.assert_called_once()
mock_session.commit.assert_called_once()
def test_get_state(self, player_service):
"""Test getting current player state."""
result = player_service.get_state()
assert isinstance(result, dict)
assert "status" in result
assert "mode" in result
assert "volume" in result
def test_uses_shared_sound_path_utility(self, player_service):
"""Test that player service uses the shared sound path utility."""
sound = Sound(
id=1,
name="Test Song",
filename="test.mp3",
type="SDB",
is_normalized=False,
)
player_service.state.playlist_sounds = [sound]
with patch("app.services.player.get_sound_file_path") as mock_path:
mock_file_path = Mock(spec=Path)
mock_file_path.exists.return_value = False # File doesn't exist
mock_path.return_value = mock_file_path
# This should fail because file doesn't exist
result = asyncio.run(player_service.play(0))
# Verify the utility was called
mock_path.assert_called_once_with(sound)
class TestPlayerServiceGlobalFunctions:
"""Test global player service functions."""
@pytest.mark.asyncio
async def test_initialize_player_service(self):
"""Test initializing global player service."""
mock_factory = Mock()
with patch("app.services.player.PlayerService") as mock_service_class:
mock_service = AsyncMock()
mock_service_class.return_value = mock_service
await initialize_player_service(mock_factory)
mock_service_class.assert_called_once_with(mock_factory)
mock_service.start.assert_called_once()
@pytest.mark.asyncio
async def test_shutdown_player_service(self):
"""Test shutting down global player service."""
# Mock global player service exists
with patch("app.services.player.player_service") as mock_global:
mock_service = AsyncMock()
mock_global.__bool__ = lambda x: True # Service exists
type(mock_global).__bool__ = lambda x: True
with patch("app.services.player.player_service", mock_service):
await shutdown_player_service()
mock_service.stop.assert_called_once()
def test_get_player_service_success(self):
"""Test getting player service when initialized."""
mock_service = Mock()
with patch("app.services.player.player_service", mock_service):
result = get_player_service()
assert result is mock_service
def test_get_player_service_not_initialized(self):
"""Test getting player service when not initialized."""
with patch("app.services.player.player_service", None):
with pytest.raises(RuntimeError, match="Player service not initialized"):
get_player_service()