963 lines
38 KiB
Python
963 lines
38 KiB
Python
"""Tests for player service."""
|
|
|
|
import asyncio
|
|
import threading
|
|
import time
|
|
from pathlib import Path
|
|
from unittest.mock import AsyncMock, Mock, patch
|
|
|
|
import pytest
|
|
from sqlmodel.ext.asyncio.session import AsyncSession
|
|
|
|
from app.models.sound import Sound
|
|
from app.services.player import (
|
|
PlayerMode,
|
|
PlayerService,
|
|
PlayerState,
|
|
PlayerStatus,
|
|
get_player_service,
|
|
initialize_player_service,
|
|
shutdown_player_service,
|
|
)
|
|
|
|
|
|
class TestPlayerState:
    """Test player state data structure."""

    @pytest.fixture(autouse=True)
    def mock_volume_service(self):
        """Patch the volume service for every test in this class.

        Constructing ``PlayerState`` calls ``volume_service.get_volume()``
        (asserted in ``test_init_creates_default_state``). Without this patch
        the serialization tests would query the real host volume, making them
        depend on the machine they run on.
        """
        with patch("app.services.player.volume_service") as mock_volume_service:
            mock_volume_service.get_volume.return_value = 80
            yield mock_volume_service

    def test_init_creates_default_state(self, mock_volume_service) -> None:
        """Test that player state initializes with default values."""
        state = PlayerState()

        assert state.status == PlayerStatus.STOPPED
        assert state.mode == PlayerMode.CONTINUOUS
        # Both volume fields are seeded from the (mocked) volume service
        assert state.volume == 80
        assert state.previous_volume == 80
        mock_volume_service.get_volume.assert_called_once()
        assert state.current_sound_id is None
        assert state.current_sound_index is None
        assert state.current_sound_position == 0
        assert state.current_sound_duration == 0
        assert state.current_sound is None
        assert state.playlist_id is None
        assert state.playlist_name == ""
        assert state.playlist_length == 0
        assert state.playlist_duration == 0
        assert state.playlist_sounds == []

    def test_to_dict_serializes_correctly(self) -> None:
        """Test that player state serializes to dict correctly."""
        state = PlayerState()
        state.status = PlayerStatus.PLAYING
        state.mode = PlayerMode.LOOP
        state.volume = 75
        state.current_sound_id = 1
        state.current_sound_index = 0
        state.current_sound_position = 5000
        state.current_sound_duration = 30000
        state.playlist_id = 1
        state.playlist_name = "Test Playlist"
        state.playlist_length = 5
        state.playlist_duration = 150000

        result = state.to_dict()

        # Enum members are expected to be exported as their string values
        assert result["status"] == "playing"
        assert result["mode"] == "loop"
        assert result["volume"] == 75
        assert result["position"] == 5000
        assert result["duration"] == 30000
        assert result["index"] == 0
        assert result["playlist"]["id"] == 1
        assert result["playlist"]["name"] == "Test Playlist"
        assert result["playlist"]["length"] == 5
        assert result["playlist"]["duration"] == 150000

    def test_serialize_sound_with_sound_object(self) -> None:
        """Test serializing a sound object that has no extraction attached."""
        state = PlayerState()
        sound = Sound(
            id=1,
            name="Test Song",
            filename="test.mp3",
            duration=30000,
            size=1024,
            type="SDB",
            thumbnail="test.jpg",
            play_count=5,
        )

        result = state._serialize_sound(sound)

        assert result["id"] == 1
        assert result["name"] == "Test Song"
        assert result["filename"] == "test.mp3"
        assert result["duration"] == 30000
        assert result["size"] == 1024
        assert result["type"] == "SDB"
        assert result["thumbnail"] == "test.jpg"
        assert result["play_count"] == 5
        assert result["extract_url"] is None

    def test_serialize_sound_with_extraction_url(self) -> None:
        """Test serializing a sound object with extraction URL."""
        from app.models.extraction import Extraction

        state = PlayerState()
        sound = Sound(
            id=1,
            name="Test Song",
            filename="test.mp3",
            duration=30000,
            size=1024,
            type="EXT",
            thumbnail="test.jpg",
            play_count=5,
        )

        # Attach an extraction so the serializer has a source URL to expose
        extraction = Extraction(
            id=1,
            url="https://www.youtube.com/watch?v=test",
            service="youtube",
            service_id="test",
            title="Test Song",
            status="completed",
            user_id=1,
            sound_id=1,
        )
        sound.extractions = [extraction]

        result = state._serialize_sound(sound)

        assert result["id"] == 1
        assert result["name"] == "Test Song"
        assert result["filename"] == "test.mp3"
        assert result["duration"] == 30000
        assert result["size"] == 1024
        assert result["type"] == "EXT"
        assert result["thumbnail"] == "test.jpg"
        assert result["play_count"] == 5
        assert result["extract_url"] == "https://www.youtube.com/watch?v=test"

    def test_serialize_sound_with_none(self) -> None:
        """Test serializing None sound."""
        state = PlayerState()
        result = state._serialize_sound(None)
        assert result is None
