"""Tests for playlist repository.""" from collections.abc import AsyncGenerator import pytest import pytest_asyncio from sqlmodel.ext.asyncio.session import AsyncSession from app.models.playlist import Playlist from app.models.sound import Sound from app.models.user import User from app.repositories.playlist import PlaylistRepository class TestPlaylistRepository: """Test playlist repository operations.""" @pytest_asyncio.fixture async def playlist_repository( self, test_session: AsyncSession, ) -> AsyncGenerator[PlaylistRepository, None]: """Create a playlist repository instance.""" yield PlaylistRepository(test_session) @pytest_asyncio.fixture async def test_playlist( self, test_session: AsyncSession, test_user: User, ) -> Playlist: """Create a test playlist.""" playlist = Playlist( user_id=test_user.id, name="Test Playlist", description="A test playlist", genre="test", is_main=False, is_current=False, is_deletable=True, ) test_session.add(playlist) await test_session.commit() await test_session.refresh(playlist) return playlist @pytest_asyncio.fixture async def main_playlist( self, test_session: AsyncSession, ) -> Playlist: """Create a main playlist.""" playlist = Playlist( user_id=None, name="Main Playlist", description="Main playlist", is_main=True, is_current=False, is_deletable=False, ) test_session.add(playlist) await test_session.commit() await test_session.refresh(playlist) return playlist @pytest_asyncio.fixture async def test_sound( self, test_session: AsyncSession, ) -> Sound: """Create a test sound.""" sound = Sound( name="Test Sound", filename="test.mp3", type="SDB", duration=5000, size=1024, hash="test_hash", play_count=0, ) test_session.add(sound) await test_session.commit() await test_session.refresh(sound) return sound @pytest.mark.asyncio async def test_get_by_id_existing( self, playlist_repository: PlaylistRepository, test_playlist: Playlist, ) -> None: """Test getting playlist by ID when playlist exists.""" assert test_playlist.id is not None playlist = await playlist_repository.get_by_id(test_playlist.id) assert playlist is not None assert playlist.id == test_playlist.id assert playlist.name == test_playlist.name assert playlist.description == test_playlist.description @pytest.mark.asyncio async def test_get_by_id_nonexistent( self, playlist_repository: PlaylistRepository, ) -> None: """Test getting playlist by ID when playlist doesn't exist.""" playlist = await playlist_repository.get_by_id(99999) assert playlist is None @pytest.mark.asyncio async def test_get_by_name_existing( self, playlist_repository: PlaylistRepository, test_playlist: Playlist, ) -> None: """Test getting playlist by name when playlist exists.""" playlist = await playlist_repository.get_by_name(test_playlist.name) assert playlist is not None assert playlist.id == test_playlist.id assert playlist.name == test_playlist.name @pytest.mark.asyncio async def test_get_by_name_nonexistent( self, playlist_repository: PlaylistRepository, ) -> None: """Test getting playlist by name when playlist doesn't exist.""" playlist = await playlist_repository.get_by_name("Nonexistent Playlist") assert playlist is None @pytest.mark.asyncio async def test_get_by_user_id( self, playlist_repository: PlaylistRepository, test_session: AsyncSession, ensure_plans, ) -> None: """Test getting playlists by user ID.""" # Create test user within this test from app.utils.auth import PasswordUtils user = User( email="test@example.com", name="Test User", password_hash=PasswordUtils.hash_password("password123"), role="user", is_active=True, plan_id=ensure_plans[0].id, credits=100, ) test_session.add(user) await test_session.commit() await test_session.refresh(user) # Extract user ID immediately after refresh user_id = user.id # Create test playlist for this user playlist = Playlist( user_id=user_id, name="Test Playlist", description="A test playlist", genre="test", is_main=False, is_current=False, is_deletable=True, ) test_session.add(playlist) await test_session.commit() # Test the repository method playlists = await playlist_repository.get_by_user_id(user_id) # Should only return user's playlists, not the main playlist (user_id=None) assert len(playlists) == 1 assert playlists[0].name == "Test Playlist" @pytest.mark.asyncio async def test_get_main_playlist( self, playlist_repository: PlaylistRepository, test_session: AsyncSession, ) -> None: """Test getting main playlist.""" # Create main playlist within this test main_playlist = Playlist( user_id=None, name="Main Playlist", description="Main playlist", is_main=True, is_current=False, is_deletable=False, ) test_session.add(main_playlist) await test_session.commit() await test_session.refresh(main_playlist) # Extract ID before async call main_playlist_id = main_playlist.id # Test the repository method playlist = await playlist_repository.get_main_playlist() assert playlist is not None assert playlist.id == main_playlist_id assert playlist.is_main is True @pytest.mark.asyncio async def test_get_current_playlist( self, playlist_repository: PlaylistRepository, test_session: AsyncSession, ensure_plans, ) -> None: """Test getting current playlist when none is set.""" # Create test user within this test from app.utils.auth import PasswordUtils user = User( email="test2@example.com", name="Test User 2", password_hash=PasswordUtils.hash_password("password123"), role="user", is_active=True, plan_id=ensure_plans[0].id, credits=100, ) test_session.add(user) await test_session.commit() await test_session.refresh(user) # Extract user ID immediately after refresh user_id = user.id # Test the repository method - should return None when no current playlist playlist = await playlist_repository.get_current_playlist(user_id) # Should return None since no user playlist is marked as current assert playlist is None @pytest.mark.asyncio async def test_create_playlist( self, playlist_repository: PlaylistRepository, test_user: User, ) -> None: """Test creating a new playlist.""" playlist_data = { "user_id": test_user.id, "name": "New Playlist", "description": "A new playlist", "genre": "rock", "is_main": False, "is_current": False, "is_deletable": True, } playlist = await playlist_repository.create(playlist_data) assert playlist.id is not None assert playlist.name == "New Playlist" assert playlist.description == "A new playlist" assert playlist.genre == "rock" assert playlist.is_main is False assert playlist.is_current is False assert playlist.is_deletable is True @pytest.mark.asyncio async def test_update_playlist( self, playlist_repository: PlaylistRepository, test_playlist: Playlist, ) -> None: """Test updating a playlist.""" update_data = { "name": "Updated Playlist", "description": "Updated description", "genre": "jazz", } updated_playlist = await playlist_repository.update(test_playlist, update_data) assert updated_playlist.name == "Updated Playlist" assert updated_playlist.description == "Updated description" assert updated_playlist.genre == "jazz" @pytest.mark.asyncio async def test_delete_playlist( self, playlist_repository: PlaylistRepository, test_playlist: Playlist, ) -> None: """Test deleting a playlist.""" playlist_id = test_playlist.id await playlist_repository.delete(test_playlist) # Verify playlist is deleted deleted_playlist = await playlist_repository.get_by_id(playlist_id) assert deleted_playlist is None @pytest.mark.asyncio async def test_search_by_name( self, playlist_repository: PlaylistRepository, test_session: AsyncSession, ensure_plans, ) -> None: """Test searching playlists by name.""" # Create test user within this test from app.utils.auth import PasswordUtils user = User( email="test3@example.com", name="Test User 3", password_hash=PasswordUtils.hash_password("password123"), role="user", is_active=True, plan_id=ensure_plans[0].id, credits=100, ) test_session.add(user) await test_session.commit() await test_session.refresh(user) # Extract user ID immediately after refresh user_id = user.id # Create test playlist test_playlist = Playlist( user_id=user_id, name="Test Playlist", description="A test playlist", genre="test", is_main=False, is_current=False, is_deletable=True, ) test_session.add(test_playlist) # Create main playlist main_playlist = Playlist( user_id=None, name="Main Playlist", description="Main playlist", is_main=True, is_current=False, is_deletable=False, ) test_session.add(main_playlist) await test_session.commit() # Search for all playlists (no user filter) all_results = await playlist_repository.search_by_name("playlist") assert len(all_results) >= 2 # Should include both user and main playlists # Search with user filter user_results = await playlist_repository.search_by_name("playlist", user_id) assert len(user_results) == 1 # Only user's playlists, not main playlist # Search for specific playlist test_results = await playlist_repository.search_by_name("test", user_id) assert len(test_results) == 1 assert test_results[0].name == "Test Playlist" @pytest.mark.asyncio async def test_add_sound_to_playlist( self, playlist_repository: PlaylistRepository, test_session: AsyncSession, ensure_plans, ) -> None: """Test adding a sound to a playlist.""" # Create test user within this test from app.utils.auth import PasswordUtils user = User( email="test4@example.com", name="Test User 4", password_hash=PasswordUtils.hash_password("password123"), role="user", is_active=True, plan_id=ensure_plans[0].id, credits=100, ) test_session.add(user) await test_session.commit() await test_session.refresh(user) # Create test playlist playlist = Playlist( user_id=user.id, name="Test Playlist", description="A test playlist", genre="test", is_main=False, is_current=False, is_deletable=True, ) test_session.add(playlist) # Create test sound sound = Sound( name="Test Sound", filename="test.mp3", type="SDB", duration=5000, size=1024, hash="test_hash", play_count=0, ) test_session.add(sound) await test_session.commit() await test_session.refresh(playlist) await test_session.refresh(sound) # Extract IDs before async call playlist_id = playlist.id sound_id = sound.id # Test the repository method playlist_sound = await playlist_repository.add_sound_to_playlist( playlist_id, sound_id ) assert playlist_sound.playlist_id == playlist_id assert playlist_sound.sound_id == sound_id assert playlist_sound.position == 0 @pytest.mark.asyncio async def test_add_sound_to_playlist_with_position( self, playlist_repository: PlaylistRepository, test_session: AsyncSession, ensure_plans, ) -> None: """Test adding a sound to a playlist with specific position.""" # Create test user within this test from app.utils.auth import PasswordUtils user = User( email="test5@example.com", name="Test User 5", password_hash=PasswordUtils.hash_password("password123"), role="user", is_active=True, plan_id=ensure_plans[0].id, credits=100, ) test_session.add(user) await test_session.commit() await test_session.refresh(user) # Extract user ID immediately after refresh user_id = user.id # Create test playlist playlist = Playlist( user_id=user_id, name="Test Playlist", description="A test playlist", genre="test", is_main=False, is_current=False, is_deletable=True, ) test_session.add(playlist) # Create test sound sound = Sound( name="Test Sound", filename="test.mp3", type="SDB", duration=5000, size=1024, hash="test_hash", play_count=0, ) test_session.add(sound) await test_session.commit() await test_session.refresh(playlist) await test_session.refresh(sound) # Extract IDs before async call playlist_id = playlist.id sound_id = sound.id # Test the repository method playlist_sound = await playlist_repository.add_sound_to_playlist( playlist_id, sound_id, position=5 ) assert playlist_sound.position == 5 @pytest.mark.asyncio async def test_remove_sound_from_playlist( self, playlist_repository: PlaylistRepository, test_session: AsyncSession, ensure_plans, ) -> None: """Test removing a sound from a playlist.""" # Create objects within this test from app.utils.auth import PasswordUtils user = User( email="test@example.com", name="Test User", password_hash=PasswordUtils.hash_password("password123"), role="user", is_active=True, plan_id=ensure_plans[0].id, credits=100, ) test_session.add(user) await test_session.commit() await test_session.refresh(user) user_id = user.id playlist = Playlist( user_id=user_id, name="Test Playlist", description="A test playlist", genre="test", is_main=False, is_current=False, is_deletable=True, ) test_session.add(playlist) sound = Sound( name="Test Sound", filename="test.mp3", type="SDB", duration=5000, size=1024, hash="test_hash", play_count=0, ) test_session.add(sound) await test_session.commit() await test_session.refresh(playlist) await test_session.refresh(sound) # Extract IDs before async calls playlist_id = playlist.id sound_id = sound.id # First add the sound await playlist_repository.add_sound_to_playlist(playlist_id, sound_id) # Verify it was added assert await playlist_repository.is_sound_in_playlist( playlist_id, sound_id ) # Remove the sound await playlist_repository.remove_sound_from_playlist( playlist_id, sound_id ) # Verify it was removed assert not await playlist_repository.is_sound_in_playlist( playlist_id, sound_id ) @pytest.mark.asyncio async def test_get_playlist_sounds( self, playlist_repository: PlaylistRepository, test_session: AsyncSession, ensure_plans, ) -> None: """Test getting sounds in a playlist.""" # Create objects within this test from app.utils.auth import PasswordUtils user = User( email="test@example.com", name="Test User", password_hash=PasswordUtils.hash_password("password123"), role="user", is_active=True, plan_id=ensure_plans[0].id, credits=100, ) test_session.add(user) await test_session.commit() await test_session.refresh(user) user_id = user.id playlist = Playlist( user_id=user_id, name="Test Playlist", description="A test playlist", genre="test", is_main=False, is_current=False, is_deletable=True, ) test_session.add(playlist) sound = Sound( name="Test Sound", filename="test.mp3", type="SDB", duration=5000, size=1024, hash="test_hash", play_count=0, ) test_session.add(sound) await test_session.commit() await test_session.refresh(playlist) await test_session.refresh(sound) # Extract IDs before async calls playlist_id = playlist.id sound_id = sound.id # Initially empty sounds = await playlist_repository.get_playlist_sounds(playlist_id) assert len(sounds) == 0 # Add sound await playlist_repository.add_sound_to_playlist(playlist_id, sound_id) # Check sounds sounds = await playlist_repository.get_playlist_sounds(playlist_id) assert len(sounds) == 1 assert sounds[0].id == sound_id @pytest.mark.asyncio async def test_get_playlist_sound_count( self, playlist_repository: PlaylistRepository, test_session: AsyncSession, ensure_plans, ) -> None: """Test getting sound count in a playlist.""" # Create objects within this test from app.utils.auth import PasswordUtils user = User( email="test@example.com", name="Test User", password_hash=PasswordUtils.hash_password("password123"), role="user", is_active=True, plan_id=ensure_plans[0].id, credits=100, ) test_session.add(user) await test_session.commit() await test_session.refresh(user) user_id = user.id playlist = Playlist( user_id=user_id, name="Test Playlist", description="A test playlist", genre="test", is_main=False, is_current=False, is_deletable=True, ) test_session.add(playlist) sound = Sound( name="Test Sound", filename="test.mp3", type="SDB", duration=5000, size=1024, hash="test_hash", play_count=0, ) test_session.add(sound) await test_session.commit() await test_session.refresh(playlist) await test_session.refresh(sound) # Extract IDs before async calls playlist_id = playlist.id sound_id = sound.id # Initially empty count = await playlist_repository.get_playlist_sound_count(playlist_id) assert count == 0 # Add sound await playlist_repository.add_sound_to_playlist(playlist_id, sound_id) # Check count count = await playlist_repository.get_playlist_sound_count(playlist_id) assert count == 1 @pytest.mark.asyncio async def test_is_sound_in_playlist( self, playlist_repository: PlaylistRepository, test_session: AsyncSession, ensure_plans, ) -> None: """Test checking if sound is in playlist.""" # Create objects within this test from app.utils.auth import PasswordUtils user = User( email="test@example.com", name="Test User", password_hash=PasswordUtils.hash_password("password123"), role="user", is_active=True, plan_id=ensure_plans[0].id, credits=100, ) test_session.add(user) await test_session.commit() await test_session.refresh(user) user_id = user.id playlist = Playlist( user_id=user_id, name="Test Playlist", description="A test playlist", genre="test", is_main=False, is_current=False, is_deletable=True, ) test_session.add(playlist) sound = Sound( name="Test Sound", filename="test.mp3", type="SDB", duration=5000, size=1024, hash="test_hash", play_count=0, ) test_session.add(sound) await test_session.commit() await test_session.refresh(playlist) await test_session.refresh(sound) # Extract IDs before async calls playlist_id = playlist.id sound_id = sound.id # Initially not in playlist assert not await playlist_repository.is_sound_in_playlist( playlist_id, sound_id ) # Add sound await playlist_repository.add_sound_to_playlist(playlist_id, sound_id) # Now in playlist assert await playlist_repository.is_sound_in_playlist( playlist_id, sound_id ) @pytest.mark.asyncio async def test_reorder_playlist_sounds( self, playlist_repository: PlaylistRepository, test_session: AsyncSession, ensure_plans, ) -> None: """Test reordering sounds in a playlist.""" # Create objects within this test from app.utils.auth import PasswordUtils user = User( email="test@example.com", name="Test User", password_hash=PasswordUtils.hash_password("password123"), role="user", is_active=True, plan_id=ensure_plans[0].id, credits=100, ) test_session.add(user) await test_session.commit() await test_session.refresh(user) user_id = user.id playlist = Playlist( user_id=user_id, name="Test Playlist", description="A test playlist", genre="test", is_main=False, is_current=False, is_deletable=True, ) test_session.add(playlist) # Create multiple sounds sound1 = Sound(name="Sound 1", filename="sound1.mp3", type="SDB", hash="hash1") sound2 = Sound(name="Sound 2", filename="sound2.mp3", type="SDB", hash="hash2") test_session.add_all([playlist, sound1, sound2]) await test_session.commit() await test_session.refresh(playlist) await test_session.refresh(sound1) await test_session.refresh(sound2) # Extract IDs before async calls playlist_id = playlist.id sound1_id = sound1.id sound2_id = sound2.id # Add sounds to playlist await playlist_repository.add_sound_to_playlist( playlist_id, sound1_id, position=0 ) await playlist_repository.add_sound_to_playlist( playlist_id, sound2_id, position=1 ) # Reorder sounds - use different positions to avoid constraint issues sound_positions = [(sound1_id, 10), (sound2_id, 5)] await playlist_repository.reorder_playlist_sounds( playlist_id, sound_positions ) # Verify new order sounds = await playlist_repository.get_playlist_sounds(playlist_id) assert len(sounds) == 2 assert sounds[0].id == sound2_id # sound2 now at position 5 assert sounds[1].id == sound1_id # sound1 now at position 10