Compare commits

...

2 Commits

9 changed files with 525 additions and 75 deletions

View File

@@ -12,8 +12,8 @@ def init_database():
# Seed plans if they don't exist
seed_plans()
# Migrate existing users to have plans
migrate_users_to_plans()
# Create default Main playlist if it doesn't exist
seed_main_playlist()
def seed_plans():
@@ -55,69 +55,35 @@ def seed_plans():
print(f"Seeded {len(plans_data)} plans into database")
def migrate_users_to_plans():
"""Assign plans to existing users who don't have one."""
from app.models.user import User
def seed_main_playlist():
"""Create the default Main playlist if it doesn't exist."""
from app.models.playlist import Playlist
try:
# Find users without plans
users_without_plans = User.query.filter(User.plan_id.is_(None)).all()
# Check if Main playlist already exists
main_playlist = Playlist.query.filter_by(name="Main", user_id=None).first()
# Find users with plans but NULL credits (only if credits column exists)
# Note: We only migrate users with NULL credits, not 0 credits
# 0 credits means they spent them, NULL means they never got assigned
try:
users_without_credits = User.query.filter(
User.plan_id.isnot(None),
User.credits.is_(None),
).all()
except Exception:
# Credits column doesn't exist yet, will be handled by create_all
users_without_credits = []
if not users_without_plans and not users_without_credits:
return
# Get default and pro plans
default_plan = Plan.get_default_plan()
pro_plan = Plan.get_pro_plan()
# Get the first user (admin) from all users ordered by ID
first_user = User.query.order_by(User.id).first()
updated_count = 0
# Assign plans to users without plans
for user in users_without_plans:
# First user gets pro plan, others get free plan
if user.id == first_user.id:
user.plan_id = pro_plan.id
# Only set credits if the column exists
try:
user.credits = pro_plan.credits
except Exception:
pass
else:
user.plan_id = default_plan.id
# Only set credits if the column exists
try:
user.credits = default_plan.credits
except Exception:
pass
updated_count += 1
# Assign credits to users with plans but no credits
for user in users_without_credits:
user.credits = user.plan.credits
updated_count += 1
if updated_count > 0:
if main_playlist is None:
# Create the Main playlist
main_playlist = Playlist.create_playlist(
name="Main",
description="Default main playlist for all sounds",
genre=None,
user_id=None, # System playlist
is_main=True,
is_deletable=False,
is_current=True,
commit=True,
)
print("Created default Main playlist")
else:
# Ensure the existing Main playlist has correct properties
if (
not main_playlist.is_main
or main_playlist.is_deletable
or not main_playlist.is_current
):
main_playlist.is_main = True
main_playlist.is_deletable = False
main_playlist.is_current = True
db.session.commit()
print(
f"Updated {updated_count} existing users with plans and credits",
)
except Exception:
# If there's any error (like missing columns), just skip migration
# The database will be properly created by create_all()
pass
print("Updated existing Main playlist properties")

View File

@@ -1,8 +1,10 @@
"""Database models."""
from .plan import Plan
from .playlist import Playlist
from .playlist_sound import PlaylistSound
from .sound import Sound
from .user import User
from .user_oauth import UserOAuth
__all__ = ["Plan", "Sound", "User", "UserOAuth"]
__all__ = ["Plan", "Playlist", "PlaylistSound", "Sound", "User", "UserOAuth"]

View File

@@ -9,7 +9,7 @@ from app.database import db
class Plan(db.Model):
"""Plan model for user subscription plans."""
__tablename__ = "plans"
__tablename__ = "plan"
id = Column(Integer, primary_key=True)
code = Column(String(50), unique=True, nullable=False, index=True)

278
app/models/playlist.py Normal file
View File