|
|
|
|
|
|
class TestPlayerService:
|
|
"""Test player service functionality."""
|
|
|
|
@pytest.fixture
def mock_db_session_factory(self):
    """Provide a zero-argument factory that always returns the same mocked session."""
    shared_session = AsyncMock(spec=AsyncSession)

    def factory():
        return shared_session

    return factory
|
|
|
|
@pytest.fixture
def mock_vlc_instance(self):
    """Replace the ``vlc`` module referenced by the player service with a mock.

    ``Instance()`` returns a mock whose ``media_player_new()`` returns a mock
    player, and ``State.Ended`` is the plain string ``"ended"``.
    """
    with patch("app.services.player.vlc") as vlc_module:
        media_player = Mock()
        vlc_instance = Mock()
        vlc_instance.media_player_new.return_value = media_player
        vlc_module.Instance.return_value = vlc_instance

        vlc_state = Mock()
        vlc_state.Ended = "ended"
        vlc_module.State = vlc_state

        yield vlc_module
|
|
|
|
@pytest.fixture
def mock_socket_manager(self):
    """Patch the socket manager used by the player service.

    ``broadcast_to_all`` is an ``AsyncMock`` so the service can await it.
    """
    patcher = patch("app.services.player.socket_manager")
    manager = patcher.start()
    try:
        manager.broadcast_to_all = AsyncMock()
        yield manager
    finally:
        # Undo the patch on teardown, mirroring what a ``with`` block would do
        patcher.stop()
|
|
|
|
@pytest.fixture
def player_service(
    self,
    mock_db_session_factory,
    mock_vlc_instance,
    mock_socket_manager,
):
    """Build a ``PlayerService`` wired to the mocked DB factory, VLC and sockets.

    The volume service is patched only while the service is constructed and
    reports a volume of 80.
    """
    with patch("app.services.player.volume_service") as volume_service_mock:
        volume_service_mock.get_volume.return_value = 80
        service = PlayerService(mock_db_session_factory)
    return service
|
|
|
|
def test_init_creates_player_service(
    self,
    mock_db_session_factory,
    mock_vlc_instance,
) -> None:
    """Test that player service initializes correctly.

    The volume service is patched alongside the socket manager, as the
    ``player_service`` fixture does, so constructing the service never
    queries the real host volume.
    """
    with patch("app.services.player.socket_manager"), patch(
        "app.services.player.volume_service",
    ) as mock_volume_service:
        mock_volume_service.get_volume.return_value = 80
        service = PlayerService(mock_db_session_factory)

    assert service.db_session_factory is mock_db_session_factory
    assert isinstance(service.state, PlayerState)
    assert service._vlc_instance is not None
    assert service._player is not None
    assert service._is_running is False
    assert service._position_thread is None
    assert service._play_time_tracking == {}
    # threading.Lock is a factory function, so compare against the type it produces
    assert isinstance(service._lock, type(threading.Lock()))
    assert service._background_tasks == set()
    assert service._loop is None
|
|
|
|
@pytest.mark.asyncio
async def test_start_initializes_service(
    self,
    player_service,
    mock_vlc_instance,
) -> None:
    """Test that start method initializes the service.

    ``start()`` leaves a position thread behind; the ``finally`` block
    signals it to stop and joins it so it cannot outlive this test.
    """
    with patch.object(player_service, "reload_playlist", new_callable=AsyncMock):
        await player_service.start()

    try:
        assert player_service._is_running is True
        assert player_service._loop is not None
        assert player_service._position_thread is not None
        assert player_service._position_thread.daemon is True
        # VLC is now always set to 100% volume
        player_service._player.audio_set_volume.assert_called_once_with(100)
    finally:
        # NOTE(review): assumes the position loop polls ``_is_running``, as
        # ``stop()`` relies on the same flag before joining - confirm.
        player_service._is_running = False
        position_thread = player_service._position_thread
        if position_thread is not None and position_thread.is_alive():
            position_thread.join(timeout=2.0)
|
|
|
|
@pytest.mark.asyncio
async def test_stop_cleans_up_service(self, player_service) -> None:
    """stop() must clear the running flag, join the thread and release the player."""
    # Pretend the service is running with a live position thread
    fake_thread = Mock()
    fake_thread.is_alive.return_value = True
    player_service._position_thread = fake_thread
    player_service._is_running = True

    stop_patch = patch.object(
        player_service,
        "_stop_playback",
        new_callable=AsyncMock,
    )
    with stop_patch:
        await player_service.stop()

    assert player_service._is_running is False
    player_service._position_thread.join.assert_called_once_with(timeout=2.0)
    player_service._player.release.assert_called_once()
