1059 lines
33 KiB
Python
1059 lines
33 KiB
Python
"""Tests for playlist repository."""
|
|
|
|
from collections.abc import AsyncGenerator
|
|
from typing import Any
|
|
|
|
import pytest
|
|
import pytest_asyncio
|
|
from sqlmodel.ext.asyncio.session import AsyncSession
|
|
|
|
from app.models.playlist import Playlist
|
|
from app.models.sound import Sound
|
|
from app.models.user import User
|
|
from app.repositories.playlist import PlaylistRepository
|
|
from app.utils.auth import PasswordUtils
|
|
|
|
# Shared expected values for the assertions in this module.

# Positions compared against ``playlist_sound.position``.
DEFAULT_POSITION = 0
TEST_POSITION = 5

# Expected collection sizes.
ONE_PLAYLIST = 1
ZERO_SOUNDS = 0
ONE_SOUND = 1
TWO_SOUNDS = 2
TEST_TOTAL_SOUNDS = 3
|
|
|
|
|
|
class TestPlaylistRepository:
    """Exercise ``PlaylistRepository`` against the test session.

    Covers lookups, create/update/delete, name search, and the
    playlist-sound operations (add, remove, count, membership, reorder).
    """
|
|
|
|
@pytest_asyncio.fixture
async def playlist_repository(
    self,
    test_session: AsyncSession,
) -> AsyncGenerator[PlaylistRepository, None]:
    """Provide a ``PlaylistRepository`` bound to the test session."""
    repository = PlaylistRepository(test_session)
    yield repository
|
|
|
|
@pytest_asyncio.fixture
async def test_playlist(
    self,
    test_session: AsyncSession,
    test_user: User,
) -> Playlist:
    """Persist and return a regular playlist owned by ``test_user``."""
    owned = Playlist(
        name="Test Playlist",
        description="A test playlist",
        genre="test",
        user_id=test_user.id,
        is_deletable=True,
        is_current=False,
        is_main=False,
    )
    test_session.add(owned)
    await test_session.commit()
    # Reload after the commit so the returned instance holds database values.
    await test_session.refresh(owned)
    return owned
|
|
|
|
@pytest_asyncio.fixture
async def main_playlist(
    self,
    test_session: AsyncSession,
) -> Playlist:
    """Persist and return a main playlist that belongs to no user."""
    unowned_main = Playlist(
        name="Main Playlist",
        description="Main playlist",
        user_id=None,
        is_deletable=False,
        is_current=False,
        is_main=True,
    )
    test_session.add(unowned_main)
    await test_session.commit()
    # Reload after the commit so the returned instance holds database values.
    await test_session.refresh(unowned_main)
    return unowned_main
|
|
|
|
@pytest_asyncio.fixture
async def test_sound(
    self,
    test_session: AsyncSession,
) -> Sound:
    """Persist and return a single sound row."""
    clip = Sound(
        name="Test Sound",
        filename="test.mp3",
        hash="test_hash",
        type="SDB",
        size=1024,
        duration=5000,
        play_count=0,
    )
    test_session.add(clip)
    await test_session.commit()
    # Reload after the commit so the returned instance holds database values.
    await test_session.refresh(clip)
    return clip
|
|
|
|
@pytest.mark.asyncio
async def test_get_by_id_existing(
    self,
    playlist_repository: PlaylistRepository,
    test_playlist: Playlist,
) -> None:
    """Look up a persisted playlist by its ID and compare its fields."""
    lookup_id = test_playlist.id
    assert lookup_id is not None

    fetched = await playlist_repository.get_by_id(lookup_id)

    assert fetched is not None
    assert fetched.id == test_playlist.id
    assert fetched.name == test_playlist.name
    assert fetched.description == test_playlist.description
|
|
|
|
@pytest.mark.asyncio
async def test_get_by_id_nonexistent(
    self,
    playlist_repository: PlaylistRepository,
) -> None:
    """An ID lookup that matches no playlist yields ``None``."""
    missing = await playlist_repository.get_by_id(99999)
    assert missing is None
