"""Playlist model for managing sound playlists.""" from datetime import datetime from typing import TYPE_CHECKING, Optional from zoneinfo import ZoneInfo from sqlalchemy import Boolean, DateTime, ForeignKey, Integer, String, Text from sqlalchemy.orm import Mapped, mapped_column, relationship from app.database import db if TYPE_CHECKING: from app.models.playlist_sound import PlaylistSound from app.models.user import User class Playlist(db.Model): """Model for playlists containing sounds.""" __tablename__ = "playlist" id: Mapped[int] = mapped_column(Integer, primary_key=True) name: Mapped[str] = mapped_column(String(255), nullable=False) description: Mapped[Optional[str]] = mapped_column(Text, nullable=True) genre: Mapped[Optional[str]] = mapped_column(String(100), nullable=True) user_id: Mapped[Optional[int]] = mapped_column( Integer, ForeignKey("user.id"), nullable=True ) is_main: Mapped[bool] = mapped_column( Boolean, default=False, nullable=False ) is_deletable: Mapped[bool] = mapped_column( Boolean, default=True, nullable=True ) is_current: Mapped[bool] = mapped_column( Boolean, default=False, nullable=False ) created_at: Mapped[datetime] = mapped_column( DateTime, default=lambda: datetime.now(tz=ZoneInfo("UTC")), nullable=False, ) updated_at: Mapped[datetime] = mapped_column( DateTime, default=lambda: datetime.now(tz=ZoneInfo("UTC")), onupdate=lambda: datetime.now(tz=ZoneInfo("UTC")), nullable=False, ) # Relationships user: Mapped[Optional["User"]] = relationship( "User", back_populates="playlists" ) playlist_sounds: Mapped[list["PlaylistSound"]] = relationship( "PlaylistSound", back_populates="playlist", cascade="all, delete-orphan" ) def __repr__(self) -> str: """String representation of the playlist.""" return f"" def to_dict(self) -> dict: """Convert playlist to dictionary representation.""" return { "id": self.id, "name": self.name, "description": self.description, "genre": self.genre, "user_id": self.user_id, "is_main": self.is_main, "is_deletable": self.is_deletable, "is_current": self.is_current, "created_at": ( self.created_at.isoformat() if self.created_at else None ), "updated_at": ( self.updated_at.isoformat() if self.updated_at else None ), "sound_count": ( len(self.playlist_sounds) if self.playlist_sounds else 0 ), } def to_detailed_dict(self) -> dict: """Convert playlist to detailed dictionary with sounds.""" playlist_dict = self.to_dict() playlist_dict["sounds"] = [ { "sound": ps.sound.to_dict() if ps.sound else None, "order": ps.order, "added_at": ps.added_at.isoformat() if ps.added_at else None, } for ps in sorted(self.playlist_sounds, key=lambda x: x.order) ] return playlist_dict @classmethod def create_playlist( cls, name: str, description: Optional[str] = None, genre: Optional[str] = None, user_id: Optional[int] = None, is_main: bool = False, is_deletable: bool = True, is_current: bool = False, commit: bool = True, ) -> "Playlist": """Create a new playlist.""" playlist = cls( name=name, description=description, genre=genre, user_id=user_id, is_main=is_main, is_deletable=is_deletable, is_current=is_current, ) db.session.add(playlist) if commit: db.session.commit() return playlist @classmethod def find_by_name( cls, name: str, user_id: Optional[int] = None ) -> Optional["Playlist"]: """Find playlist by name, optionally filtered by user.""" query = cls.query.filter_by(name=name) if user_id is not None: query = query.filter_by(user_id=user_id) return query.first() @classmethod def find_by_user(cls, user_id: int) -> list["Playlist"]: """Find all playlists for a user.""" return cls.query.filter_by(user_id=user_id).order_by(cls.name).all() @classmethod def find_system_playlists(cls) -> list["Playlist"]: """Find all system playlists (user_id is None).""" return cls.query.filter_by(user_id=None).order_by(cls.name).all() @classmethod def find_current_playlist( cls, user_id: Optional[int] = None ) -> Optional["Playlist"]: """Find the current active playlist.""" query = cls.query.filter_by(is_current=True) if user_id is not None: query = query.filter_by(user_id=user_id) return query.first() @classmethod def find_main_playlist( cls, user_id: Optional[int] = None ) -> Optional["Playlist"]: """Find the main playlist.""" query = cls.query.filter_by(is_main=True) if user_id is not None: query = query.filter_by(user_id=user_id) return query.first() def set_as_current(self, commit: bool = True) -> None: """Set this playlist as the current one and unset others.""" # Unset other current playlists for the same user/system if self.user_id is not None: Playlist.query.filter_by( user_id=self.user_id, is_current=True ).update({"is_current": False}) else: Playlist.query.filter_by(user_id=None, is_current=True).update( {"is_current": False} ) self.is_current = True if commit: db.session.commit() def add_sound( self, sound_id: int, order: Optional[int] = None, commit: bool = True ) -> "PlaylistSound": """Add a sound to the playlist.""" from app.models.playlist_sound import PlaylistSound if order is None: # Get the next order number max_order = ( db.session.query(db.func.max(PlaylistSound.order)) .filter_by(playlist_id=self.id) .scalar() ) order = (max_order or 0) + 1 playlist_sound = PlaylistSound( playlist_id=self.id, sound_id=sound_id, order=order ) db.session.add(playlist_sound) if commit: db.session.commit() return playlist_sound def remove_sound(self, sound_id: int, commit: bool = True) -> bool: """Remove a sound from the playlist.""" from app.models.playlist_sound import PlaylistSound playlist_sound = PlaylistSound.query.filter_by( playlist_id=self.id, sound_id=sound_id ).first() if playlist_sound: db.session.delete(playlist_sound) if commit: db.session.commit() return True return False def reorder_sounds( self, sound_orders: list[dict], commit: bool = True ) -> None: """Reorder sounds in the playlist. Args: sound_orders: List of dicts with 'sound_id' and 'order' keys """ from app.models.playlist_sound import PlaylistSound for item in sound_orders: playlist_sound = PlaylistSound.query.filter_by( playlist_id=self.id, sound_id=item["sound_id"] ).first() if playlist_sound: playlist_sound.order = item["order"] if commit: db.session.commit() def get_total_duration(self) -> int: """Get total duration of all sounds in the playlist in milliseconds.""" from app.models.sound import Sound total = ( db.session.query(db.func.sum(Sound.duration)) .join(self.playlist_sounds) .filter(Sound.id.in_([ps.sound_id for ps in self.playlist_sounds])) .scalar() ) return total or 0 def duplicate( self, new_name: str, user_id: Optional[int] = None, commit: bool = True ) -> "Playlist": """Create a duplicate of this playlist.""" new_playlist = Playlist.create_playlist( name=new_name, description=self.description, genre=self.genre, user_id=user_id, is_main=False, is_deletable=True, is_current=False, commit=commit, ) # Copy all sounds with their order for ps in self.playlist_sounds: new_playlist.add_sound(ps.sound_id, ps.order, commit=False) if commit: db.session.commit() return new_playlist