|
|
|
|
@pytest.mark.asyncio
async def test_play_new_track(self, player_service, mock_vlc_instance) -> None:
    """play(index) on an existing file starts playback and records the track."""
    track = Sound(
        id=1,
        name="Test Song",
        filename="test.mp3",
        duration=30000,
        size=1024,
        type="SDB",
    )
    player_service.state.playlist_sounds = [track]

    # The resolved file path reports that the file exists on disk
    existing_file = Mock(spec=Path)
    existing_file.exists.return_value = True

    path_patch = patch(
        "app.services.player.get_sound_file_path",
        return_value=existing_file,
    )
    broadcast_patch = patch.object(
        player_service,
        "_broadcast_state",
        new_callable=AsyncMock,
    )
    with path_patch, broadcast_patch:
        player_service._vlc_instance.media_new.return_value = Mock()
        player_service._player.play.return_value = 0  # Success

        await player_service.play(0)

    state = player_service.state
    assert state.status == PlayerStatus.PLAYING
    assert state.current_sound == track
    assert state.current_sound_id == 1
    assert state.current_sound_index == 0
    assert 1 in player_service._play_time_tracking
|
|
|
|
@pytest.mark.asyncio
async def test_play_resume_from_pause(self, player_service) -> None:
    """play() without an index resumes a paused track."""
    paused_track = Sound(id=1, name="Test Song", filename="test.mp3", duration=30000)

    # Put the service into a paused state part-way through the track
    state = player_service.state
    state.status = PlayerStatus.PAUSED
    state.current_sound = paused_track
    state.current_sound_id = 1
    state.current_sound_position = 5000

    player_service._player.play.return_value = 0  # Success
    with patch.object(player_service, "_broadcast_state", new_callable=AsyncMock):
        await player_service.play()

    assert player_service.state.status == PlayerStatus.PLAYING
    player_service._player.play.assert_called_once()
|
|
|
|
@pytest.mark.asyncio
async def test_play_invalid_index(self, player_service) -> None:
    """play(0) on an empty playlist must raise ValueError."""
    player_service.state.playlist_sounds = []

    expected_error = pytest.raises(ValueError, match="Invalid sound index")
    with expected_error:
        await player_service.play(0)
|
|
|
|
@pytest.mark.asyncio
async def test_pause_when_playing(self, player_service) -> None:
    """pause() while playing switches to PAUSED and pauses the VLC player."""
    player_service.state.status = PlayerStatus.PLAYING

    broadcast_patch = patch.object(
        player_service,
        "_broadcast_state",
        new_callable=AsyncMock,
    )
    with broadcast_patch:
        await player_service.pause()

    assert player_service.state.status == PlayerStatus.PAUSED
    player_service._player.pause.assert_called_once()
|
|
|
|
@pytest.mark.asyncio
async def test_pause_when_not_playing(self, player_service) -> None:
    """pause() while stopped leaves the status alone and broadcasts nothing."""
    player_service.state.status = PlayerStatus.STOPPED

    broadcast_patch = patch.object(
        player_service,
        "_broadcast_state",
        new_callable=AsyncMock,
    )
    with broadcast_patch as broadcast_mock:
        await player_service.pause()

    assert player_service.state.status == PlayerStatus.STOPPED
    broadcast_mock.assert_not_called()
|
|
|
|
@pytest.mark.asyncio
async def test_stop_playback(self, player_service) -> None:
    """stop_playback() stops VLC, resets the position and sets STOPPED."""
    state = player_service.state
    state.status = PlayerStatus.PLAYING
    state.current_sound_position = 5000

    # Play-count processing and broadcasting are not under test here
    play_count_patch = patch.object(
        player_service,
        "_process_play_count",
        new_callable=AsyncMock,
    )
    broadcast_patch = patch.object(
        player_service,
        "_broadcast_state",
        new_callable=AsyncMock,
    )
    with play_count_patch, broadcast_patch:
        await player_service.stop_playback()

    assert player_service.state.status == PlayerStatus.STOPPED
    assert player_service.state.current_sound_position == 0
    player_service._player.stop.assert_called_once()
|
|
|
|
@pytest.mark.asyncio
async def test_next_track(self, player_service) -> None:
    """next() from index 0 of a two-track playlist plays index 1."""
    player_service.state.playlist_sounds = [
        Sound(id=1, name="Song 1", filename="song1.mp3"),
        Sound(id=2, name="Song 2", filename="song2.mp3"),
    ]
    player_service.state.current_sound_index = 0

    with patch.object(player_service, "play", new_callable=AsyncMock) as play_mock:
        await player_service.next()

    play_mock.assert_called_once_with(1)