|
|
|
|
@pytest.mark.asyncio
async def test_get_by_name_existing(
    self,
    playlist_repository: PlaylistRepository,
    test_playlist: Playlist,
) -> None:
    """Look up a persisted playlist by its name."""
    wanted_name = test_playlist.name
    match = await playlist_repository.get_by_name(wanted_name)

    assert match is not None
    assert match.id == test_playlist.id
    assert match.name == test_playlist.name
|
|
|
|
@pytest.mark.asyncio
async def test_get_by_name_nonexistent(
    self,
    playlist_repository: PlaylistRepository,
) -> None:
    """A name lookup that matches no playlist yields ``None``."""
    missing = await playlist_repository.get_by_name("Nonexistent Playlist")
    assert missing is None
|
|
|
|
@pytest.mark.asyncio
async def test_get_by_user_id(
    self,
    playlist_repository: PlaylistRepository,
    test_session: AsyncSession,
    ensure_plans: Any,
) -> None:
    """Test getting playlists by user ID.

    A main playlist with ``user_id=None`` is stored next to the user's own
    playlist, so the length assertion really checks that playlists without
    an owner are left out of the result.
    """
    # Create test user within this test
    user = User(
        email="test@example.com",
        name="Test User",
        password_hash=PasswordUtils.hash_password("password123"),
        role="user",
        is_active=True,
        plan_id=ensure_plans[0].id,
        credits=100,
    )
    test_session.add(user)
    await test_session.commit()
    await test_session.refresh(user)

    # Extract user ID immediately after refresh
    user_id = user.id

    # Create test playlist for this user
    playlist = Playlist(
        user_id=user_id,
        name="Test Playlist",
        description="A test playlist",
        genre="test",
        is_main=False,
        is_current=False,
        is_deletable=True,
    )
    test_session.add(playlist)

    # Create a main playlist owned by nobody; it must not be returned below
    main_playlist = Playlist(
        user_id=None,
        name="Main Playlist",
        description="Main playlist",
        is_main=True,
        is_current=False,
        is_deletable=False,
    )
    test_session.add(main_playlist)
    await test_session.commit()

    # Test the repository method
    playlists = await playlist_repository.get_by_user_id(user_id)

    # Should only return user's playlists, not the main playlist (user_id=None)
    assert len(playlists) == ONE_PLAYLIST
    assert playlists[0].name == "Test Playlist"
|
|
|
|
@pytest.mark.asyncio
async def test_get_main_playlist(
    self,
    playlist_repository: PlaylistRepository,
    test_session: AsyncSession,
) -> None:
    """The repository returns the playlist flagged ``is_main``."""
    # Store the main playlist inside this test.
    seeded = Playlist(
        name="Main Playlist",
        description="Main playlist",
        user_id=None,
        is_deletable=False,
        is_current=False,
        is_main=True,
    )
    test_session.add(seeded)
    await test_session.commit()
    await test_session.refresh(seeded)

    # Read the ID now, ahead of the repository call.
    expected_id = seeded.id

    result = await playlist_repository.get_main_playlist()

    assert result is not None
    assert result.id == expected_id
    assert result.is_main is True
|
|
|
|
@pytest.mark.asyncio
async def test_get_current_playlist(
    self,
    playlist_repository: PlaylistRepository,
    test_session: AsyncSession,
    ensure_plans: Any,
) -> None:
    """With no playlist marked as current, the lookup yields ``None``."""
    # This test stores no playlist itself, so none is flagged as current.
    current = await playlist_repository.get_current_playlist()

    assert current is None
|
|
|
|
@pytest.mark.asyncio
async def test_create_playlist(
    self,
    playlist_repository: PlaylistRepository,
    test_user: User,
) -> None:
    """Create a playlist from a field mapping and check the stored values."""
    payload = {
        "user_id": test_user.id,
        "name": "New Playlist",
        "description": "A new playlist",
        "genre": "rock",
        "is_main": False,
        "is_current": False,
        "is_deletable": True,
    }

    created = await playlist_repository.create(payload)

    # The new row has an ID and carries every submitted field.
    assert created.id is not None
    assert created.name == "New Playlist"
    assert created.description == "A new playlist"
    assert created.genre == "rock"
    assert created.is_main is False
    assert created.is_current is False
    assert created.is_deletable is True