@@ -0,0 +1,278 @@
"""Playlist model for managing sound playlists."""
from datetime import datetime
from typing import TYPE_CHECKING, Optional
from zoneinfo import ZoneInfo
from sqlalchemy import Boolean, DateTime, ForeignKey, Integer, String, Text
from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.database import db
if TYPE_CHECKING:
from app.models.playlist_sound import PlaylistSound
from app.models.user import User
class Playlist(db.Model):
"""Model for playlists containing sounds."""
__tablename__ = "playlist"
id: Mapped[int] = mapped_column(Integer, primary_key=True)
name: Mapped[str] = mapped_column(String(255), nullable=False)
description: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
genre: Mapped[Optional[str]] = mapped_column(String(100), nullable=True)
user_id: Mapped[Optional[int]] = mapped_column(
Integer, ForeignKey("user.id"), nullable=True
)
is_main: Mapped[bool] = mapped_column(
Boolean, default=False, nullable=False
)
is_deletable: Mapped[bool] = mapped_column(
Boolean, default=True, nullable=True
)
is_current: Mapped[bool] = mapped_column(
Boolean, default=False, nullable=False
)
created_at: Mapped[datetime] = mapped_column(
DateTime,
default=lambda: datetime.now(tz=ZoneInfo("UTC")),
nullable=False,
)
updated_at: Mapped[datetime] = mapped_column(
DateTime,
default=lambda: datetime.now(tz=ZoneInfo("UTC")),
onupdate=lambda: datetime.now(tz=ZoneInfo("UTC")),
nullable=False,
)
# Relationships
user: Mapped[Optional["User"]] = relationship(
"User", back_populates="playlists"
)
playlist_sounds: Mapped[list["PlaylistSound"]] = relationship(
"PlaylistSound", back_populates="playlist", cascade="all, delete-orphan"
)
def __repr__(self) -> str:
"""String representation of the playlist."""
return f"<Playlist(id={self.id}, name='{self.name}', user_id={self.user_id})>"
def to_dict(self) -> dict:
"""Convert playlist to dictionary representation."""
return {
"id": self.id,
"name": self.name,
"description": self.description,
"genre": self.genre,
"user_id": self.user_id,
"is_main": self.is_main,
"is_deletable": self.is_deletable,
"is_current": self.is_current,
"created_at": (
self.created_at.isoformat() if self.created_at else None
),
"updated_at": (
self.updated_at.isoformat() if self.updated_at else None
),
"sound_count": (
len(self.playlist_sounds) if self.playlist_sounds else 0
),
}
def to_detailed_dict(self) -> dict:
"""Convert playlist to detailed dictionary with sounds."""
playlist_dict = self.to_dict()
playlist_dict["sounds"] = [
{
"sound": ps.sound.to_dict() if ps.sound else None,
"order": ps.order,
"added_at": ps.added_at.isoformat() if ps.added_at else None,
}
for ps in sorted(self.playlist_sounds, key=lambda x: x.order)
]
return playlist_dict
@classmethod
def create_playlist(
cls,
name: str,
description: Optional[str] = None,
genre: Optional[str] = None,
user_id: Optional[int] = None,
is_main: bool = False,
is_deletable: bool = True,
is_current: bool = False,
commit: bool = True,
) -> "Playlist":
"""Create a new playlist."""
playlist = cls(
name=name,
description=description,
genre=genre,
user_id=user_id,
is_main=is_main,
is_deletable=is_deletable,
is_current=is_current,
)
db.session.add(playlist)
if commit:
db.session.commit()
return playlist
@classmethod
def find_by_name(
cls, name: str, user_id: Optional[int] = None
) -> Optional["Playlist"]:
"""Find playlist by name, optionally filtered by user."""
query = cls.query.filter_by(name=name)
if user_id is not None:
query = query.filter_by(user_id=user_id)
return query.first()
@classmethod
def find_by_user(cls, user_id: int) -> list["Playlist"]:
"""Find all playlists for a user."""
return cls.query.filter_by(user_id=user_id).order_by(cls.name).all()
@classmethod
def find_system_playlists(cls) -> list["Playlist"]:
"""Find all system playlists (user_id is None)."""
return cls.query.filter_by(user_id=None).order_by(cls.name).all()
@classmethod
def find_current_playlist(
cls, user_id: Optional[int] = None
) -> Optional["Playlist"]:
"""Find the current active playlist."""
query = cls.query.filter_by(is_current=True)
if user_id is not None:
query = query.filter_by(user_id=user_id)
return query.first()
@classmethod
def find_main_playlist(
cls, user_id: Optional[int] = None
) -> Optional["Playlist"]:
"""Find the main playlist."""
query = cls.query.filter_by(is_main=True)
if user_id is not None:
query = query.filter_by(user_id=user_id)
return query.first()
def set_as_current(self, commit: bool = True) -> None:
"""Set this playlist as the current one and unset others."""
# Unset other current playlists for the same user/system
if self.user_id is not None:
Playlist.query.filter_by(
user_id=self.user_id, is_current=True
).update({"is_current": False})
else:
Playlist.query.filter_by(user_id=None, is_current=True).update(
{"is_current": False}
)
self.is_current = True
if commit:
db.session.commit()
def add_sound(
self, sound_id: int, order: Optional[int] = None, commit: bool = True
) -> "PlaylistSound":
"""Add a sound to the playlist."""
from app.models.playlist_sound import PlaylistSound
if order is None:
# Get the next order number
max_order = (
db.session.query(db.func.max(PlaylistSound.order))
.filter_by(playlist_id=self.id)
.scalar()
)
order = (max_order or 0) + 1
playlist_sound = PlaylistSound(
playlist_id=self.id, sound_id=sound_id, order=order
)
db.session.add(playlist_sound)
if commit:
db.session.commit()
return playlist_sound
def remove_sound(self, sound_id: int, commit: bool = True) -> bool:
"""Remove a sound from the playlist."""
from app.models.playlist_sound import PlaylistSound
playlist_sound = PlaylistSound.query.filter_by(
playlist_id=self.id, sound_id=sound_id
).first()
if playlist_sound:
db.session.delete(playlist_sound)
if commit:
db.session.commit()
return True
return False
def reorder_sounds(
self, sound_orders: list[dict], commit: bool = True
) -> None:
"""Reorder sounds in the playlist.
Args:
sound_orders: List of dicts with 'sound_id' and 'order' keys
"""
from app.models.playlist_sound import PlaylistSound
for item in sound_orders:
playlist_sound = PlaylistSound.query.filter_by(
playlist_id=self.id, sound_id=item["sound_id"]
).first()
if playlist_sound:
playlist_sound.order = item["order"]
if commit:
db.session.commit()
def get_total_duration(self) -> int:
"""Get total duration of all sounds in the playlist in milliseconds."""
from app.models.sound import Sound
total = (
db.session.query(db.func.sum(Sound.duration))
.join(self.playlist_sounds)
.filter(Sound.id.in_([ps.sound_id for ps in self.playlist_sounds]))
.scalar()
)
return total or 0
def duplicate(
self, new_name: str, user_id: Optional[int] = None, commit: bool = True
) -> "Playlist":
"""Create a duplicate of this playlist."""
new_playlist = Playlist.create_playlist(
name=new_name,
description=self.description,
genre=self.genre,
user_id=user_id,
is_main=False,
is_deletable=True,
is_current=False,
commit=commit,
)
# Copy all sounds with their order
for ps in self.playlist_sounds:
new_playlist.add_sound(ps.sound_id, ps.order, commit=False)
if commit:
db.session.commit()
return new_playlist