|
|
|
|
@pytest.mark.asyncio
async def test_previous_track(self, player_service) -> None:
    """previous() from index 1 of a two-track playlist plays index 0."""
    player_service.state.playlist_sounds = [
        Sound(id=1, name="Song 1", filename="song1.mp3"),
        Sound(id=2, name="Song 2", filename="song2.mp3"),
    ]
    player_service.state.current_sound_index = 1

    with patch.object(player_service, "play", new_callable=AsyncMock) as play_mock:
        await player_service.previous()

    play_mock.assert_called_once_with(0)
|
|
|
|
@pytest.mark.asyncio
async def test_seek_position(self, player_service) -> None:
    """seek(ms) passes VLC a fractional position and stores the millisecond value."""
    state = player_service.state
    state.status = PlayerStatus.PLAYING
    state.current_sound_duration = 30000

    broadcast_patch = patch.object(
        player_service,
        "_broadcast_state",
        new_callable=AsyncMock,
    )
    with broadcast_patch:
        await player_service.seek(15000)

    # 15000 ms of a 30000 ms track is the half-way point
    player_service._player.set_position.assert_called_once_with(0.5)
    assert player_service.state.current_sound_position == 15000
|
|
|
|
@pytest.mark.asyncio
async def test_seek_when_stopped(self, player_service) -> None:
    """seek() while stopped neither moves the player nor broadcasts."""
    player_service.state.status = PlayerStatus.STOPPED

    broadcast_patch = patch.object(
        player_service,
        "_broadcast_state",
        new_callable=AsyncMock,
    )
    with broadcast_patch as broadcast_mock:
        await player_service.seek(15000)

    player_service._player.set_position.assert_not_called()
    broadcast_mock.assert_not_called()
|
|
|
|
@pytest.mark.asyncio
async def test_set_volume(self, player_service) -> None:
    """set_volume() stores the value, pins VLC at 100 and forwards to the host."""
    volume_patch = patch("app.services.player.volume_service")
    broadcast_patch = patch.object(
        player_service,
        "_broadcast_state",
        new_callable=AsyncMock,
    )
    with volume_patch as volume_service_mock, broadcast_patch:
        volume_service_mock.is_muted.return_value = False

        await player_service.set_volume(75)

    assert player_service.state.volume == 75
    # VLC volume is always set to 100%, host volume is controlled separately
    player_service._player.audio_set_volume.assert_called_once_with(100)
    # The requested level must have been handed to the host volume service
    volume_service_mock.set_volume.assert_called_once_with(75)
|
|
|
|
@pytest.mark.asyncio
async def test_set_volume_clamping(self, player_service) -> None:
    """Out-of-range volumes are clamped into the 0-100 range."""
    # (requested, stored): upper bound first, then lower bound
    clamping_cases = ((150, 100), (-10, 0))

    volume_patch = patch("app.services.player.volume_service")
    broadcast_patch = patch.object(
        player_service,
        "_broadcast_state",
        new_callable=AsyncMock,
    )
    with volume_patch as volume_service_mock, broadcast_patch:
        volume_service_mock.is_muted.return_value = False

        for requested, stored in clamping_cases:
            await player_service.set_volume(requested)
            assert player_service.state.volume == stored
|
|
|
|
@pytest.mark.asyncio
async def test_set_mode(self, player_service) -> None:
    """set_mode() stores the requested playback mode on the state."""
    broadcast_patch = patch.object(
        player_service,
        "_broadcast_state",
        new_callable=AsyncMock,
    )
    with broadcast_patch:
        await player_service.set_mode(PlayerMode.LOOP)

    assert player_service.state.mode == PlayerMode.LOOP
|
|
|
|
@pytest.mark.asyncio
async def test_reload_playlist(self, player_service) -> None:
    """reload_playlist() copies the repository's playlist and sounds into the state."""
    db_session = AsyncMock()
    player_service.db_session_factory = lambda: db_session

    # Current playlist as the repository would report it
    current_playlist = Mock()
    current_playlist.id = 1
    current_playlist.name = "Test Playlist"

    # Two tracks totalling 75000 ms
    tracks = [
        Sound(id=1, name="Song 1", filename="song1.mp3", duration=30000),
        Sound(id=2, name="Song 2", filename="song2.mp3", duration=45000),
    ]

    repo = AsyncMock()
    repo.get_current_playlist.return_value = current_playlist
    repo.get_playlist_sounds.return_value = tracks

    repo_patch = patch("app.services.player.PlaylistRepository", return_value=repo)
    broadcast_patch = patch.object(
        player_service,
        "_broadcast_state",
        new_callable=AsyncMock,
    )
    with repo_patch, broadcast_patch:
        await player_service.reload_playlist()

    state = player_service.state
    assert state.playlist_id == 1
    assert state.playlist_name == "Test Playlist"
    assert state.playlist_sounds == tracks
    assert state.playlist_length == 2
    assert state.playlist_duration == 75000
