feat: Implement search and sorting functionality for playlists in API and repository
Some checks failed
Backend CI / lint (push) Failing after 4m54s
Backend CI / test (push) Failing after 4m25s

This commit is contained in:
JSC
2025-08-10 19:30:14 +02:00
parent aa9a73ac1d
commit 357fbcecac
3 changed files with 227 additions and 7 deletions

View File

@@ -1,19 +1,39 @@
"""Playlist repository for database operations."""
from enum import Enum
from sqlalchemy import func
from sqlalchemy.orm import selectinload
from sqlmodel import select
from sqlmodel import col, select
from sqlmodel.ext.asyncio.session import AsyncSession
from app.core.logging import get_logger
from app.models.playlist import Playlist
from app.models.playlist_sound import PlaylistSound
from app.models.sound import Sound
from app.models.user import User
from app.repositories.base import BaseRepository
logger = get_logger(__name__)
class PlaylistSortField(str, Enum):
"""Playlist sort field enumeration."""
NAME = "name"
GENRE = "genre"
CREATED_AT = "created_at"
UPDATED_AT = "updated_at"
SOUND_COUNT = "sound_count"
TOTAL_DURATION = "total_duration"
class SortOrder(str, Enum):
"""Sort order enumeration."""
ASC = "asc"
DESC = "desc"
class PlaylistRepository(BaseRepository[Playlist]):
"""Repository for playlist operations."""
@@ -237,3 +257,153 @@ class PlaylistRepository(BaseRepository[Playlist]):
playlist_id,
)
raise
async def search_and_sort(
self,
search_query: str | None = None,
sort_by: PlaylistSortField | None = None,
sort_order: SortOrder = SortOrder.ASC,
user_id: int | None = None,
include_stats: bool = False,
limit: int | None = None,
offset: int = 0,
) -> list[dict]:
"""Search and sort playlists with optional statistics."""
try:
if include_stats and sort_by in (PlaylistSortField.SOUND_COUNT, PlaylistSortField.TOTAL_DURATION):
# Use subquery for sorting by stats
subquery = (
select(
Playlist.id,
Playlist.name,
Playlist.description,
Playlist.genre,
Playlist.user_id,
Playlist.is_main,
Playlist.is_current,
Playlist.is_deletable,
Playlist.created_at,
Playlist.updated_at,
func.count(PlaylistSound.id).label("sound_count"),
func.coalesce(func.sum(Sound.duration), 0).label("total_duration"),
User.name.label("user_name"),
)
.select_from(Playlist)
.join(User, Playlist.user_id == User.id, isouter=True)
.join(PlaylistSound, Playlist.id == PlaylistSound.playlist_id, isouter=True)
.join(Sound, PlaylistSound.sound_id == Sound.id, isouter=True)
.group_by(Playlist.id, User.name)
)
# Apply filters
if search_query and search_query.strip():
search_pattern = f"%{search_query.strip().lower()}%"
subquery = subquery.where(func.lower(Playlist.name).like(search_pattern))
if user_id is not None:
subquery = subquery.where(Playlist.user_id == user_id)
# Apply sorting
if sort_by == PlaylistSortField.SOUND_COUNT:
if sort_order == SortOrder.DESC:
subquery = subquery.order_by(func.count(PlaylistSound.id).desc())
else:
subquery = subquery.order_by(func.count(PlaylistSound.id).asc())
elif sort_by == PlaylistSortField.TOTAL_DURATION:
if sort_order == SortOrder.DESC:
subquery = subquery.order_by(func.coalesce(func.sum(Sound.duration), 0).desc())
else:
subquery = subquery.order_by(func.coalesce(func.sum(Sound.duration), 0).asc())
else:
# Default sorting by name
subquery = subquery.order_by(Playlist.name.asc())
else:
# Simple query without stats-based sorting
subquery = (
select(
Playlist.id,
Playlist.name,
Playlist.description,
Playlist.genre,
Playlist.user_id,
Playlist.is_main,
Playlist.is_current,
Playlist.is_deletable,
Playlist.created_at,
Playlist.updated_at,
func.count(PlaylistSound.id).label("sound_count"),
func.coalesce(func.sum(Sound.duration), 0).label("total_duration"),
User.name.label("user_name"),
)
.select_from(Playlist)
.join(User, Playlist.user_id == User.id, isouter=True)
.join(PlaylistSound, Playlist.id == PlaylistSound.playlist_id, isouter=True)
.join(Sound, PlaylistSound.sound_id == Sound.id, isouter=True)
.group_by(Playlist.id, User.name)
)
# Apply filters
if search_query and search_query.strip():
search_pattern = f"%{search_query.strip().lower()}%"
subquery = subquery.where(func.lower(Playlist.name).like(search_pattern))
if user_id is not None:
subquery = subquery.where(Playlist.user_id == user_id)
# Apply sorting
if sort_by:
if sort_by == PlaylistSortField.NAME:
sort_column = Playlist.name
elif sort_by == PlaylistSortField.GENRE:
sort_column = Playlist.genre
elif sort_by == PlaylistSortField.CREATED_AT:
sort_column = Playlist.created_at
elif sort_by == PlaylistSortField.UPDATED_AT:
sort_column = Playlist.updated_at
else:
sort_column = Playlist.name
if sort_order == SortOrder.DESC:
subquery = subquery.order_by(sort_column.desc())
else:
subquery = subquery.order_by(sort_column.asc())
else:
# Default sorting by name ascending
subquery = subquery.order_by(Playlist.name.asc())
# Apply pagination
if offset > 0:
subquery = subquery.offset(offset)
if limit is not None:
subquery = subquery.limit(limit)
result = await self.session.exec(subquery)
rows = result.all()
# Convert to dictionary format
playlists = []
for row in rows:
playlists.append({
"id": row.id,
"name": row.name,
"description": row.description,
"genre": row.genre,
"user_id": row.user_id,
"user_name": row.user_name,
"is_main": row.is_main,
"is_current": row.is_current,
"is_deletable": row.is_deletable,
"created_at": row.created_at,
"updated_at": row.updated_at,
"sound_count": row.sound_count or 0,
"total_duration": row.total_duration or 0,
})
return playlists
except Exception:
logger.exception(
"Failed to search and sort playlists: query=%s, sort_by=%s, sort_order=%s",
search_query, sort_by, sort_order
)
raise