|
|
|
|
@pytest.mark.asyncio
async def test_update_playlist(
    self,
    playlist_repository: PlaylistRepository,
    test_playlist: Playlist,
) -> None:
    """Apply a partial field mapping and check the returned playlist."""
    changes = {
        "name": "Updated Playlist",
        "description": "Updated description",
        "genre": "jazz",
    }

    result = await playlist_repository.update(test_playlist, changes)

    assert result.name == "Updated Playlist"
    assert result.description == "Updated description"
    assert result.genre == "jazz"
|
|
|
|
@pytest.mark.asyncio
async def test_delete_playlist(
    self,
    playlist_repository: PlaylistRepository,
    test_playlist: Playlist,
) -> None:
    """Test deleting a playlist.

    The ID is captured before deletion and used afterwards to confirm
    that the repository no longer finds the row.
    """
    playlist_id = test_playlist.id
    # Same guard as test_get_by_id_existing: the ID must be set before it
    # is handed to get_by_id.
    assert playlist_id is not None

    await playlist_repository.delete(test_playlist)

    # Verify playlist is deleted
    deleted_playlist = await playlist_repository.get_by_id(playlist_id)
    assert deleted_playlist is None
|
|
|
|
@pytest.mark.asyncio
async def test_search_by_name(
    self,
    playlist_repository: PlaylistRepository,
    test_session: AsyncSession,
    ensure_plans: Any,
) -> None:
    """Search playlists by name, with and without a user filter."""
    # Persist the owner first; the user playlist needs its ID.
    owner = User(
        email="test3@example.com",
        name="Test User 3",
        password_hash=PasswordUtils.hash_password("password123"),
        plan_id=ensure_plans[0].id,
        role="user",
        credits=100,
        is_active=True,
    )
    test_session.add(owner)
    await test_session.commit()
    await test_session.refresh(owner)

    owner_id = owner.id

    # One playlist owned by the user, one main playlist owned by nobody.
    owned = Playlist(
        name="Test Playlist",
        description="A test playlist",
        genre="test",
        user_id=owner_id,
        is_deletable=True,
        is_current=False,
        is_main=False,
    )
    unowned_main = Playlist(
        name="Main Playlist",
        description="Main playlist",
        user_id=None,
        is_deletable=False,
        is_current=False,
        is_main=True,
    )
    test_session.add_all([owned, unowned_main])
    await test_session.commit()

    # Without a user filter both playlists match.
    unfiltered = await playlist_repository.search_by_name("playlist")
    assert len(unfiltered) >= 2  # Should include both user and main playlists

    # With the user filter only the owned playlist matches.
    owner_matches = await playlist_repository.search_by_name("playlist", owner_id)
    assert len(owner_matches) == ONE_PLAYLIST

    # A narrower term still finds the owned playlist.
    narrowed = await playlist_repository.search_by_name("test", owner_id)
    assert len(narrowed) == ONE_PLAYLIST
    assert narrowed[0].name == "Test Playlist"
|
|
|
|
@pytest.mark.asyncio
async def test_add_sound_to_playlist(
    self,
    playlist_repository: PlaylistRepository,
    test_session: AsyncSession,
    ensure_plans: Any,
) -> None:
    """Add a sound without a position and expect the default position."""
    # Persist the owner first; the playlist needs its ID.
    owner = User(
        email="test4@example.com",
        name="Test User 4",
        password_hash=PasswordUtils.hash_password("password123"),
        plan_id=ensure_plans[0].id,
        role="user",
        credits=100,
        is_active=True,
    )
    test_session.add(owner)
    await test_session.commit()
    await test_session.refresh(owner)

    target = Playlist(
        name="Test Playlist",
        description="A test playlist",
        genre="test",
        user_id=owner.id,
        is_deletable=True,
        is_current=False,
        is_main=False,
    )
    clip = Sound(
        name="Test Sound",
        filename="test.mp3",
        hash="test_hash",
        type="SDB",
        size=1024,
        duration=5000,
        play_count=0,
    )
    test_session.add_all([target, clip])
    await test_session.commit()
    await test_session.refresh(target)
    await test_session.refresh(clip)

    # Read both IDs once, ahead of the repository call.
    playlist_id = target.id
    sound_id = clip.id

    link = await playlist_repository.add_sound_to_playlist(
        playlist_id,
        sound_id,
    )

    assert link.playlist_id == playlist_id
    assert link.sound_id == sound_id
    assert link.position == DEFAULT_POSITION