|
|
|
|
@pytest.mark.asyncio
async def test_handle_playlist_id_changed(self, player_service) -> None:
    """A playlist ID change stops playback and selects the first track."""
    state = player_service.state
    state.status = PlayerStatus.PLAYING
    state.current_sound_id = 1
    state.current_sound_index = 0

    first_track = Sound(id=1, name="Song 1", filename="song1.mp3", duration=30000)
    second_track = Sound(id=2, name="Song 2", filename="song2.mp3", duration=45000)
    new_sounds = [first_track, second_track]

    stop_patch = patch.object(
        player_service,
        "_stop_playback",
        new_callable=AsyncMock,
    )
    with stop_patch as stop_mock:
        await player_service._handle_playlist_id_changed(1, 2, new_sounds)

    # Playback was stopped and the first track became current
    stop_mock.assert_called_once()
    assert player_service.state.current_sound_index == 0
    assert player_service.state.current_sound == first_track
    assert player_service.state.current_sound_id == 1
|
|
|
|
@pytest.mark.asyncio
async def test_handle_playlist_id_changed_empty_playlist(
    self,
    player_service,
) -> None:
    """A playlist ID change to an empty playlist stops playback and clears the track."""
    player_service.state.status = PlayerStatus.PLAYING

    stop_patch = patch.object(
        player_service,
        "_stop_playback",
        new_callable=AsyncMock,
    )
    with stop_patch as stop_mock:
        await player_service._handle_playlist_id_changed(1, 2, [])

    stop_mock.assert_called_once()
    state = player_service.state
    assert state.current_sound_index is None
    assert state.current_sound is None
    assert state.current_sound_id is None
|
|
|
|
@pytest.mark.asyncio
async def test_handle_same_playlist_track_exists_same_index(
    self,
    player_service,
) -> None:
    """The current track keeps index 0 when it is still first after a reload."""
    current_track = Sound(id=1, name="Song 1", filename="song1.mp3", duration=30000)
    other_track = Sound(id=2, name="Song 2", filename="song2.mp3", duration=45000)
    reloaded = [current_track, other_track]

    await player_service._handle_same_playlist_track_check(1, 0, reloaded)

    # The sound reference is refreshed and the index is set to 0 from new_index
    state = player_service.state
    assert state.current_sound_index == 0
    assert state.current_sound == current_track
|
|
|
|
@pytest.mark.asyncio
async def test_handle_same_playlist_track_exists_different_index(
    self,
    player_service,
) -> None:
    """The index follows the current track when it moves within the playlist."""
    other_track = Sound(id=2, name="Song 2", filename="song2.mp3", duration=45000)
    current_track = Sound(id=1, name="Song 1", filename="song1.mp3", duration=30000)
    # The track with ID 1 now sits at index 1 instead of 0
    reloaded = [other_track, current_track]

    await player_service._handle_same_playlist_track_check(1, 0, reloaded)

    state = player_service.state
    assert state.current_sound_index == 1
    assert state.current_sound == current_track
|
|
|
|
@pytest.mark.asyncio
async def test_handle_same_playlist_track_not_found(self, player_service) -> None:
    """A vanished current track is delegated to _handle_track_removed."""
    player_service.state.status = PlayerStatus.PLAYING
    # The track with ID 1 is absent from the reloaded playlist
    remaining = [
        Sound(id=2, name="Song 2", filename="song2.mp3", duration=45000),
        Sound(id=3, name="Song 3", filename="song3.mp3", duration=60000),
    ]

    removed_patch = patch.object(
        player_service,
        "_handle_track_removed",
        new_callable=AsyncMock,
    )
    with removed_patch as removed_mock:
        await player_service._handle_same_playlist_track_check(1, 0, remaining)

    removed_mock.assert_called_once_with(1, remaining)
|
|
|
|
@pytest.mark.asyncio
async def test_handle_track_removed_with_sounds(self, player_service) -> None:
    """Removing the current track falls back to the first remaining sound."""
    player_service.state.status = PlayerStatus.PLAYING
    survivor = Sound(id=2, name="Song 2", filename="song2.mp3", duration=45000)

    stop_patch = patch.object(
        player_service,
        "_stop_playback",
        new_callable=AsyncMock,
    )
    with stop_patch as stop_mock:
        await player_service._handle_track_removed(1, [survivor])

    stop_mock.assert_called_once()
    state = player_service.state
    assert state.current_sound_index == 0
    assert state.current_sound == survivor
    assert state.current_sound_id == 2
