Files
sdb2-backend/app/repositories/sound.py
JSC 5ed19c8f0f Add comprehensive tests for playlist service and refactor socket service tests
- Introduced a new test suite for the PlaylistService covering various functionalities including creation, retrieval, updating, and deletion of playlists.
- Added tests for handling sounds within playlists, ensuring correct behavior when adding/removing sounds and managing current playlists.
- Refactored socket service tests for improved readability by adjusting function signatures.
- Cleaned up unnecessary whitespace in sound normalizer and sound scanner tests for consistency.
- Enhanced audio utility tests to ensure accurate hash and size calculations, including edge cases for nonexistent files.
- Removed redundant blank lines in cookie utility tests for cleaner code.
2025-07-29 19:25:46 +02:00

150 lines
5.3 KiB
Python

"""Sound repository for database operations."""
from typing import Any
from sqlalchemy import desc, func
from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession
from app.core.logging import get_logger
from app.models.sound import Sound
# Module-level logger obtained from the project's get_logger helper, keyed by
# this module's __name__; SoundRepository uses it for success and failure logs.
logger = get_logger(__name__)
class SoundRepository:
    """Repository for sound operations.

    Wraps an ``AsyncSession`` and exposes lookup, search and create/update/
    delete helpers for ``Sound`` rows. Every method logs a failure (with
    traceback) and re-raises the original exception; the write methods also
    roll the session back before re-raising.
    """

    def __init__(self, session: AsyncSession) -> None:
        """Initialize the sound repository.

        Args:
            session: Async database session used for every query and commit.
        """
        self.session = session

    async def get_by_id(self, sound_id: int) -> Sound | None:
        """Get a sound by ID.

        Args:
            sound_id: Value compared against ``Sound.id``.

        Returns:
            The first matching sound, or ``None`` when there is no match.

        Raises:
            Exception: Any error raised while querying, re-raised after logging.
        """
        try:
            statement = select(Sound).where(Sound.id == sound_id)
            result = await self.session.exec(statement)
            return result.first()
        except Exception:
            logger.exception("Failed to get sound by ID: %s", sound_id)
            raise

    async def get_by_filename(self, filename: str) -> Sound | None:
        """Get a sound by filename.

        Args:
            filename: Value compared against ``Sound.filename``.

        Returns:
            The first matching sound, or ``None`` when there is no match.

        Raises:
            Exception: Any error raised while querying, re-raised after logging.
        """
        try:
            statement = select(Sound).where(Sound.filename == filename)
            result = await self.session.exec(statement)
            return result.first()
        except Exception:
            logger.exception("Failed to get sound by filename: %s", filename)
            raise

    async def get_by_hash(self, hash_value: str) -> Sound | None:
        """Get a sound by hash.

        Args:
            hash_value: Value compared against ``Sound.hash``.

        Returns:
            The first matching sound, or ``None`` when there is no match.

        Raises:
            Exception: Any error raised while querying, re-raised after logging.
        """
        try:
            statement = select(Sound).where(Sound.hash == hash_value)
            result = await self.session.exec(statement)
            return result.first()
        except Exception:
            logger.exception("Failed to get sound by hash")
            raise

    async def get_by_type(self, sound_type: str) -> list[Sound]:
        """Get all sounds by type.

        Args:
            sound_type: Value compared against ``Sound.type``.

        Returns:
            Every matching sound; an empty list when none match.

        Raises:
            Exception: Any error raised while querying, re-raised after logging.
        """
        try:
            statement = select(Sound).where(Sound.type == sound_type)
            result = await self.session.exec(statement)
            return list(result.all())
        except Exception:
            logger.exception("Failed to get sounds by type: %s", sound_type)
            raise

    async def create(self, sound_data: dict[str, Any]) -> Sound:
        """Create a new sound.

        Args:
            sound_data: Keyword arguments passed to the ``Sound`` constructor.

        Returns:
            The newly committed sound, refreshed from the session.

        Raises:
            Exception: Any error raised while building or committing the sound;
                the session is rolled back before the error is re-raised.
        """
        try:
            sound = Sound(**sound_data)
            self.session.add(sound)
            await self.session.commit()
            await self.session.refresh(sound)
        except Exception:
            await self.session.rollback()
            logger.exception("Failed to create sound")
            raise
        else:
            logger.info("Created new sound: %s", sound.name)
            return sound

    async def update(self, sound: Sound, update_data: dict[str, Any]) -> Sound:
        """Update a sound.

        Args:
            sound: The sound instance to modify.
            update_data: Mapping of attribute name to new value; each pair is
                applied to ``sound`` with ``setattr``.

        Returns:
            The same sound instance after commit and refresh.

        Raises:
            Exception: Any error raised while applying or committing changes;
                the session is rolled back before the error is re-raised.
        """
        try:
            for field, value in update_data.items():
                setattr(sound, field, value)
            await self.session.commit()
            await self.session.refresh(sound)
        except Exception:
            await self.session.rollback()
            logger.exception("Failed to update sound")
            raise
        else:
            logger.info("Updated sound: %s", sound.name)
            return sound

    async def delete(self, sound: Sound) -> None:
        """Delete a sound.

        Args:
            sound: The sound instance to remove.

        Raises:
            Exception: Any error raised while deleting or committing; the
                session is rolled back before the error is re-raised.
        """
        try:
            await self.session.delete(sound)
            await self.session.commit()
        except Exception:
            await self.session.rollback()
            logger.exception("Failed to delete sound")
            raise
        else:
            # Log outside the try (as create/update do) so a problem while
            # logging cannot be reported as a failed delete after the commit
            # has already succeeded.
            logger.info("Deleted sound: %s", sound.name)

    async def search_by_name(self, query: str) -> list[Sound]:
        """Search sounds by name (case-insensitive substring match).

        Args:
            query: Text to look for inside ``Sound.name``. It is matched
                literally: ``%`` and ``_`` are not treated as wildcards.

        Returns:
            Every sound whose lowercased name contains the lowercased query.

        Raises:
            Exception: Any error raised while querying, re-raised after logging.
        """
        try:
            # autoescape=True escapes "%" and "_" in the caller-supplied text so
            # they match themselves instead of acting as LIKE wildcards.
            name_matches = func.lower(Sound.name).contains(
                query.lower(),
                autoescape=True,
            )
            statement = select(Sound).where(name_matches)
            result = await self.session.exec(statement)
            return list(result.all())
        except Exception:
            logger.exception("Failed to search sounds by name: %s", query)
            raise

    async def get_popular_sounds(self, limit: int = 10) -> list[Sound]:
        """Get the most played sounds.

        Args:
            limit: Maximum number of sounds to return (default 10).

        Returns:
            Sounds ordered by ``Sound.play_count`` descending, capped at
            ``limit`` rows.

        Raises:
            Exception: Any error raised while querying, re-raised after logging.
        """
        try:
            statement = select(Sound).order_by(desc(Sound.play_count)).limit(limit)
            result = await self.session.exec(statement)
            return list(result.all())
        except Exception:
            logger.exception("Failed to get popular sounds")
            raise

    async def get_unnormalized_sounds(self) -> list[Sound]:
        """Get all sounds that haven't been normalized yet.

        Returns:
            Every sound whose ``is_normalized`` column equals ``False``.

        Raises:
            Exception: Any error raised while querying, re-raised after logging.
        """
        try:
            # "== False" is intentional: it builds a SQL comparison, which a
            # Python "is False" / "not ..." test would not.
            statement = select(Sound).where(Sound.is_normalized == False)  # noqa: E712
            result = await self.session.exec(statement)
            return list(result.all())
        except Exception:
            logger.exception("Failed to get unnormalized sounds")
            raise

    async def get_unnormalized_sounds_by_type(self, sound_type: str) -> list[Sound]:
        """Get unnormalized sounds by type.

        Args:
            sound_type: Value compared against ``Sound.type``.

        Returns:
            Every sound of the given type whose ``is_normalized`` column equals
            ``False``.

        Raises:
            Exception: Any error raised while querying, re-raised after logging.
        """
        try:
            statement = select(Sound).where(
                Sound.type == sound_type,
                Sound.is_normalized == False,  # noqa: E712
            )
            result = await self.session.exec(statement)
            return list(result.all())
        except Exception:
            logger.exception(
                "Failed to get unnormalized sounds by type: %s", sound_type
            )
            raise