|
|
|
|
@pytest.mark.asyncio
async def test_add_sound_to_playlist_with_position(
    self,
    playlist_repository: PlaylistRepository,
    test_session: AsyncSession,
    ensure_plans: Any,
) -> None:
    """Test adding a sound to a playlist with specific position.

    The requested position and the expected position both come from
    ``TEST_POSITION``, so the input and the assertion cannot drift apart.
    """
    # Create test user within this test
    user = User(
        email="test5@example.com",
        name="Test User 5",
        password_hash=PasswordUtils.hash_password("password123"),
        role="user",
        is_active=True,
        plan_id=ensure_plans[0].id,
        credits=100,
    )
    test_session.add(user)
    await test_session.commit()
    await test_session.refresh(user)

    # Extract user ID immediately after refresh
    user_id = user.id

    # Create test playlist
    playlist = Playlist(
        user_id=user_id,
        name="Test Playlist",
        description="A test playlist",
        genre="test",
        is_main=False,
        is_current=False,
        is_deletable=True,
    )
    test_session.add(playlist)

    # Create test sound
    sound = Sound(
        name="Test Sound",
        filename="test.mp3",
        type="SDB",
        duration=5000,
        size=1024,
        hash="test_hash",
        play_count=0,
    )
    test_session.add(sound)
    await test_session.commit()
    await test_session.refresh(playlist)
    await test_session.refresh(sound)

    # Extract IDs before async call
    playlist_id = playlist.id
    sound_id = sound.id

    # Test the repository method
    playlist_sound = await playlist_repository.add_sound_to_playlist(
        playlist_id,
        sound_id,
        position=TEST_POSITION,
    )

    assert playlist_sound.position == TEST_POSITION
|
|
|
|
@pytest.mark.asyncio
async def test_add_sound_to_playlist_with_position_shifting(
    self,
    playlist_repository: PlaylistRepository,
    test_session: AsyncSession,
    ensure_plans: Any,
) -> None:
    """Test adding a sound to a playlist with position shifting when positions are occupied.

    ``TEST_TOTAL_SOUNDS`` sounds are created. The first two are appended,
    then the third is inserted at position 1, and the resulting order and
    positions are checked.
    """
    # Create test user
    user = User(
        email="test_shifting@example.com",
        name="Test User Shifting",
        password_hash=PasswordUtils.hash_password("password123"),
        role="user",
        is_active=True,
        plan_id=ensure_plans[0].id,
        credits=100,
    )
    test_session.add(user)
    await test_session.commit()
    await test_session.refresh(user)
    user_id = user.id

    # Create test playlist
    playlist = Playlist(
        user_id=user_id,
        name="Test Playlist Shifting",
        description="A test playlist for position shifting",
        genre="test",
        is_main=False,
        is_current=False,
        is_deletable=True,
    )
    test_session.add(playlist)

    # Create multiple sounds
    sounds = []
    for i in range(TEST_TOTAL_SOUNDS):
        sound = Sound(
            name=f"Test Sound {i}",
            filename=f"test_{i}.mp3",
            type="SDB",
            duration=5000,
            size=1024,
            hash=f"test_hash_{i}",
            play_count=0,
        )
        test_session.add(sound)
        sounds.append(sound)

    await test_session.commit()
    await test_session.refresh(playlist)
    for sound in sounds:
        await test_session.refresh(sound)

    playlist_id = playlist.id
    sound_ids = [s.id for s in sounds]

    # Add first two sounds sequentially (positions 0, 1)
    await playlist_repository.add_sound_to_playlist(playlist_id, sound_ids[0])  # position 0
    await playlist_repository.add_sound_to_playlist(playlist_id, sound_ids[1])  # position 1

    # Now insert third sound at position 1 - should shift existing sound at position 1 to position 2
    await playlist_repository.add_sound_to_playlist(playlist_id, sound_ids[2], position=1)

    # Verify the final positions
    playlist_sounds = await playlist_repository.get_playlist_sound_entries(playlist_id)

    assert len(playlist_sounds) == TEST_TOTAL_SOUNDS
    assert playlist_sounds[0].sound_id == sound_ids[0]  # Original sound 0 stays at position 0
    assert playlist_sounds[0].position == 0
    assert playlist_sounds[1].sound_id == sound_ids[2]  # New sound 2 inserted at position 1
    assert playlist_sounds[1].position == 1
    assert playlist_sounds[2].sound_id == sound_ids[1]  # Original sound 1 shifted to position 2
    assert playlist_sounds[2].position == 2