|
|
|
|
@pytest.mark.asyncio
async def test_handle_track_removed_empty_playlist(self, player_service) -> None:
    """Removing the current track from an empty playlist clears the current track."""
    player_service.state.status = PlayerStatus.PLAYING

    stop_patch = patch.object(
        player_service,
        "_stop_playback",
        new_callable=AsyncMock,
    )
    with stop_patch as stop_mock:
        await player_service._handle_track_removed(1, [])

    stop_mock.assert_called_once()
    state = player_service.state
    assert state.current_sound_index is None
    assert state.current_sound is None
    assert state.current_sound_id is None
|
|
|
|
def test_update_playlist_state(self, player_service) -> None:
    """_update_playlist_state() stores id, name, sounds, length and total duration."""
    playlist = Mock()
    playlist.id = 5
    playlist.name = "New Playlist"

    # 30000 ms + 45000 ms = 75000 ms total
    tracks = [
        Sound(id=1, name="Song 1", filename="song1.mp3", duration=30000),
        Sound(id=2, name="Song 2", filename="song2.mp3", duration=45000),
    ]

    player_service._update_playlist_state(playlist, tracks)

    state = player_service.state
    assert state.playlist_id == 5
    assert state.playlist_name == "New Playlist"
    assert state.playlist_sounds == tracks
    assert state.playlist_length == 2
    assert state.playlist_duration == 75000
|
|
|
|
def test_find_sound_index_found(self, player_service) -> None:
    """_find_sound_index() returns the position of the sound with the given ID."""
    tracks = [
        Sound(id=1, name="Song 1", filename="song1.mp3", duration=30000),
        Sound(id=2, name="Song 2", filename="song2.mp3", duration=45000),
    ]

    found_at = player_service._find_sound_index(2, tracks)

    assert found_at == 1
|
|
|
|
def test_find_sound_index_not_found(self, player_service) -> None:
    """_find_sound_index() returns None for an ID that is not in the list."""
    tracks = [Sound(id=1, name="Song 1", filename="song1.mp3", duration=30000)]

    found_at = player_service._find_sound_index(999, tracks)

    assert found_at is None
|
|
|
|
def test_set_first_track_as_current(self, player_service) -> None:
    """_set_first_track_as_current() points the state at element 0 of the list."""
    first_track = Sound(id=1, name="Song 1", filename="song1.mp3", duration=30000)
    second_track = Sound(id=2, name="Song 2", filename="song2.mp3", duration=45000)

    player_service._set_first_track_as_current([first_track, second_track])

    state = player_service.state
    assert state.current_sound_index == 0
    assert state.current_sound == first_track
    assert state.current_sound_id == 1
|
|
|
|
def test_clear_current_track(self, player_service) -> None:
    """_clear_current_track() resets index, sound and sound ID to None."""
    # Start from a state that has a current track selected
    state = player_service.state
    state.current_sound_index = 2
    state.current_sound = Mock()
    state.current_sound_id = 5

    player_service._clear_current_track()

    cleared = player_service.state
    assert cleared.current_sound_index is None
    assert cleared.current_sound is None
    assert cleared.current_sound_id is None
|
|
|
|
@pytest.mark.asyncio
async def test_reload_playlist_different_id_scenario(self, player_service) -> None:
    """Reloading into a playlist with another ID stops playback and restarts at track 0."""
    # The player is currently playing track 1 of playlist 1
    state = player_service.state
    state.playlist_id = 1
    state.current_sound_id = 1
    state.current_sound_index = 0
    state.status = PlayerStatus.PLAYING

    db_session = AsyncMock()
    player_service.db_session_factory = lambda: db_session

    # The repository now reports a playlist with a different ID
    new_playlist = Mock()
    new_playlist.id = 2
    new_playlist.name = "New Playlist"

    tracks = [Sound(id=1, name="Song 1", filename="song1.mp3", duration=30000)]

    repo = AsyncMock()
    repo.get_current_playlist.return_value = new_playlist
    repo.get_playlist_sounds.return_value = tracks

    repo_patch = patch("app.services.player.PlaylistRepository", return_value=repo)
    stop_patch = patch.object(
        player_service,
        "_stop_playback",
        new_callable=AsyncMock,
    )
    broadcast_patch = patch.object(
        player_service,
        "_broadcast_state",
        new_callable=AsyncMock,
    )
    with repo_patch, stop_patch as stop_mock, broadcast_patch:
        await player_service.reload_playlist()

    # Playback stopped and the first track of the new playlist is current
    stop_mock.assert_called_once()
    assert player_service.state.playlist_id == 2
    assert player_service.state.current_sound_index == 0
    assert player_service.state.current_sound_id == 1
