"""Sound played tracking model.""" from datetime import datetime from zoneinfo import ZoneInfo from sqlalchemy import DateTime, ForeignKey, Integer, func, text from sqlalchemy.orm import Mapped, mapped_column, relationship from app.database import db class SoundPlayed(db.Model): """Model to track when users play sounds.""" __tablename__ = "sound_played" id: Mapped[int] = mapped_column(primary_key=True) # Foreign keys user_id: Mapped[int] = mapped_column( Integer, ForeignKey("users.id"), nullable=False, ) sound_id: Mapped[int] = mapped_column( Integer, ForeignKey("sounds.id"), nullable=False, ) # Timestamp played_at: Mapped[datetime] = mapped_column( DateTime, default=lambda: datetime.now(tz=ZoneInfo("UTC")), nullable=False, ) # Relationships user: Mapped["User"] = relationship("User", backref="sounds_played") sound: Mapped["Sound"] = relationship("Sound", backref="play_history") def __repr__(self) -> str: """Return string representation of SoundPlayed.""" return ( f"" ) def to_dict(self) -> dict: """Convert sound played record to dictionary.""" return { "id": self.id, "user_id": self.user_id, "sound_id": self.sound_id, "played_at": self.played_at.isoformat(), "user": ( { "id": self.user.id, "name": self.user.name, "email": self.user.email, } if self.user else None ), "sound": ( { "id": self.sound.id, "name": self.sound.name, "filename": self.sound.filename, "type": self.sound.type, } if self.sound else None ), } @classmethod def create_play_record( cls, user_id: int, sound_id: int, *, commit: bool = True, ) -> "SoundPlayed": """Create a new sound played record.""" play_record = cls( user_id=user_id, sound_id=sound_id, ) db.session.add(play_record) if commit: db.session.commit() return play_record @classmethod def get_user_plays( cls, user_id: int, limit: int = 50, offset: int = 0, ) -> list["SoundPlayed"]: """Get recent plays for a specific user.""" return ( cls.query.filter_by(user_id=user_id) .order_by(cls.played_at.desc()) .offset(offset) .limit(limit) .all() ) @classmethod def get_sound_plays( cls, sound_id: int, limit: int = 50, offset: int = 0, ) -> list["SoundPlayed"]: """Get recent plays for a specific sound.""" return ( cls.query.filter_by(sound_id=sound_id) .order_by(cls.played_at.desc()) .offset(offset) .limit(limit) .all() ) @classmethod def get_recent_plays( cls, limit: int = 100, offset: int = 0, ) -> list["SoundPlayed"]: """Get recent plays across all users and sounds.""" return ( cls.query.order_by(cls.played_at.desc()) .offset(offset) .limit(limit) .all() ) @classmethod def get_user_play_count(cls, user_id: int) -> int: """Get total play count for a user.""" return cls.query.filter_by(user_id=user_id).count() @classmethod def get_sound_play_count(cls, sound_id: int) -> int: """Get total play count for a sound.""" return cls.query.filter_by(sound_id=sound_id).count() @classmethod def get_popular_sounds( cls, limit: int = 10, days: int | None = None, ) -> list[dict]: """Get most popular sounds with play counts.""" from app.models.sound import Sound query = ( db.session.query( cls.sound_id, func.count(cls.id).label("play_count"), func.max(cls.played_at).label("last_played"), ) .group_by(cls.sound_id) .order_by(func.count(cls.id).desc()) ) if days: query = query.filter( cls.played_at >= text(f"datetime('now', '-{days} days')"), ) results = query.limit(limit).all() # Get sound details popular_sounds = [] for result in results: sound = Sound.query.get(result.sound_id) if sound: popular_sounds.append( { "sound": sound.to_dict(), "play_count": result.play_count, "last_played": ( result.last_played.isoformat() if result.last_played else None ), }, ) return popular_sounds @classmethod def get_user_stats(cls, user_id: int) -> dict: """Get comprehensive stats for a user.""" from app.models.sound import Sound total_plays = cls.query.filter_by(user_id=user_id).count() if total_plays == 0: return { "total_plays": 0, "unique_sounds": 0, "favorite_sound": None, "first_play": None, "last_play": None, } # Get unique sounds count unique_sounds = ( db.session.query(cls.sound_id) .filter_by(user_id=user_id) .distinct() .count() ) # Get favorite sound favorite_query = ( db.session.query( cls.sound_id, func.count(cls.id).label("play_count"), ) .filter_by(user_id=user_id) .group_by(cls.sound_id) .order_by(func.count(cls.id).desc()) .first() ) favorite_sound = None if favorite_query: sound = Sound.query.get(favorite_query.sound_id) if sound: favorite_sound = { "sound": sound.to_dict(), "play_count": favorite_query.play_count, } # Get first and last play dates first_play = ( cls.query.filter_by(user_id=user_id) .order_by(cls.played_at.asc()) .first() ) last_play = ( cls.query.filter_by(user_id=user_id) .order_by(cls.played_at.desc()) .first() ) return { "total_plays": total_plays, "unique_sounds": unique_sounds, "favorite_sound": favorite_sound, "first_play": ( first_play.played_at.isoformat() if first_play else None ), "last_play": ( last_play.played_at.isoformat() if last_play else None ), }