|
|
|
|
@pytest.mark.asyncio
async def test_add_sound_to_playlist_at_position_zero(
    self,
    playlist_repository: PlaylistRepository,
    test_session: AsyncSession,
    ensure_plans: Any,
) -> None:
    """Test adding a sound at position 0 when playlist already has sounds.

    ``TEST_TOTAL_SOUNDS`` sounds are created. The first two are appended,
    then the third is inserted at position 0, and the resulting order and
    positions are checked.
    """
    # Create test user
    user = User(
        email="test_position_zero@example.com",
        name="Test User Position Zero",
        password_hash=PasswordUtils.hash_password("password123"),
        role="user",
        is_active=True,
        plan_id=ensure_plans[0].id,
        credits=100,
    )
    test_session.add(user)
    await test_session.commit()
    await test_session.refresh(user)
    user_id = user.id

    # Create test playlist
    playlist = Playlist(
        user_id=user_id,
        name="Test Playlist Position Zero",
        description="A test playlist for position zero insertion",
        genre="test",
        is_main=False,
        is_current=False,
        is_deletable=True,
    )
    test_session.add(playlist)

    # Create multiple sounds
    sounds = []
    for i in range(TEST_TOTAL_SOUNDS):
        sound = Sound(
            name=f"Test Sound {i}",
            filename=f"test_zero_{i}.mp3",
            type="SDB",
            duration=5000,
            size=1024,
            hash=f"test_hash_zero_{i}",
            play_count=0,
        )
        test_session.add(sound)
        sounds.append(sound)

    await test_session.commit()
    await test_session.refresh(playlist)
    for sound in sounds:
        await test_session.refresh(sound)

    playlist_id = playlist.id
    sound_ids = [s.id for s in sounds]

    # Add first two sounds sequentially (positions 0, 1)
    await playlist_repository.add_sound_to_playlist(playlist_id, sound_ids[0])  # position 0
    await playlist_repository.add_sound_to_playlist(playlist_id, sound_ids[1])  # position 1

    # Now insert third sound at position 0 - should shift existing sounds to positions 1, 2
    await playlist_repository.add_sound_to_playlist(playlist_id, sound_ids[2], position=0)

    # Verify the final positions
    playlist_sounds = await playlist_repository.get_playlist_sound_entries(playlist_id)

    assert len(playlist_sounds) == TEST_TOTAL_SOUNDS
    assert playlist_sounds[0].sound_id == sound_ids[2]  # New sound 2 inserted at position 0
    assert playlist_sounds[0].position == 0
    assert playlist_sounds[1].sound_id == sound_ids[0]  # Original sound 0 shifted to position 1
    assert playlist_sounds[1].position == 1
    assert playlist_sounds[2].sound_id == sound_ids[1]  # Original sound 1 shifted to position 2
    assert playlist_sounds[2].position == 2