|
|
|
|
@pytest.mark.asyncio
async def test_reload_playlist_same_id_track_moved(self, player_service) -> None:
    """Reloading the same playlist keeps the track but follows its new index."""
    # Track 2 is current and sits at index 1 before the reload
    state = player_service.state
    state.playlist_id = 1
    state.current_sound_id = 2
    state.current_sound_index = 1

    db_session = AsyncMock()
    player_service.db_session_factory = lambda: db_session

    # The repository reports the same playlist ID
    same_playlist = Mock()
    same_playlist.id = 1
    same_playlist.name = "Same Playlist"

    # After the reload, track 2 has moved to index 0
    moved_track = Sound(id=2, name="Song 2", filename="song2.mp3", duration=45000)
    other_track = Sound(id=1, name="Song 1", filename="song1.mp3", duration=30000)

    repo = AsyncMock()
    repo.get_current_playlist.return_value = same_playlist
    repo.get_playlist_sounds.return_value = [moved_track, other_track]

    repo_patch = patch("app.services.player.PlaylistRepository", return_value=repo)
    broadcast_patch = patch.object(
        player_service,
        "_broadcast_state",
        new_callable=AsyncMock,
    )
    with repo_patch, broadcast_patch:
        await player_service.reload_playlist()

    reloaded = player_service.state
    assert reloaded.playlist_id == 1
    assert reloaded.current_sound_index == 0  # Updated index
    assert reloaded.current_sound_id == 2  # Same track
    assert reloaded.current_sound == moved_track
|
|
|
|
def test_get_next_index_continuous_mode(self, player_service) -> None:
    """CONTINUOUS mode advances by one and yields None after the last track."""
    player_service.state.mode = PlayerMode.CONTINUOUS
    player_service.state.playlist_sounds = [Mock() for _ in range(3)]

    # Normal progression through the playlist
    for current, following in ((0, 1), (1, 2)):
        assert player_service._get_next_index(current) == following

    # Nothing follows the last track
    assert player_service._get_next_index(2) is None
|
|
|
|
def test_get_next_index_loop_mode(self, player_service) -> None:
    """LOOP mode advances by one and wraps from the last track to index 0."""
    player_service.state.mode = PlayerMode.LOOP
    player_service.state.playlist_sounds = [Mock() for _ in range(3)]

    # Normal progression, then the wrap-around from index 2 back to 0
    for current, following in ((0, 1), (1, 2), (2, 0)):
        assert player_service._get_next_index(current) == following
|
|
|
|
def test_get_next_index_loop_one_mode(self, player_service) -> None:
    """LOOP_ONE mode always repeats the index it was given."""
    player_service.state.mode = PlayerMode.LOOP_ONE
    player_service.state.playlist_sounds = [Mock() for _ in range(3)]

    for current in range(3):
        assert player_service._get_next_index(current) == current
|
|
|
|
def test_get_next_index_single_mode(self, player_service) -> None:
    """SINGLE mode never offers a next track."""
    player_service.state.mode = PlayerMode.SINGLE
    player_service.state.playlist_sounds = [Mock() for _ in range(3)]

    for current in range(3):
        assert player_service._get_next_index(current) is None
|
|
|
|
def test_get_next_index_random_mode(self, player_service) -> None:
    """RANDOM mode picks via random.choice from every index except the current one."""
    player_service.state.mode = PlayerMode.RANDOM
    player_service.state.playlist_sounds = [Mock() for _ in range(3)]

    with patch("random.choice", return_value=2) as choice_mock:
        picked = player_service._get_next_index(0)

    assert picked == 2
    # Index 0 is the current track, so only 1 and 2 are candidates
    choice_mock.assert_called_once_with([1, 2])
|
|
|
|
def test_get_previous_index(self, player_service) -> None:
    """_get_previous_index() steps back by one and wraps at 0 only in LOOP mode."""
    player_service.state.playlist_sounds = [Mock() for _ in range(3)]

    # Stepping back from the middle or end of the playlist
    for current, preceding in ((2, 1), (1, 0)):
        assert player_service._get_previous_index(current) == preceding

    # At the first track there is nothing before it unless looping
    player_service.state.mode = PlayerMode.CONTINUOUS
    assert player_service._get_previous_index(0) is None

    # In loop mode the first track wraps around to the last one
    player_service.state.mode = PlayerMode.LOOP
    assert player_service._get_previous_index(0) == 2
|
|
|
|
def test_update_play_time(self, player_service) -> None:
    """_update_play_time() adds the elapsed time and refreshes position and timestamp."""
    state = player_service.state
    state.status = PlayerStatus.PLAYING
    state.current_sound_id = 1
    state.current_sound_position = 5000
    state.current_sound_duration = 30000

    # Seed tracking for sound 1 as if it was last updated one second ago
    now = time.time()
    one_second_ago = now - 1.0
    player_service._play_time_tracking[1] = {
        "total_time": 1000,
        "last_position": 4000,
        "last_update": one_second_ago,
        "threshold_reached": False,
    }

    # Freeze the clock at ``now`` so exactly one second appears to have passed
    frozen_clock = patch("app.services.player.time.time", return_value=now)
    with frozen_clock:
        player_service._update_play_time()

    entry = player_service._play_time_tracking[1]
    assert entry["total_time"] == 2000  # Added 1 second (1000ms)
    assert entry["last_position"] == 5000
    assert entry["last_update"] == now
