Files
sdb2-backend/app/services/playlist.py
JSC d926779fe4
Some checks failed
Backend CI / lint (push) Failing after 5m7s
Backend CI / test (push) Successful in 5m14s
feat: Implement playlist reordering with position swapping and reload player on current playlist changes
2025-08-01 17:49:29 +02:00

384 lines
14 KiB
Python

"""Playlist service for business logic operations."""
from typing import Any
from fastapi import HTTPException, status
from sqlmodel.ext.asyncio.session import AsyncSession
from app.core.logging import get_logger
from app.models.playlist import Playlist
from app.models.sound import Sound
from app.repositories.playlist import PlaylistRepository
from app.repositories.sound import SoundRepository
# Module-level logger used by the helper functions and the service below.
logger = get_logger(__name__)
async def _reload_player_playlist() -> None:
    """Ask the player service to reload its playlist (best effort).

    Any failure is logged as a warning and swallowed, so the playlist
    operation that triggered the reload still succeeds.
    """
    try:
        # Deferred import: pulling in the player service at module import time
        # would cause circular import issues.
        from app.services.player import get_player_service  # noqa: PLC0415

        await get_player_service().reload_playlist()
        logger.debug("Player playlist reloaded after current playlist change")
    except Exception:  # noqa: BLE001
        # A failed reload must never fail the surrounding playlist operation.
        logger.warning("Failed to reload player playlist", exc_info=True)
async def _is_current_playlist(session: AsyncSession, playlist_id: int) -> bool:
    """Check if the given playlist is the effective current playlist.

    A playlist is the effective current playlist when it is flagged as
    current or, when no playlist is flagged, when it is the main playlist
    (the fallback used once the current playlist is unset or deleted).

    Args:
        session: Database session used to build a ``PlaylistRepository``.
        playlist_id: ID of the playlist to test.

    Returns:
        ``True`` if ``playlist_id`` identifies the effective current playlist.
        ``False`` otherwise, including when the lookup itself fails (the
        failure is logged as a warning instead of being raised).
    """
    try:
        playlist_repo = PlaylistRepository(session)
        effective_playlist = await playlist_repo.get_current_playlist()
        if effective_playlist is None:
            # Nothing is flagged as current, so the main playlist is the one
            # in use; changes to it must still be reported as "current".
            effective_playlist = await playlist_repo.get_main_playlist()
        # The comparison stays inside the try block so that any error raised
        # while reading the ID is handled as best effort too.
        return effective_playlist is not None and effective_playlist.id == playlist_id
    except Exception:  # noqa: BLE001
        logger.warning("Failed to check if playlist is current", exc_info=True)
        return False