View File

@@ -0,0 +1,188 @@
"""Playlist-Sound relationship model for managing sound order in playlists."""
from datetime import datetime
from typing import TYPE_CHECKING, Optional
from zoneinfo import ZoneInfo
from sqlalchemy import DateTime, ForeignKey, Integer, UniqueConstraint
from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.database import db
if TYPE_CHECKING:
from app.models.playlist import Playlist
from app.models.sound import Sound
class PlaylistSound(db.Model):
"""Model for playlist-sound relationships with ordering."""
__tablename__ = "playlist_sound"
id: Mapped[int] = mapped_column(Integer, primary_key=True)
playlist_id: Mapped[int] = mapped_column(
Integer, ForeignKey("playlist.id"), nullable=False
)
sound_id: Mapped[int] = mapped_column(
Integer, ForeignKey("sound.id"), nullable=False
)
order: Mapped[int] = mapped_column(Integer, nullable=False)
added_at: Mapped[datetime] = mapped_column(
DateTime,
default=lambda: datetime.now(tz=ZoneInfo("UTC")),
nullable=False,
)
# Relationships
playlist: Mapped["Playlist"] = relationship(
"Playlist", back_populates="playlist_sounds"
)
sound: Mapped["Sound"] = relationship(
"Sound", back_populates="playlist_sounds"
)
# Constraints
__table_args__ = (
UniqueConstraint(
"playlist_id", "sound_id", name="unique_playlist_sound"
),
UniqueConstraint("playlist_id", "order", name="unique_playlist_order"),
)
def __repr__(self) -> str:
"""String representation of the playlist-sound relationship."""
return f"<PlaylistSound(playlist_id={self.playlist_id}, sound_id={self.sound_id}, order={self.order})>"
def to_dict(self) -> dict:
"""Convert playlist-sound relationship to dictionary representation."""
return {
"id": self.id,
"playlist_id": self.playlist_id,
"sound_id": self.sound_id,
"order": self.order,
"added_at": self.added_at.isoformat() if self.added_at else None,
"sound": self.sound.to_dict() if self.sound else None,
}
@classmethod
def create_playlist_sound(
cls,
playlist_id: int,
sound_id: int,
order: int,
commit: bool = True,
) -> "PlaylistSound":
"""Create a new playlist-sound relationship."""
playlist_sound = cls(
playlist_id=playlist_id,
sound_id=sound_id,
order=order,
)
db.session.add(playlist_sound)
if commit:
db.session.commit()
return playlist_sound
@classmethod
def find_by_playlist(cls, playlist_id: int) -> list["PlaylistSound"]:
"""Find all sounds in a playlist ordered by their position."""
return (
cls.query.filter_by(playlist_id=playlist_id)
.order_by(cls.order)
.all()
)
@classmethod
def find_by_sound(cls, sound_id: int) -> list["PlaylistSound"]:
"""Find all playlists containing a specific sound."""
return cls.query.filter_by(sound_id=sound_id).all()
@classmethod
def find_by_playlist_and_sound(
cls, playlist_id: int, sound_id: int
) -> Optional["PlaylistSound"]:
"""Find a specific playlist-sound relationship."""
return cls.query.filter_by(
playlist_id=playlist_id, sound_id=sound_id
).first()
@classmethod
def get_next_order(cls, playlist_id: int) -> int:
"""Get the next order number for a playlist."""
max_order = (
db.session.query(db.func.max(cls.order))
.filter_by(playlist_id=playlist_id)
.scalar()
)
return (max_order or 0) + 1
@classmethod
def reorder_playlist(
cls, playlist_id: int, sound_orders: list[dict], commit: bool = True
) -> None:
"""Reorder all sounds in a playlist.
Args:
playlist_id: ID of the playlist
sound_orders: List of dicts with 'sound_id' and 'order' keys
"""
for item in sound_orders:
playlist_sound = cls.query.filter_by(
playlist_id=playlist_id, sound_id=item["sound_id"]
).first()
if playlist_sound:
playlist_sound.order = item["order"]
if commit:
db.session.commit()
def move_to_position(self, new_order: int, commit: bool = True) -> None:
"""Move this sound to a new position in the playlist."""
old_order = self.order
if new_order == old_order:
return
# Get all other sounds in the playlist
other_sounds = (
PlaylistSound.query.filter_by(playlist_id=self.playlist_id)
.filter(PlaylistSound.id != self.id)
.order_by(PlaylistSound.order)
.all()
)
# Remove this sound from its current position
remaining_sounds = [ps for ps in other_sounds if ps.order != old_order]
# Insert at new position
if new_order <= len(remaining_sounds):
remaining_sounds.insert(new_order - 1, self)
else:
remaining_sounds.append(self)
# Update all order values
for i, ps in enumerate(remaining_sounds, 1):
ps.order = i
if commit:
db.session.commit()
def get_previous_sound(self) -> Optional["PlaylistSound"]:
"""Get the previous sound in the playlist."""
return (
PlaylistSound.query.filter_by(playlist_id=self.playlist_id)
.filter(PlaylistSound.order < self.order)
.order_by(PlaylistSound.order.desc())
.first()
)
def get_next_sound(self) -> Optional["PlaylistSound"]:
"""Get the next sound in the playlist."""
return (
PlaylistSound.query.filter_by(playlist_id=self.playlist_id)
.filter(PlaylistSound.order > self.order)
.order_by(PlaylistSound.order.asc())
.first()
)