|
|
|
|
@pytest.mark.asyncio
async def test_record_play_count(self, player_service) -> None:
    """Test recording play count for a sound.

    Checks that the stored play count is incremented by one and that one
    record is added to the session and committed.
    """
    mock_session = AsyncMock()
    # ``add`` is called synchronously by the service. Left as an AsyncMock
    # child, each call would create a coroutine that is never awaited
    # (RuntimeWarning), so make it a plain Mock.
    mock_session.add = Mock()
    player_service.db_session_factory = lambda: mock_session

    # Mock repositories
    with patch("app.services.player.SoundRepository") as mock_sound_repo_class:
        mock_sound_repo = AsyncMock()
        mock_sound_repo_class.return_value = mock_sound_repo

        # Sound as returned by the repository, with five plays so far
        mock_sound = Mock()
        mock_sound.play_count = 5
        mock_sound_repo.get_by_id.return_value = mock_sound

        await player_service._record_play_count(1)

    # Verify sound play count was updated
    mock_sound_repo.update.assert_called_once_with(
        mock_sound,
        {"play_count": 6},
    )

    # Verify one record was added and committed
    # NOTE(review): the original comment says this is a SoundPlayed row with
    # user_id None; the added object's fields are not asserted here.
    mock_session.add.assert_called_once()
    mock_session.commit.assert_called_once()
|
|
|
|
def test_get_state(self, player_service) -> None:
    """get_state() returns a dict containing status, mode and volume."""
    snapshot = player_service.get_state()

    assert isinstance(snapshot, dict)
    for required_key in ("status", "mode", "volume"):
        assert required_key in snapshot
|
|
|
|
@pytest.mark.asyncio
async def test_uses_shared_sound_path_utility(self, player_service) -> None:
    """Test that player service uses the shared sound path utility.

    Runs as a regular asyncio test, like the other coroutine tests in this
    class, instead of driving ``play()`` through ``asyncio.run()``, which
    creates and tears down its own event loop.
    """
    sound = Sound(
        id=1,
        name="Test Song",
        filename="test.mp3",
        type="SDB",
        is_normalized=False,
    )
    player_service.state.playlist_sounds = [sound]

    with patch("app.services.player.get_sound_file_path") as mock_path:
        mock_file_path = Mock(spec=Path)
        mock_file_path.exists.return_value = False  # File doesn't exist
        mock_path.return_value = mock_file_path

        # Playback cannot start because the file is missing. No exception is
        # expected here; only the path lookup matters.
        await player_service.play(0)

    # Verify the utility was called with the sound being played
    mock_path.assert_called_once_with(sound)
|
|
|
|
|
|
class TestPlayerServiceGlobalFunctions:
    """Test global player service functions."""

    @pytest.mark.asyncio
    async def test_initialize_player_service(self) -> None:
        """Test initializing global player service.

        The module-level ``player_service`` attribute is patched for the
        duration of the call so it is restored afterwards.
        """
        mock_factory = Mock()
        mock_service = AsyncMock()

        # NOTE(review): initialize_player_service presumably assigns the
        # module-level singleton; patching it guarantees a rollback on exit
        # so a mock cannot leak into later tests.
        with patch("app.services.player.player_service", None), patch(
            "app.services.player.PlayerService",
            return_value=mock_service,
        ) as mock_service_class:
            await initialize_player_service(mock_factory)

        mock_service_class.assert_called_once_with(mock_factory)
        mock_service.start.assert_called_once()

    @pytest.mark.asyncio
    async def test_shutdown_player_service(self) -> None:
        """Test shutting down global player service."""
        # An AsyncMock is truthy by default, so it stands in for an
        # initialized service without any ``__bool__`` overrides.
        mock_service = AsyncMock()

        with patch("app.services.player.player_service", mock_service):
            await shutdown_player_service()

        mock_service.stop.assert_called_once()

    def test_get_player_service_success(self) -> None:
        """Test getting player service when initialized."""
        mock_service = Mock()

        with patch("app.services.player.player_service", mock_service):
            result = get_player_service()

        assert result is mock_service

    def test_get_player_service_not_initialized(self) -> None:
        """Test getting player service when not initialized."""
        with patch("app.services.player.player_service", None), pytest.raises(
            RuntimeError,
            match="Player service not initialized",
        ):
            get_player_service()
|