class PlaylistService:
    """Service for playlist operations.

    Combines ``PlaylistRepository`` and ``SoundRepository`` calls with input
    validation, raises ``HTTPException`` for missing or conflicting records and
    asks the player to reload whenever the current playlist is affected.
    """

    def __init__(self, session: AsyncSession) -> None:
        """Initialize the playlist service.

        Args:
            session: Database session shared by both repositories.
        """
        self.session = session
        self.playlist_repo = PlaylistRepository(session)
        self.sound_repo = SoundRepository(session)

    async def get_playlist_by_id(self, playlist_id: int) -> Playlist:
        """Get a playlist by ID.

        Args:
            playlist_id: ID of the playlist to load.

        Returns:
            The matching playlist.

        Raises:
            HTTPException: 404 if the repository returns no playlist.
        """
        playlist = await self.playlist_repo.get_by_id(playlist_id)
        if not playlist:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Playlist not found",
            )
        return playlist

    async def get_user_playlists(self, user_id: int) -> list[Playlist]:
        """Get all playlists for a user."""
        return await self.playlist_repo.get_by_user_id(user_id)

    async def get_all_playlists(self) -> list[Playlist]:
        """Get all playlists from all users."""
        return await self.playlist_repo.get_all()

    async def get_main_playlist(self) -> Playlist:
        """Get the global main playlist.

        Raises:
            HTTPException: 500 if the repository returns no main playlist.
        """
        main_playlist = await self.playlist_repo.get_main_playlist()
        if not main_playlist:
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="Main playlist not found. Make sure to run database seeding.",
            )
        return main_playlist

    async def get_current_playlist(self) -> Playlist:
        """Get the global current playlist, falling back to the main playlist.

        Raises:
            HTTPException: 500 if no playlist is current and the main playlist
                is missing as well.
        """
        current_playlist = await self.playlist_repo.get_current_playlist()
        if current_playlist:
            return current_playlist
        # Fallback to main playlist if no current playlist is set
        return await self.get_main_playlist()

    async def create_playlist(  # noqa: PLR0913
        self,
        user_id: int,
        name: str,
        description: str | None = None,
        genre: str | None = None,
        *,
        is_main: bool = False,
        is_current: bool = False,
        is_deletable: bool = True,
    ) -> Playlist:
        """Create a new playlist.

        Args:
            user_id: Owner of the new playlist.
            name: Playlist name; rejected when the name lookup returns a
                playlist owned by the same user.
            description: Optional description.
            genre: Optional genre.
            is_main: Stored as the playlist's ``is_main`` flag.
            is_current: When true, the previous current playlist is unset
                first and the player is reloaded after creation.
            is_deletable: Stored as the playlist's ``is_deletable`` flag.

        Returns:
            The created playlist.

        Raises:
            HTTPException: 400 if the name conflicts with one of the user's
                playlists.
        """
        # NOTE(review): get_by_name() takes no user, so this presumably inspects
        # a single match; confirm it cannot miss this user's playlist when
        # another user owns a playlist with the same name.
        existing_playlist = await self.playlist_repo.get_by_name(name)
        if existing_playlist and existing_playlist.user_id == user_id:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="A playlist with this name already exists",
            )

        # If this is set as current, unset the previous current playlist
        if is_current:
            await self._unset_current_playlist()

        playlist_data = {
            "user_id": user_id,
            "name": name,
            "description": description,
            "genre": genre,
            "is_main": is_main,
            "is_current": is_current,
            "is_deletable": is_deletable,
        }
        playlist = await self.playlist_repo.create(playlist_data)
        logger.info("Created playlist '%s' for user %s", name, user_id)

        # If this was set as current, reload player playlist
        if is_current:
            await _reload_player_playlist()
        return playlist

    async def update_playlist(  # noqa: PLR0913
        self,
        playlist_id: int,
        user_id: int,
        *,
        name: str | None = None,
        description: str | None = None,
        genre: str | None = None,
        is_current: bool | None = None,
    ) -> Playlist:
        """Update a playlist.

        Only arguments that are not ``None`` are applied.

        Args:
            playlist_id: ID of the playlist to update.
            user_id: Used for the name-conflict check and for logging.
            name: New name, if any.
            description: New description, if any.
            genre: New genre, if any.
            is_current: New current flag, if any. It is applied, and the
                player reloaded, only when it differs from the stored value.

        Returns:
            The playlist, updated when at least one field changed.

        Raises:
            HTTPException: 404 if the playlist does not exist, 400 if the new
                name conflicts with another playlist of the same user.
        """
        playlist = await self.get_playlist_by_id(playlist_id)
        update_data: dict[str, Any] = {}

        if name is not None:
            # Check if new name conflicts with existing playlist
            existing_playlist = await self.playlist_repo.get_by_name(name)
            if (
                existing_playlist
                and existing_playlist.id != playlist_id
                and existing_playlist.user_id == user_id
            ):
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail="A playlist with this name already exists",
                )
            update_data["name"] = name

        if description is not None:
            update_data["description"] = description
        if genre is not None:
            update_data["genre"] = genre

        # Treat the flag as changed only when the value really differs, so an
        # unchanged flag neither touches other playlists nor reloads the player.
        current_changed = is_current is not None and is_current != playlist.is_current
        if current_changed:
            if is_current:
                await self._unset_current_playlist()
            update_data["is_current"] = is_current

        if update_data:
            playlist = await self.playlist_repo.update(playlist, update_data)
            logger.info("Updated playlist %s for user %s", playlist_id, user_id)

        # If is_current was changed, reload player playlist
        if current_changed:
            await _reload_player_playlist()
        return playlist

    async def delete_playlist(self, playlist_id: int, user_id: int) -> None:
        """Delete a playlist.

        Args:
            playlist_id: ID of the playlist to delete.
            user_id: Used for logging only.

        Raises:
            HTTPException: 404 if the playlist does not exist, 400 if it is
                not deletable.
        """
        playlist = await self.get_playlist_by_id(playlist_id)
        if not playlist.is_deletable:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="This playlist cannot be deleted",
            )

        # Read the flag before deleting; the object is gone afterwards.
        was_current = playlist.is_current
        await self.playlist_repo.delete(playlist)
        logger.info("Deleted playlist %s for user %s", playlist_id, user_id)

        # If the deleted playlist was current, reload player to use main
        # playlist fallback
        if was_current:
            await _reload_player_playlist()

    async def search_playlists(self, query: str, user_id: int) -> list[Playlist]:
        """Search user's playlists by name."""
        return await self.playlist_repo.search_by_name(query, user_id)

    async def search_all_playlists(self, query: str) -> list[Playlist]:
        """Search all playlists by name."""
        return await self.playlist_repo.search_by_name(query)

    async def get_playlist_sounds(self, playlist_id: int) -> list[Sound]:
        """Get all sounds in a playlist.

        Raises:
            HTTPException: 404 if the playlist does not exist.
        """
        await self.get_playlist_by_id(playlist_id)  # Verify playlist exists
        return await self.playlist_repo.get_playlist_sounds(playlist_id)

    async def add_sound_to_playlist(
        self,
        playlist_id: int,
        sound_id: int,
        user_id: int,
        position: int | None = None,
    ) -> None:
        """Add a sound to a playlist.

        Args:
            playlist_id: Target playlist.
            sound_id: Sound to add.
            user_id: Used for logging only.
            position: Forwarded unchanged to the repository.

        Raises:
            HTTPException: 404 if the playlist or the sound does not exist,
                400 if the sound is already in the playlist.
        """
        # Verify playlist exists
        await self.get_playlist_by_id(playlist_id)

        # Verify sound exists
        sound = await self.sound_repo.get_by_id(sound_id)
        if not sound:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Sound not found",
            )

        # Check if sound is already in playlist
        if await self.playlist_repo.is_sound_in_playlist(playlist_id, sound_id):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Sound is already in this playlist",
            )

        await self.playlist_repo.add_sound_to_playlist(playlist_id, sound_id, position)
        logger.info(
            "Added sound %s to playlist %s for user %s",
            sound_id,
            playlist_id,
            user_id,
        )
        await self._reload_player_if_current(playlist_id)

    async def remove_sound_from_playlist(
        self,
        playlist_id: int,
        sound_id: int,
        user_id: int,
    ) -> None:
        """Remove a sound from a playlist.

        Args:
            playlist_id: Playlist to modify.
            sound_id: Sound to remove.
            user_id: Used for logging only.

        Raises:
            HTTPException: 404 if the playlist does not exist or the sound is
                not in it.
        """
        # Verify playlist exists
        await self.get_playlist_by_id(playlist_id)

        # Check if sound is in playlist
        if not await self.playlist_repo.is_sound_in_playlist(playlist_id, sound_id):
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Sound not found in this playlist",
            )

        await self.playlist_repo.remove_sound_from_playlist(playlist_id, sound_id)
        logger.info(
            "Removed sound %s from playlist %s for user %s",
            sound_id,
            playlist_id,
            user_id,
        )
        await self._reload_player_if_current(playlist_id)

    async def reorder_playlist_sounds(
        self,
        playlist_id: int,
        user_id: int,
        sound_positions: list[tuple[int, int]],
    ) -> None:
        """Reorder sounds in a playlist.

        Args:
            playlist_id: Playlist to reorder.
            user_id: Used for logging only.
            sound_positions: ``(sound_id, position)`` pairs, forwarded to the
                repository once every sound ID has been validated.

        Raises:
            HTTPException: 404 if the playlist does not exist, 400 if any
                listed sound is not in the playlist.
        """
        # Verify playlist exists
        await self.get_playlist_by_id(playlist_id)

        if not sound_positions:
            # Nothing to move: skip the repository call and the player reload.
            return

        # Validate each distinct sound once, in first-seen order, so repeated
        # IDs do not cost additional queries.
        for sound_id in dict.fromkeys(sid for sid, _ in sound_positions):
            in_playlist = await self.playlist_repo.is_sound_in_playlist(
                playlist_id,
                sound_id,
            )
            if not in_playlist:
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail=f"Sound {sound_id} is not in this playlist",
                )

        await self.playlist_repo.reorder_playlist_sounds(playlist_id, sound_positions)
        logger.info("Reordered sounds in playlist %s for user %s", playlist_id, user_id)
        await self._reload_player_if_current(playlist_id)

    async def get_playlist_stats(self, playlist_id: int) -> dict[str, Any]:
        """Get statistics for a playlist.

        Returns:
            A dict with ``sound_count`` (as reported by the repository),
            ``total_duration_ms`` (sum of the sounds' ``duration``) and
            ``total_play_count`` (sum of the sounds' ``play_count``). Missing
            or falsy per-sound values count as 0.

        Raises:
            HTTPException: 404 if the playlist does not exist.
        """
        await self.get_playlist_by_id(playlist_id)  # Verify playlist exists

        sound_count = await self.playlist_repo.get_playlist_sound_count(playlist_id)
        sounds = await self.playlist_repo.get_playlist_sounds(playlist_id)
        total_duration = sum(sound.duration or 0 for sound in sounds)
        total_plays = sum(sound.play_count or 0 for sound in sounds)
        return {
            "sound_count": sound_count,
            "total_duration_ms": total_duration,
            "total_play_count": total_plays,
        }

    async def add_sound_to_main_playlist(self, sound_id: int, user_id: int) -> None:
        """Add a sound to the global main playlist if it is not there yet.

        Args:
            sound_id: Sound to add.
            user_id: Used for logging only.

        Raises:
            HTTPException: 500 if the main playlist is missing.
            ValueError: If the main playlist has no ID.
        """
        main_playlist = await self.get_main_playlist()
        if main_playlist.id is None:
            msg = "Main playlist has no ID, cannot add sound"
            raise ValueError(msg)

        # Extract ID before async operations to avoid session issues
        main_playlist_id = main_playlist.id

        already_present = await self.playlist_repo.is_sound_in_playlist(
            main_playlist_id,
            sound_id,
        )
        if already_present:
            # Nothing changed, so there is nothing to log or reload.
            return

        await self.playlist_repo.add_sound_to_playlist(main_playlist_id, sound_id)
        logger.info(
            "Added sound %s to main playlist for user %s",
            sound_id,
            user_id,
        )
        await self._reload_player_if_current(main_playlist_id)

    # Current playlist methods (global by default)

    async def set_current_playlist(self, playlist_id: int) -> Playlist:
        """Set a playlist as the current playlist (app-wide).

        Returns:
            The playlist after its ``is_current`` flag has been set.

        Raises:
            HTTPException: 404 if the playlist does not exist.
        """
        playlist = await self.get_playlist_by_id(playlist_id)

        # Unset any existing current playlist globally
        await self._unset_current_playlist()

        # Set new current playlist
        playlist = await self.playlist_repo.update(playlist, {"is_current": True})
        logger.info("Set playlist %s as current playlist", playlist_id)

        # Reload player playlist to reflect the change
        await _reload_player_playlist()
        return playlist

    async def unset_current_playlist(self) -> None:
        """Unset the current playlist (main playlist becomes fallback)."""
        await self._unset_current_playlist()
        logger.info("Unset current playlist, main playlist is now fallback")

        # Reload player playlist to reflect the change (will fallback to main)
        await _reload_player_playlist()

    async def _unset_current_playlist(self) -> None:
        """Unset any current playlist globally."""
        current_playlist = await self.playlist_repo.get_current_playlist()
        if current_playlist:
            await self.playlist_repo.update(current_playlist, {"is_current": False})

    async def _reload_player_if_current(self, playlist_id: int) -> None:
        """Reload the player when ``playlist_id`` is the current playlist."""
        if await _is_current_playlist(self.session, playlist_id):
            await _reload_player_playlist()