|
|
|
|
@pytest.mark.asyncio
async def test_remove_sound_from_playlist(
    self,
    playlist_repository: PlaylistRepository,
    test_session: AsyncSession,
    ensure_plans: Any,
) -> None:
    """A sound added to a playlist is no longer a member after removal."""
    # Persist the owner first; the playlist needs its ID.
    owner = User(
        email="test@example.com",
        name="Test User",
        password_hash=PasswordUtils.hash_password("password123"),
        plan_id=ensure_plans[0].id,
        role="user",
        credits=100,
        is_active=True,
    )
    test_session.add(owner)
    await test_session.commit()
    await test_session.refresh(owner)

    owner_id = owner.id

    target = Playlist(
        name="Test Playlist",
        description="A test playlist",
        genre="test",
        user_id=owner_id,
        is_deletable=True,
        is_current=False,
        is_main=False,
    )
    clip = Sound(
        name="Test Sound",
        filename="test.mp3",
        hash="test_hash",
        type="SDB",
        size=1024,
        duration=5000,
        play_count=0,
    )
    test_session.add_all([target, clip])
    await test_session.commit()
    await test_session.refresh(target)
    await test_session.refresh(clip)

    # Read both IDs once, ahead of the repository calls.
    playlist_id = target.id
    sound_id = clip.id

    # Add the sound and confirm membership.
    await playlist_repository.add_sound_to_playlist(playlist_id, sound_id)
    present_after_add = await playlist_repository.is_sound_in_playlist(
        playlist_id,
        sound_id,
    )
    assert present_after_add

    # Remove it again and confirm it is gone.
    await playlist_repository.remove_sound_from_playlist(
        playlist_id,
        sound_id,
    )
    present_after_removal = await playlist_repository.is_sound_in_playlist(
        playlist_id,
        sound_id,
    )
    assert not present_after_removal
|
|
|
|
@pytest.mark.asyncio
async def test_get_playlist_sounds(
    self,
    playlist_repository: PlaylistRepository,
    test_session: AsyncSession,
    ensure_plans: Any,
) -> None:
    """List a playlist's sounds before and after adding one."""
    # Persist the owner first; the playlist needs its ID.
    owner = User(
        email="test@example.com",
        name="Test User",
        password_hash=PasswordUtils.hash_password("password123"),
        plan_id=ensure_plans[0].id,
        role="user",
        credits=100,
        is_active=True,
    )
    test_session.add(owner)
    await test_session.commit()
    await test_session.refresh(owner)

    owner_id = owner.id

    target = Playlist(
        name="Test Playlist",
        description="A test playlist",
        genre="test",
        user_id=owner_id,
        is_deletable=True,
        is_current=False,
        is_main=False,
    )
    clip = Sound(
        name="Test Sound",
        filename="test.mp3",
        hash="test_hash",
        type="SDB",
        size=1024,
        duration=5000,
        play_count=0,
    )
    test_session.add_all([target, clip])
    await test_session.commit()
    await test_session.refresh(target)
    await test_session.refresh(clip)

    # Read both IDs once, ahead of the repository calls.
    playlist_id = target.id
    sound_id = clip.id

    # A fresh playlist lists no sounds.
    before = await playlist_repository.get_playlist_sounds(playlist_id)
    assert len(before) == ZERO_SOUNDS

    await playlist_repository.add_sound_to_playlist(playlist_id, sound_id)

    # After the add, exactly that sound is listed.
    after = await playlist_repository.get_playlist_sounds(playlist_id)
    assert len(after) == ONE_SOUND
    assert after[0].id == sound_id
