"""Tests for player service.""" import asyncio import threading import time from pathlib import Path from unittest.mock import AsyncMock, MagicMock, Mock, patch import pytest from sqlmodel.ext.asyncio.session import AsyncSession from app.models.sound import Sound from app.models.sound_played import SoundPlayed from app.models.user import User from app.services.player import ( PlayerMode, PlayerService, PlayerState, PlayerStatus, get_player_service, initialize_player_service, shutdown_player_service, ) from app.utils.audio import get_sound_file_path class TestPlayerState: """Test player state data structure.""" def test_init_creates_default_state(self): """Test that player state initializes with default values.""" state = PlayerState() assert state.status == PlayerStatus.STOPPED assert state.mode == PlayerMode.CONTINUOUS assert state.volume == 50 assert state.current_sound_id is None assert state.current_sound_index is None assert state.current_sound_position == 0 assert state.current_sound_duration == 0 assert state.current_sound is None assert state.playlist_id is None assert state.playlist_name == "" assert state.playlist_length == 0 assert state.playlist_duration == 0 assert state.playlist_sounds == [] def test_to_dict_serializes_correctly(self): """Test that player state serializes to dict correctly.""" state = PlayerState() state.status = PlayerStatus.PLAYING state.mode = PlayerMode.LOOP state.volume = 75 state.current_sound_id = 1 state.current_sound_index = 0 state.current_sound_position = 5000 state.current_sound_duration = 30000 state.playlist_id = 1 state.playlist_name = "Test Playlist" state.playlist_length = 5 state.playlist_duration = 150000 result = state.to_dict() assert result["status"] == "playing" assert result["mode"] == "loop" assert result["volume"] == 75 assert result["position"] == 5000 assert result["duration"] == 30000 assert result["index"] == 0 assert result["playlist"]["id"] == 1 assert result["playlist"]["name"] == "Test Playlist" assert result["playlist"]["length"] == 5 assert result["playlist"]["duration"] == 150000 def test_serialize_sound_with_sound_object(self): """Test serializing a sound object.""" state = PlayerState() sound = Sound( id=1, name="Test Song", filename="test.mp3", duration=30000, size=1024, type="SDB", thumbnail="test.jpg", play_count=5, ) result = state._serialize_sound(sound) assert result["id"] == 1 assert result["name"] == "Test Song" assert result["filename"] == "test.mp3" assert result["duration"] == 30000 assert result["size"] == 1024 assert result["type"] == "SDB" assert result["thumbnail"] == "test.jpg" assert result["play_count"] == 5 def test_serialize_sound_with_none(self): """Test serializing None sound.""" state = PlayerState() result = state._serialize_sound(None) assert result is None class TestPlayerService: """Test player service functionality.""" @pytest.fixture def mock_db_session_factory(self): """Create a mock database session factory.""" session = AsyncMock(spec=AsyncSession) return lambda: session @pytest.fixture def mock_vlc_instance(self): """Create a mock VLC instance.""" with patch("app.services.player.vlc") as mock_vlc: mock_instance = Mock() mock_player = Mock() mock_vlc.Instance.return_value = mock_instance mock_instance.media_player_new.return_value = mock_player mock_vlc.State = Mock() mock_vlc.State.Ended = "ended" yield mock_vlc @pytest.fixture def mock_socket_manager(self): """Create a mock socket manager.""" with patch("app.services.player.socket_manager") as mock: mock.broadcast_to_all = AsyncMock() yield mock @pytest.fixture def player_service(self, mock_db_session_factory, mock_vlc_instance, mock_socket_manager): """Create a player service instance for testing.""" service = PlayerService(mock_db_session_factory) return service def test_init_creates_player_service(self, mock_db_session_factory, mock_vlc_instance): """Test that player service initializes correctly.""" with patch("app.services.player.socket_manager"): service = PlayerService(mock_db_session_factory) assert service.db_session_factory is mock_db_session_factory assert isinstance(service.state, PlayerState) assert service._vlc_instance is not None assert service._player is not None assert service._is_running is False assert service._position_thread is None assert service._play_time_tracking == {} assert isinstance(service._lock, type(threading.Lock())) assert service._background_tasks == set() assert service._loop is None @pytest.mark.asyncio async def test_start_initializes_service(self, player_service, mock_vlc_instance): """Test that start method initializes the service.""" with patch.object(player_service, "reload_playlist", new_callable=AsyncMock): await player_service.start() assert player_service._is_running is True assert player_service._loop is not None assert player_service._position_thread is not None assert player_service._position_thread.daemon is True player_service._player.audio_set_volume.assert_called_once_with(50) @pytest.mark.asyncio async def test_stop_cleans_up_service(self, player_service): """Test that stop method cleans up the service.""" # Setup initial state player_service._is_running = True player_service._position_thread = Mock() player_service._position_thread.is_alive.return_value = True with patch.object(player_service, "_stop_playback", new_callable=AsyncMock): await player_service.stop() assert player_service._is_running is False player_service._position_thread.join.assert_called_once_with(timeout=2.0) player_service._player.release.assert_called_once() @pytest.mark.asyncio async def test_play_new_track(self, player_service, mock_vlc_instance): """Test playing a new track.""" # Setup test sound sound = Sound( id=1, name="Test Song", filename="test.mp3", duration=30000, size=1024, type="SDB", ) player_service.state.playlist_sounds = [sound] with patch("app.services.player.get_sound_file_path") as mock_path: mock_file_path = Mock(spec=Path) mock_file_path.exists.return_value = True mock_path.return_value = mock_file_path with patch.object(player_service, "_broadcast_state", new_callable=AsyncMock): mock_media = Mock() player_service._vlc_instance.media_new.return_value = mock_media player_service._player.play.return_value = 0 # Success await player_service.play(0) assert player_service.state.status == PlayerStatus.PLAYING assert player_service.state.current_sound == sound assert player_service.state.current_sound_id == 1 assert player_service.state.current_sound_index == 0 assert 1 in player_service._play_time_tracking @pytest.mark.asyncio async def test_play_resume_from_pause(self, player_service): """Test resuming playback from pause.""" # Setup paused state sound = Sound(id=1, name="Test Song", filename="test.mp3", duration=30000) player_service.state.status = PlayerStatus.PAUSED player_service.state.current_sound = sound player_service.state.current_sound_id = 1 player_service.state.current_sound_position = 5000 with patch.object(player_service, "_broadcast_state", new_callable=AsyncMock): player_service._player.play.return_value = 0 # Success await player_service.play() assert player_service.state.status == PlayerStatus.PLAYING player_service._player.play.assert_called_once() @pytest.mark.asyncio async def test_play_invalid_index(self, player_service): """Test playing with invalid index raises ValueError.""" player_service.state.playlist_sounds = [] with pytest.raises(ValueError, match="Invalid sound index"): await player_service.play(0) @pytest.mark.asyncio async def test_pause_when_playing(self, player_service): """Test pausing when currently playing.""" player_service.state.status = PlayerStatus.PLAYING with patch.object(player_service, "_broadcast_state", new_callable=AsyncMock): await player_service.pause() assert player_service.state.status == PlayerStatus.PAUSED player_service._player.pause.assert_called_once() @pytest.mark.asyncio async def test_pause_when_not_playing(self, player_service): """Test pausing when not playing does nothing.""" player_service.state.status = PlayerStatus.STOPPED with patch.object(player_service, "_broadcast_state", new_callable=AsyncMock) as mock_broadcast: await player_service.pause() assert player_service.state.status == PlayerStatus.STOPPED mock_broadcast.assert_not_called() @pytest.mark.asyncio async def test_stop_playback(self, player_service): """Test stopping playback.""" player_service.state.status = PlayerStatus.PLAYING player_service.state.current_sound_position = 5000 with patch.object(player_service, "_process_play_count", new_callable=AsyncMock): with patch.object(player_service, "_broadcast_state", new_callable=AsyncMock): await player_service.stop_playback() assert player_service.state.status == PlayerStatus.STOPPED assert player_service.state.current_sound_position == 0 player_service._player.stop.assert_called_once() @pytest.mark.asyncio async def test_next_track(self, player_service): """Test skipping to next track.""" sound1 = Sound(id=1, name="Song 1", filename="song1.mp3") sound2 = Sound(id=2, name="Song 2", filename="song2.mp3") player_service.state.playlist_sounds = [sound1, sound2] player_service.state.current_sound_index = 0 with patch.object(player_service, "play", new_callable=AsyncMock) as mock_play: await player_service.next() mock_play.assert_called_once_with(1) @pytest.mark.asyncio async def test_previous_track(self, player_service): """Test going to previous track.""" sound1 = Sound(id=1, name="Song 1", filename="song1.mp3") sound2 = Sound(id=2, name="Song 2", filename="song2.mp3") player_service.state.playlist_sounds = [sound1, sound2] player_service.state.current_sound_index = 1 with patch.object(player_service, "play", new_callable=AsyncMock) as mock_play: await player_service.previous() mock_play.assert_called_once_with(0) @pytest.mark.asyncio async def test_seek_position(self, player_service): """Test seeking to specific position.""" player_service.state.status = PlayerStatus.PLAYING player_service.state.current_sound_duration = 30000 with patch.object(player_service, "_broadcast_state", new_callable=AsyncMock): await player_service.seek(15000) # Position should be 0.5 (50% of track) player_service._player.set_position.assert_called_once_with(0.5) assert player_service.state.current_sound_position == 15000 @pytest.mark.asyncio async def test_seek_when_stopped(self, player_service): """Test seeking when stopped does nothing.""" player_service.state.status = PlayerStatus.STOPPED with patch.object(player_service, "_broadcast_state", new_callable=AsyncMock) as mock_broadcast: await player_service.seek(15000) player_service._player.set_position.assert_not_called() mock_broadcast.assert_not_called() @pytest.mark.asyncio async def test_set_volume(self, player_service): """Test setting volume.""" with patch.object(player_service, "_broadcast_state", new_callable=AsyncMock): await player_service.set_volume(75) assert player_service.state.volume == 75 player_service._player.audio_set_volume.assert_called_once_with(75) @pytest.mark.asyncio async def test_set_volume_clamping(self, player_service): """Test volume clamping to valid range.""" with patch.object(player_service, "_broadcast_state", new_callable=AsyncMock): # Test upper bound await player_service.set_volume(150) assert player_service.state.volume == 100 # Test lower bound await player_service.set_volume(-10) assert player_service.state.volume == 0 @pytest.mark.asyncio async def test_set_mode(self, player_service): """Test setting playback mode.""" with patch.object(player_service, "_broadcast_state", new_callable=AsyncMock): await player_service.set_mode(PlayerMode.LOOP) assert player_service.state.mode == PlayerMode.LOOP @pytest.mark.asyncio async def test_reload_playlist(self, player_service): """Test reloading playlist from database.""" mock_session = AsyncMock() player_service.db_session_factory = lambda: mock_session # Mock playlist repository with patch("app.services.player.PlaylistRepository") as mock_repo_class: mock_repo = AsyncMock() mock_repo_class.return_value = mock_repo # Mock playlist data mock_playlist = Mock() mock_playlist.id = 1 mock_playlist.name = "Test Playlist" mock_repo.get_main_playlist.return_value = mock_playlist # Mock sounds sound1 = Sound(id=1, name="Song 1", filename="song1.mp3", duration=30000) sound2 = Sound(id=2, name="Song 2", filename="song2.mp3", duration=45000) mock_sounds = [sound1, sound2] mock_repo.get_playlist_sounds.return_value = mock_sounds with patch.object(player_service, "_broadcast_state", new_callable=AsyncMock): await player_service.reload_playlist() assert player_service.state.playlist_id == 1 assert player_service.state.playlist_name == "Test Playlist" assert player_service.state.playlist_sounds == mock_sounds assert player_service.state.playlist_length == 2 assert player_service.state.playlist_duration == 75000 @pytest.mark.asyncio async def test_handle_playlist_id_changed(self, player_service): """Test handling when playlist ID changes.""" # Setup initial state player_service.state.status = PlayerStatus.PLAYING player_service.state.current_sound_id = 1 player_service.state.current_sound_index = 0 # Create test sounds sound1 = Sound(id=1, name="Song 1", filename="song1.mp3", duration=30000) sound2 = Sound(id=2, name="Song 2", filename="song2.mp3", duration=45000) sounds = [sound1, sound2] with patch.object(player_service, "_stop_playback", new_callable=AsyncMock) as mock_stop: await player_service._handle_playlist_id_changed(1, 2, sounds) # Should stop playback and set first track as current mock_stop.assert_called_once() assert player_service.state.current_sound_index == 0 assert player_service.state.current_sound == sound1 assert player_service.state.current_sound_id == 1 @pytest.mark.asyncio async def test_handle_playlist_id_changed_empty_playlist(self, player_service): """Test handling playlist ID change with empty playlist.""" player_service.state.status = PlayerStatus.PLAYING with patch.object(player_service, "_stop_playback", new_callable=AsyncMock) as mock_stop: await player_service._handle_playlist_id_changed(1, 2, []) mock_stop.assert_called_once() assert player_service.state.current_sound_index is None assert player_service.state.current_sound is None assert player_service.state.current_sound_id is None @pytest.mark.asyncio async def test_handle_same_playlist_track_exists_same_index(self, player_service): """Test handling same playlist when track exists at same index.""" sound1 = Sound(id=1, name="Song 1", filename="song1.mp3", duration=30000) sound2 = Sound(id=2, name="Song 2", filename="song2.mp3", duration=45000) sounds = [sound1, sound2] await player_service._handle_same_playlist_track_check(1, 0, sounds) # Should update sound object reference but keep same index assert player_service.state.current_sound_index == 0 # Should be set to 0 from new_index assert player_service.state.current_sound == sound1 @pytest.mark.asyncio async def test_handle_same_playlist_track_exists_different_index(self, player_service): """Test handling same playlist when track exists at different index.""" sound1 = Sound(id=2, name="Song 2", filename="song2.mp3", duration=45000) sound2 = Sound(id=1, name="Song 1", filename="song1.mp3", duration=30000) sounds = [sound1, sound2] # Track with ID 1 is now at index 1 await player_service._handle_same_playlist_track_check(1, 0, sounds) # Should update index and sound reference assert player_service.state.current_sound_index == 1 assert player_service.state.current_sound == sound2 @pytest.mark.asyncio async def test_handle_same_playlist_track_not_found(self, player_service): """Test handling same playlist when current track no longer exists.""" player_service.state.status = PlayerStatus.PLAYING sound1 = Sound(id=2, name="Song 2", filename="song2.mp3", duration=45000) sound2 = Sound(id=3, name="Song 3", filename="song3.mp3", duration=60000) sounds = [sound1, sound2] # Track with ID 1 is missing with patch.object(player_service, "_handle_track_removed", new_callable=AsyncMock) as mock_removed: await player_service._handle_same_playlist_track_check(1, 0, sounds) mock_removed.assert_called_once_with(1, sounds) @pytest.mark.asyncio async def test_handle_track_removed_with_sounds(self, player_service): """Test handling when current track is removed with sounds available.""" player_service.state.status = PlayerStatus.PLAYING sound1 = Sound(id=2, name="Song 2", filename="song2.mp3", duration=45000) sounds = [sound1] with patch.object(player_service, "_stop_playback", new_callable=AsyncMock) as mock_stop: await player_service._handle_track_removed(1, sounds) mock_stop.assert_called_once() assert player_service.state.current_sound_index == 0 assert player_service.state.current_sound == sound1 assert player_service.state.current_sound_id == 2 @pytest.mark.asyncio async def test_handle_track_removed_empty_playlist(self, player_service): """Test handling when current track is removed with empty playlist.""" player_service.state.status = PlayerStatus.PLAYING with patch.object(player_service, "_stop_playback", new_callable=AsyncMock) as mock_stop: await player_service._handle_track_removed(1, []) mock_stop.assert_called_once() assert player_service.state.current_sound_index is None assert player_service.state.current_sound is None assert player_service.state.current_sound_id is None def test_update_playlist_state(self, player_service): """Test updating playlist state information.""" mock_playlist = Mock() mock_playlist.id = 5 mock_playlist.name = "New Playlist" sound1 = Sound(id=1, name="Song 1", filename="song1.mp3", duration=30000) sound2 = Sound(id=2, name="Song 2", filename="song2.mp3", duration=45000) sounds = [sound1, sound2] player_service._update_playlist_state(mock_playlist, sounds) assert player_service.state.playlist_id == 5 assert player_service.state.playlist_name == "New Playlist" assert player_service.state.playlist_sounds == sounds assert player_service.state.playlist_length == 2 assert player_service.state.playlist_duration == 75000 def test_find_sound_index_found(self, player_service): """Test finding sound index when sound exists.""" sound1 = Sound(id=1, name="Song 1", filename="song1.mp3", duration=30000) sound2 = Sound(id=2, name="Song 2", filename="song2.mp3", duration=45000) sounds = [sound1, sound2] index = player_service._find_sound_index(2, sounds) assert index == 1 def test_find_sound_index_not_found(self, player_service): """Test finding sound index when sound doesn't exist.""" sound1 = Sound(id=1, name="Song 1", filename="song1.mp3", duration=30000) sounds = [sound1] index = player_service._find_sound_index(999, sounds) assert index is None def test_set_first_track_as_current(self, player_service): """Test setting first track as current.""" sound1 = Sound(id=1, name="Song 1", filename="song1.mp3", duration=30000) sound2 = Sound(id=2, name="Song 2", filename="song2.mp3", duration=45000) sounds = [sound1, sound2] player_service._set_first_track_as_current(sounds) assert player_service.state.current_sound_index == 0 assert player_service.state.current_sound == sound1 assert player_service.state.current_sound_id == 1 def test_clear_current_track(self, player_service): """Test clearing current track state.""" # Set some initial state player_service.state.current_sound_index = 2 player_service.state.current_sound = Mock() player_service.state.current_sound_id = 5 player_service._clear_current_track() assert player_service.state.current_sound_index is None assert player_service.state.current_sound is None assert player_service.state.current_sound_id is None @pytest.mark.asyncio async def test_reload_playlist_different_id_scenario(self, player_service): """Test complete reload scenario when playlist ID changes.""" # Setup current state player_service.state.playlist_id = 1 player_service.state.current_sound_id = 1 player_service.state.current_sound_index = 0 player_service.state.status = PlayerStatus.PLAYING mock_session = AsyncMock() player_service.db_session_factory = lambda: mock_session with patch("app.services.player.PlaylistRepository") as mock_repo_class: mock_repo = AsyncMock() mock_repo_class.return_value = mock_repo # Mock new playlist with different ID mock_playlist = Mock() mock_playlist.id = 2 # Different ID mock_playlist.name = "New Playlist" mock_repo.get_main_playlist.return_value = mock_playlist sound1 = Sound(id=1, name="Song 1", filename="song1.mp3", duration=30000) mock_sounds = [sound1] mock_repo.get_playlist_sounds.return_value = mock_sounds with patch.object(player_service, "_stop_playback", new_callable=AsyncMock) as mock_stop: with patch.object(player_service, "_broadcast_state", new_callable=AsyncMock): await player_service.reload_playlist() # Should stop and reset to first track mock_stop.assert_called_once() assert player_service.state.playlist_id == 2 assert player_service.state.current_sound_index == 0 assert player_service.state.current_sound_id == 1 @pytest.mark.asyncio async def test_reload_playlist_same_id_track_moved(self, player_service): """Test reload when playlist ID same but track moved to different index.""" # Setup current state player_service.state.playlist_id = 1 player_service.state.current_sound_id = 2 # Currently playing track 2 player_service.state.current_sound_index = 1 # At index 1 mock_session = AsyncMock() player_service.db_session_factory = lambda: mock_session with patch("app.services.player.PlaylistRepository") as mock_repo_class: mock_repo = AsyncMock() mock_repo_class.return_value = mock_repo # Same playlist ID mock_playlist = Mock() mock_playlist.id = 1 mock_playlist.name = "Same Playlist" mock_repo.get_main_playlist.return_value = mock_playlist # Track 2 moved to index 0 sound1 = Sound(id=2, name="Song 2", filename="song2.mp3", duration=45000) sound2 = Sound(id=1, name="Song 1", filename="song1.mp3", duration=30000) mock_sounds = [sound1, sound2] # Track 2 now at index 0 mock_repo.get_playlist_sounds.return_value = mock_sounds with patch.object(player_service, "_broadcast_state", new_callable=AsyncMock): await player_service.reload_playlist() # Should update index but keep same track assert player_service.state.playlist_id == 1 assert player_service.state.current_sound_index == 0 # Updated index assert player_service.state.current_sound_id == 2 # Same track assert player_service.state.current_sound == sound1 def test_get_next_index_continuous_mode(self, player_service): """Test getting next index in continuous mode.""" player_service.state.mode = PlayerMode.CONTINUOUS player_service.state.playlist_sounds = [Mock(), Mock(), Mock()] # Test normal progression assert player_service._get_next_index(0) == 1 assert player_service._get_next_index(1) == 2 # Test end of playlist assert player_service._get_next_index(2) is None def test_get_next_index_loop_mode(self, player_service): """Test getting next index in loop mode.""" player_service.state.mode = PlayerMode.LOOP player_service.state.playlist_sounds = [Mock(), Mock(), Mock()] # Test normal progression assert player_service._get_next_index(0) == 1 assert player_service._get_next_index(1) == 2 # Test wrapping to beginning assert player_service._get_next_index(2) == 0 def test_get_next_index_loop_one_mode(self, player_service): """Test getting next index in loop one mode.""" player_service.state.mode = PlayerMode.LOOP_ONE player_service.state.playlist_sounds = [Mock(), Mock(), Mock()] # Should always return same index assert player_service._get_next_index(0) == 0 assert player_service._get_next_index(1) == 1 assert player_service._get_next_index(2) == 2 def test_get_next_index_single_mode(self, player_service): """Test getting next index in single mode.""" player_service.state.mode = PlayerMode.SINGLE player_service.state.playlist_sounds = [Mock(), Mock(), Mock()] # Should always return None assert player_service._get_next_index(0) is None assert player_service._get_next_index(1) is None assert player_service._get_next_index(2) is None def test_get_next_index_random_mode(self, player_service): """Test getting next index in random mode.""" player_service.state.mode = PlayerMode.RANDOM player_service.state.playlist_sounds = [Mock(), Mock(), Mock()] with patch("random.choice") as mock_choice: mock_choice.return_value = 2 result = player_service._get_next_index(0) assert result == 2 # Should exclude current index mock_choice.assert_called_once_with([1, 2]) def test_get_previous_index(self, player_service): """Test getting previous index.""" player_service.state.playlist_sounds = [Mock(), Mock(), Mock()] # Test normal progression assert player_service._get_previous_index(2) == 1 assert player_service._get_previous_index(1) == 0 # Test beginning without loop player_service.state.mode = PlayerMode.CONTINUOUS assert player_service._get_previous_index(0) is None # Test beginning with loop player_service.state.mode = PlayerMode.LOOP assert player_service._get_previous_index(0) == 2 def test_update_play_time(self, player_service): """Test updating play time tracking.""" # Setup state player_service.state.status = PlayerStatus.PLAYING player_service.state.current_sound_id = 1 player_service.state.current_sound_position = 5000 player_service.state.current_sound_duration = 30000 # Initialize tracking current_time = time.time() player_service._play_time_tracking[1] = { "total_time": 1000, "last_position": 4000, "last_update": current_time - 1.0, # 1 second ago "threshold_reached": False, } with patch("app.services.player.time.time", return_value=current_time): player_service._update_play_time() tracking = player_service._play_time_tracking[1] assert tracking["total_time"] == 2000 # Added 1 second (1000ms) assert tracking["last_position"] == 5000 assert tracking["last_update"] == current_time @pytest.mark.asyncio async def test_record_play_count(self, player_service): """Test recording play count for a sound.""" mock_session = AsyncMock() player_service.db_session_factory = lambda: mock_session # Mock repositories with patch("app.services.player.SoundRepository") as mock_sound_repo_class: mock_sound_repo = AsyncMock() mock_sound_repo_class.return_value = mock_sound_repo # Mock sound mock_sound = Mock() mock_sound.play_count = 5 mock_sound_repo.get_by_id.return_value = mock_sound await player_service._record_play_count(1) # Verify sound play count was updated mock_sound_repo.update.assert_called_once_with( mock_sound, {"play_count": 6} ) # Verify SoundPlayed record was created with None user_id for player mock_session.add.assert_called_once() mock_session.commit.assert_called_once() def test_get_state(self, player_service): """Test getting current player state.""" result = player_service.get_state() assert isinstance(result, dict) assert "status" in result assert "mode" in result assert "volume" in result def test_uses_shared_sound_path_utility(self, player_service): """Test that player service uses the shared sound path utility.""" sound = Sound( id=1, name="Test Song", filename="test.mp3", type="SDB", is_normalized=False, ) player_service.state.playlist_sounds = [sound] with patch("app.services.player.get_sound_file_path") as mock_path: mock_file_path = Mock(spec=Path) mock_file_path.exists.return_value = False # File doesn't exist mock_path.return_value = mock_file_path # This should fail because file doesn't exist result = asyncio.run(player_service.play(0)) # Verify the utility was called mock_path.assert_called_once_with(sound) class TestPlayerServiceGlobalFunctions: """Test global player service functions.""" @pytest.mark.asyncio async def test_initialize_player_service(self): """Test initializing global player service.""" mock_factory = Mock() with patch("app.services.player.PlayerService") as mock_service_class: mock_service = AsyncMock() mock_service_class.return_value = mock_service await initialize_player_service(mock_factory) mock_service_class.assert_called_once_with(mock_factory) mock_service.start.assert_called_once() @pytest.mark.asyncio async def test_shutdown_player_service(self): """Test shutting down global player service.""" # Mock global player service exists with patch("app.services.player.player_service") as mock_global: mock_service = AsyncMock() mock_global.__bool__ = lambda x: True # Service exists type(mock_global).__bool__ = lambda x: True with patch("app.services.player.player_service", mock_service): await shutdown_player_service() mock_service.stop.assert_called_once() def test_get_player_service_success(self): """Test getting player service when initialized.""" mock_service = Mock() with patch("app.services.player.player_service", mock_service): result = get_player_service() assert result is mock_service def test_get_player_service_not_initialized(self): """Test getting player service when not initialized.""" with patch("app.services.player.player_service", None): with pytest.raises(RuntimeError, match="Player service not initialized"): get_player_service()