View File

@@ -2,14 +2,17 @@
from datetime import datetime
from enum import Enum
from typing import Optional
from typing import TYPE_CHECKING, Optional
from zoneinfo import ZoneInfo
from sqlalchemy import Boolean, DateTime, Integer, String
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.database import db
if TYPE_CHECKING:
from app.models.playlist_sound import PlaylistSound
class SoundType(Enum):
"""Sound type enumeration."""
@@ -22,7 +25,7 @@ class SoundType(Enum):
class Sound(db.Model):
"""Sound model for storing sound file information."""
__tablename__ = "sounds"
__tablename__ = "sound"
id: Mapped[int] = mapped_column(primary_key=True)
@@ -87,6 +90,13 @@ class Sound(db.Model):
nullable=False,
)
# Relationships
playlist_sounds: Mapped[list["PlaylistSound"]] = relationship(
"PlaylistSound",
back_populates="sound",
cascade="all, delete-orphan",
)
def __repr__(self) -> str:
"""String representation of Sound."""
return f"<Sound {self.name} ({self.type}) - {self.play_count} plays>"

View File

@@ -19,12 +19,12 @@ class SoundPlayed(db.Model):
# Foreign keys
user_id: Mapped[int] = mapped_column(
Integer,
ForeignKey("users.id"),
ForeignKey("user.id"),
nullable=False,
)
sound_id: Mapped[int] = mapped_column(
Integer,
ForeignKey("sounds.id"),
ForeignKey("sound.id"),
nullable=False,
)