|
|
|
|
@pytest.mark.asyncio
async def test_get_playlist_sound_count(
    self,
    playlist_repository: PlaylistRepository,
    test_session: AsyncSession,
    ensure_plans: Any,
) -> None:
    """Test getting sound count in a playlist.

    The count is checked before and after adding one sound, using the
    same ``ZERO_SOUNDS`` / ``ONE_SOUND`` constants as
    ``test_get_playlist_sounds``.
    """
    # Create objects within this test
    user = User(
        email="test@example.com",
        name="Test User",
        password_hash=PasswordUtils.hash_password("password123"),
        role="user",
        is_active=True,
        plan_id=ensure_plans[0].id,
        credits=100,
    )
    test_session.add(user)
    await test_session.commit()
    await test_session.refresh(user)

    user_id = user.id

    playlist = Playlist(
        user_id=user_id,
        name="Test Playlist",
        description="A test playlist",
        genre="test",
        is_main=False,
        is_current=False,
        is_deletable=True,
    )
    test_session.add(playlist)

    sound = Sound(
        name="Test Sound",
        filename="test.mp3",
        type="SDB",
        duration=5000,
        size=1024,
        hash="test_hash",
        play_count=0,
    )
    test_session.add(sound)
    await test_session.commit()
    await test_session.refresh(playlist)
    await test_session.refresh(sound)

    # Extract IDs before async calls
    playlist_id = playlist.id
    sound_id = sound.id

    # Initially empty
    count = await playlist_repository.get_playlist_sound_count(playlist_id)
    assert count == ZERO_SOUNDS

    # Add sound
    await playlist_repository.add_sound_to_playlist(playlist_id, sound_id)

    # Check count
    count = await playlist_repository.get_playlist_sound_count(playlist_id)
    assert count == ONE_SOUND
|
|
|
|
@pytest.mark.asyncio
async def test_is_sound_in_playlist(
    self,
    playlist_repository: PlaylistRepository,
    test_session: AsyncSession,
    ensure_plans: Any,
) -> None:
    """Membership is false before the sound is added and true afterwards."""
    # Persist the owner first; the playlist needs its ID.
    owner = User(
        email="test@example.com",
        name="Test User",
        password_hash=PasswordUtils.hash_password("password123"),
        plan_id=ensure_plans[0].id,
        role="user",
        credits=100,
        is_active=True,
    )
    test_session.add(owner)
    await test_session.commit()
    await test_session.refresh(owner)

    owner_id = owner.id

    target = Playlist(
        name="Test Playlist",
        description="A test playlist",
        genre="test",
        user_id=owner_id,
        is_deletable=True,
        is_current=False,
        is_main=False,
    )
    clip = Sound(
        name="Test Sound",
        filename="test.mp3",
        hash="test_hash",
        type="SDB",
        size=1024,
        duration=5000,
        play_count=0,
    )
    test_session.add_all([target, clip])
    await test_session.commit()
    await test_session.refresh(target)
    await test_session.refresh(clip)

    # Read both IDs once, ahead of the repository calls.
    playlist_id = target.id
    sound_id = clip.id

    # Not a member yet.
    member_before = await playlist_repository.is_sound_in_playlist(
        playlist_id,
        sound_id,
    )
    assert not member_before

    await playlist_repository.add_sound_to_playlist(playlist_id, sound_id)

    # A member once added.
    member_after = await playlist_repository.is_sound_in_playlist(
        playlist_id,
        sound_id,
    )
    assert member_after
