"""Playlist-Sound relationship model for managing sound order in playlists.""" from datetime import datetime from typing import TYPE_CHECKING, Optional from zoneinfo import ZoneInfo from sqlalchemy import DateTime, ForeignKey, Integer, UniqueConstraint from sqlalchemy.orm import Mapped, mapped_column, relationship from app.database import db if TYPE_CHECKING: from app.models.playlist import Playlist from app.models.sound import Sound class PlaylistSound(db.Model): """Model for playlist-sound relationships with ordering.""" __tablename__ = "playlist_sound" id: Mapped[int] = mapped_column(Integer, primary_key=True) playlist_id: Mapped[int] = mapped_column( Integer, ForeignKey("playlist.id"), nullable=False ) sound_id: Mapped[int] = mapped_column( Integer, ForeignKey("sound.id"), nullable=False ) order: Mapped[int] = mapped_column(Integer, nullable=False) added_at: Mapped[datetime] = mapped_column( DateTime, default=lambda: datetime.now(tz=ZoneInfo("UTC")), nullable=False, ) # Relationships playlist: Mapped["Playlist"] = relationship( "Playlist", back_populates="playlist_sounds" ) sound: Mapped["Sound"] = relationship( "Sound", back_populates="playlist_sounds" ) # Constraints __table_args__ = ( UniqueConstraint( "playlist_id", "sound_id", name="unique_playlist_sound" ), UniqueConstraint("playlist_id", "order", name="unique_playlist_order"), ) def __repr__(self) -> str: """String representation of the playlist-sound relationship.""" return f"" def to_dict(self) -> dict: """Convert playlist-sound relationship to dictionary representation.""" return { "id": self.id, "playlist_id": self.playlist_id, "sound_id": self.sound_id, "order": self.order, "added_at": self.added_at.isoformat() if self.added_at else None, "sound": self.sound.to_dict() if self.sound else None, } @classmethod def create_playlist_sound( cls, playlist_id: int, sound_id: int, order: int, commit: bool = True, ) -> "PlaylistSound": """Create a new playlist-sound relationship.""" playlist_sound = cls( playlist_id=playlist_id, sound_id=sound_id, order=order, ) db.session.add(playlist_sound) if commit: db.session.commit() return playlist_sound @classmethod def find_by_playlist(cls, playlist_id: int) -> list["PlaylistSound"]: """Find all sounds in a playlist ordered by their position.""" return ( cls.query.filter_by(playlist_id=playlist_id) .order_by(cls.order) .all() ) @classmethod def find_by_sound(cls, sound_id: int) -> list["PlaylistSound"]: """Find all playlists containing a specific sound.""" return cls.query.filter_by(sound_id=sound_id).all() @classmethod def find_by_playlist_and_sound( cls, playlist_id: int, sound_id: int ) -> Optional["PlaylistSound"]: """Find a specific playlist-sound relationship.""" return cls.query.filter_by( playlist_id=playlist_id, sound_id=sound_id ).first() @classmethod def get_next_order(cls, playlist_id: int) -> int: """Get the next order number for a playlist.""" max_order = ( db.session.query(db.func.max(cls.order)) .filter_by(playlist_id=playlist_id) .scalar() ) return (max_order or 0) + 1 @classmethod def reorder_playlist( cls, playlist_id: int, sound_orders: list[dict], commit: bool = True ) -> None: """Reorder all sounds in a playlist. Args: playlist_id: ID of the playlist sound_orders: List of dicts with 'sound_id' and 'order' keys """ for item in sound_orders: playlist_sound = cls.query.filter_by( playlist_id=playlist_id, sound_id=item["sound_id"] ).first() if playlist_sound: playlist_sound.order = item["order"] if commit: db.session.commit() def move_to_position(self, new_order: int, commit: bool = True) -> None: """Move this sound to a new position in the playlist.""" old_order = self.order if new_order == old_order: return # Get all other sounds in the playlist other_sounds = ( PlaylistSound.query.filter_by(playlist_id=self.playlist_id) .filter(PlaylistSound.id != self.id) .order_by(PlaylistSound.order) .all() ) # Remove this sound from its current position remaining_sounds = [ps for ps in other_sounds if ps.order != old_order] # Insert at new position if new_order <= len(remaining_sounds): remaining_sounds.insert(new_order - 1, self) else: remaining_sounds.append(self) # Update all order values for i, ps in enumerate(remaining_sounds, 1): ps.order = i if commit: db.session.commit() def get_previous_sound(self) -> Optional["PlaylistSound"]: """Get the previous sound in the playlist.""" return ( PlaylistSound.query.filter_by(playlist_id=self.playlist_id) .filter(PlaylistSound.order < self.order) .order_by(PlaylistSound.order.desc()) .first() ) def get_next_sound(self) -> Optional["PlaylistSound"]: """Get the next sound in the playlist.""" return ( PlaylistSound.query.filter_by(playlist_id=self.playlist_id) .filter(PlaylistSound.order > self.order) .order_by(PlaylistSound.order.asc()) .first() )