View File

@@ -13,13 +13,14 @@ from app.database import db
if TYPE_CHECKING:
from app.models.plan import Plan
from app.models.playlist import Playlist
from app.models.user_oauth import UserOAuth
class User(db.Model):
"""User model for storing user information."""
__tablename__ = "users"
__tablename__ = "user"
id: Mapped[int] = mapped_column(primary_key=True)
@@ -47,7 +48,7 @@ class User(db.Model):
# Plan relationship
plan_id: Mapped[int] = mapped_column(
Integer,
ForeignKey("plans.id"),
ForeignKey("plan.id"),
nullable=False,
)
@@ -81,6 +82,11 @@ class User(db.Model):
cascade="all, delete-orphan",
)
plan: Mapped["Plan"] = relationship("Plan", back_populates="users")
playlists: Mapped[list["Playlist"]] = relationship(
"Playlist",
back_populates="user",
cascade="all, delete-orphan",
)
def __repr__(self) -> str:
"""String representation of User."""

View File

@@ -21,7 +21,7 @@ class UserOAuth(db.Model):
id: Mapped[int] = mapped_column(primary_key=True)
# User relationship
user_id: Mapped[int] = mapped_column(ForeignKey("users.id"), nullable=False)
user_id: Mapped[int] = mapped_column(ForeignKey("user.id"), nullable=False)
# OAuth provider information
provider: Mapped[str] = mapped_column(String(50), nullable=False)