|
|
|
|
@pytest.mark.asyncio
async def test_reorder_playlist_sounds(
    self,
    playlist_repository: PlaylistRepository,
    test_session: AsyncSession,
    ensure_plans: Any,
) -> None:
    """Test reordering sounds in a playlist.

    Two sounds are added at positions 0 and 1, moved to positions 10 and
    5, and the listing is then expected to return them in the new order.
    """
    # Create objects within this test
    user = User(
        email="test@example.com",
        name="Test User",
        password_hash=PasswordUtils.hash_password("password123"),
        role="user",
        is_active=True,
        plan_id=ensure_plans[0].id,
        credits=100,
    )
    test_session.add(user)
    await test_session.commit()
    await test_session.refresh(user)

    user_id = user.id

    playlist = Playlist(
        user_id=user_id,
        name="Test Playlist",
        description="A test playlist",
        genre="test",
        is_main=False,
        is_current=False,
        is_deletable=True,
    )

    # Create multiple sounds
    sound1 = Sound(name="Sound 1", filename="sound1.mp3", type="SDB", hash="hash1")
    sound2 = Sound(name="Sound 2", filename="sound2.mp3", type="SDB", hash="hash2")
    # Register the playlist and both sounds in a single call; the playlist
    # is not added separately beforehand.
    test_session.add_all([playlist, sound1, sound2])
    await test_session.commit()
    await test_session.refresh(playlist)
    await test_session.refresh(sound1)
    await test_session.refresh(sound2)

    # Extract IDs before async calls
    playlist_id = playlist.id
    sound1_id = sound1.id
    sound2_id = sound2.id

    # Add sounds to playlist
    await playlist_repository.add_sound_to_playlist(
        playlist_id,
        sound1_id,
        position=0,
    )
    await playlist_repository.add_sound_to_playlist(
        playlist_id,
        sound2_id,
        position=1,
    )

    # Reorder sounds - use different positions to avoid constraint issues
    sound_positions = [(sound1_id, 10), (sound2_id, 5)]
    await playlist_repository.reorder_playlist_sounds(
        playlist_id,
        sound_positions,
    )

    # Verify new order
    sounds = await playlist_repository.get_playlist_sounds(playlist_id)
    assert len(sounds) == TWO_SOUNDS
    assert sounds[0].id == sound2_id  # sound2 now at position 5
    assert sounds[1].id == sound1_id  # sound1 now at position 10
|
|
|
|
@pytest.mark.asyncio
async def test_reorder_playlist_sounds_position_swap(
    self,
    playlist_repository: PlaylistRepository,
    test_session: AsyncSession,
    ensure_plans: Any,
) -> None:
    """Test reordering sounds with position swapping (regression test).

    Two sounds sit at positions 0 and 1 and are then given each other's
    position in one reorder call; the listing must come back swapped.
    """
    # Create objects within this test
    user = User(
        email="test@example.com",
        name="Test User",
        password_hash=PasswordUtils.hash_password("password123"),
        role="user",
        is_active=True,
        plan_id=ensure_plans[0].id,
        credits=100,
    )
    test_session.add(user)
    await test_session.commit()
    await test_session.refresh(user)

    user_id = user.id

    playlist = Playlist(
        user_id=user_id,
        name="Test Playlist",
        description="A test playlist",
        genre="test",
        is_main=False,
        is_current=False,
        is_deletable=True,
    )

    # Create multiple sounds
    sound1 = Sound(name="Sound 1", filename="sound1.mp3", type="SDB", hash="hash1")
    sound2 = Sound(name="Sound 2", filename="sound2.mp3", type="SDB", hash="hash2")
    # Register the playlist and both sounds in a single call; the playlist
    # is not added separately beforehand.
    test_session.add_all([playlist, sound1, sound2])
    await test_session.commit()
    await test_session.refresh(playlist)
    await test_session.refresh(sound1)
    await test_session.refresh(sound2)

    # Extract IDs before async calls
    playlist_id = playlist.id
    sound1_id = sound1.id
    sound2_id = sound2.id

    # Add sounds to playlist at positions 0 and 1
    await playlist_repository.add_sound_to_playlist(
        playlist_id,
        sound1_id,
        position=0,
    )
    await playlist_repository.add_sound_to_playlist(
        playlist_id,
        sound2_id,
        position=1,
    )

    # Verify initial order
    sounds = await playlist_repository.get_playlist_sounds(playlist_id)
    assert len(sounds) == TWO_SOUNDS
    assert sounds[0].id == sound1_id  # sound1 at position 0
    assert sounds[1].id == sound2_id  # sound2 at position 1

    # Swap positions - this used to cause unique constraint violation
    sound_positions = [(sound1_id, 1), (sound2_id, 0)]
    await playlist_repository.reorder_playlist_sounds(
        playlist_id,
        sound_positions,
    )

    # Verify swapped order
    sounds = await playlist_repository.get_playlist_sounds(playlist_id)
    assert len(sounds) == TWO_SOUNDS
    assert sounds[0].id == sound2_id  # sound2 now at position 0
    assert sounds[1].id == sound1_id  # sound1 now at position 1
|