Files
sdb-back/app/models/user.py

328 lines
11 KiB
Python

"""User model for authentication."""
import secrets
from datetime import datetime
from typing import TYPE_CHECKING, Optional
from zoneinfo import ZoneInfo
from sqlalchemy import DateTime, ForeignKey, Integer, String
from sqlalchemy.orm import Mapped, mapped_column, relationship
from werkzeug.security import check_password_hash, generate_password_hash
from app.database import db
# These names are only needed for type annotations; TYPE_CHECKING is False at
# runtime, so the imports below are evaluated by static type checkers only.
if TYPE_CHECKING:
    from app.models.plan import Plan
    from app.models.playlist import Playlist
    from app.models.user_oauth import UserOAuth
class User(db.Model):
    """User model for storing user information.

    A user can sign in through any number of linked OAuth providers, through
    an optional password, and through an optional API token. Every user
    belongs to a plan and carries a credit balance that the constructors in
    this class initialise from that plan.
    """

    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)

    # Primary user information (can be updated from any connected provider)
    email: Mapped[str] = mapped_column(String(255), nullable=False, unique=True)
    name: Mapped[str] = mapped_column(String(255), nullable=False)
    picture: Mapped[str | None] = mapped_column(String(500), nullable=True)

    # Password authentication (optional - users can use OAuth instead)
    password_hash: Mapped[str | None] = mapped_column(
        String(255),
        nullable=True,
    )

    # Role-based access control
    role: Mapped[str] = mapped_column(
        String(50),
        nullable=False,
        default="user",
    )

    # User status
    is_active: Mapped[bool] = mapped_column(nullable=False, default=True)

    # Plan relationship
    plan_id: Mapped[int] = mapped_column(
        Integer,
        ForeignKey("plans.id"),
        nullable=False,
    )

    # User credits (populated from plan credits on creation)
    credits: Mapped[int] = mapped_column(Integer, nullable=False, default=0)

    # API token for programmatic access
    api_token: Mapped[str | None] = mapped_column(String(255), nullable=True)
    # None means the token never expires (see is_api_token_valid)
    api_token_expires_at: Mapped[datetime | None] = mapped_column(
        DateTime,
        nullable=True,
    )

    # Timestamps
    created_at: Mapped[datetime] = mapped_column(
        DateTime,
        default=lambda: datetime.now(tz=ZoneInfo("UTC")),
        nullable=False,
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime,
        default=lambda: datetime.now(tz=ZoneInfo("UTC")),
        onupdate=lambda: datetime.now(tz=ZoneInfo("UTC")),
        nullable=False,
    )

    # Relationships
    oauth_providers: Mapped[list["UserOAuth"]] = relationship(
        "UserOAuth",
        back_populates="user",
        cascade="all, delete-orphan",
    )
    plan: Mapped["Plan"] = relationship("Plan", back_populates="users")
    playlists: Mapped[list["Playlist"]] = relationship(
        "Playlist",
        back_populates="user",
        cascade="all, delete-orphan",
    )

    def __repr__(self) -> str:
        """String representation of User."""
        provider_count = len(self.oauth_providers)
        return f"<User {self.email} ({provider_count} providers)>"

    def _touch(self) -> None:
        """Stamp ``updated_at`` with the current UTC time."""
        self.updated_at = datetime.now(tz=ZoneInfo("UTC"))

    def to_dict(self) -> dict:
        """Convert user to dictionary.

        Returns:
            A dict of the user's fields. ``id`` is rendered as a string,
            datetimes as ISO-8601 strings (or ``None`` when unset), and
            ``providers`` lists every linked OAuth provider name plus
            ``"password"`` / ``"api_token"`` when those credentials exist.
        """
        # Build comprehensive providers list
        providers = [provider.provider for provider in self.oauth_providers]
        if self.password_hash:
            providers.append("password")
        if self.api_token:
            providers.append("api_token")

        return {
            "id": str(self.id),
            "email": self.email,
            "name": self.name,
            "picture": self.picture,
            "role": self.role,
            "is_active": self.is_active,
            "api_token": self.api_token,
            "api_token_expires_at": (
                self.api_token_expires_at.isoformat()
                if self.api_token_expires_at
                else None
            ),
            "providers": providers,
            "plan": self.plan.to_dict() if self.plan else None,
            "credits": self.credits,
            # The timestamps are filled in by column defaults, so they can
            # still be None on an instance that has not been flushed yet.
            "created_at": (
                self.created_at.isoformat() if self.created_at else None
            ),
            "updated_at": (
                self.updated_at.isoformat() if self.updated_at else None
            ),
        }

    def get_provider(self, provider_name: str) -> Optional["UserOAuth"]:
        """Get specific OAuth provider for this user.

        Returns:
            The first linked provider whose ``provider`` equals
            ``provider_name``, or ``None`` when there is no match.
        """
        return next(
            (
                linked
                for linked in self.oauth_providers
                if linked.provider == provider_name
            ),
            None,
        )

    def has_provider(self, provider_name: str) -> bool:
        """Check if user has specific OAuth provider connected."""
        return self.get_provider(provider_name) is not None

    def update_from_provider(self, provider_data: dict) -> None:
        """Update user info from provider data (email, name, picture).

        Args:
            provider_data: Mapping that may contain ``"email"``, ``"name"``
                and ``"picture"``. Missing keys leave the current value
                untouched.

        Side effects:
            Stamps ``updated_at`` and commits the session.
        """
        # email and name are NOT NULL columns, so an explicit None from the
        # provider must not overwrite the stored value.
        new_email = provider_data.get("email")
        if new_email is not None:
            self.email = new_email
        new_name = provider_data.get("name")
        if new_name is not None:
            self.name = new_name
        # picture is nullable: an explicit None clears it, a missing key
        # keeps the current value.
        self.picture = provider_data.get("picture", self.picture)
        self._touch()
        db.session.commit()

    def set_password(self, password: str) -> None:
        """Hash and set user password (does not commit)."""
        self.password_hash = generate_password_hash(password)
        self._touch()

    def check_password(self, password: str) -> bool:
        """Check if provided password matches user's password.

        Returns ``False`` when the user has no password set.
        """
        if not self.password_hash:
            return False
        return check_password_hash(self.password_hash, password)

    def has_password(self) -> bool:
        """Check if user has a password set."""
        # Truthiness (not ``is not None``) keeps this in agreement with
        # check_password() and to_dict(), which treat an empty hash as unset.
        return bool(self.password_hash)

    def generate_api_token(self) -> str:
        """Generate a new API token for the user.

        Replaces any existing token and clears its expiry; does not commit.

        Returns:
            The newly generated token.
        """
        self.api_token = secrets.token_urlsafe(32)
        self.api_token_expires_at = None  # No expiration by default
        self._touch()
        return self.api_token

    def is_api_token_valid(self) -> bool:
        """Check if the user's API token is valid (exists and not expired)."""
        if not self.api_token:
            return False

        expires_at = self.api_token_expires_at
        if expires_at is None:
            return True  # No expiration

        if expires_at.tzinfo is None:
            # The column is a plain DateTime, so a stored expiry may come
            # back naive; comparing it with an aware "now" would raise
            # TypeError. Timestamps in this model are written in UTC, so a
            # naive value is interpreted as UTC.
            expires_at = expires_at.replace(tzinfo=ZoneInfo("UTC"))
        return datetime.now(tz=ZoneInfo("UTC")) < expires_at

    def revoke_api_token(self) -> None:
        """Revoke the user's API token (does not commit)."""
        self.api_token = None
        self.api_token_expires_at = None
        self._touch()

    def activate(self) -> None:
        """Activate the user account (does not commit)."""
        self.is_active = True
        self._touch()

    def deactivate(self) -> None:
        """Deactivate the user account (does not commit)."""
        self.is_active = False
        self._touch()

    @classmethod
    def find_by_email(cls, email: str) -> Optional["User"]:
        """Find user by email address."""
        return cls.query.filter_by(email=email).first()

    @classmethod
    def find_by_api_token(cls, api_token: str) -> Optional["User"]:
        """Find user by API token if token is valid.

        Returns:
            The matching user when the token exists and has not expired,
            otherwise ``None``.
        """
        if not api_token:
            # An empty/None token can never be valid; skip the query, which
            # would otherwise match rows whose api_token is NULL.
            return None
        user = cls.query.filter_by(api_token=api_token).first()
        if user and user.is_api_token_valid():
            return user
        return None

    @classmethod
    def _initial_role_and_plan(cls) -> tuple[str, "Plan"]:
        """Pick the role and plan for a user that is about to be created.

        The very first user becomes ``"admin"`` on the pro plan; every later
        user gets the ``"user"`` role on the default plan.
        """
        from app.models.plan import Plan

        if cls.query.count() == 0:
            return "admin", Plan.get_pro_plan()
        return "user", Plan.get_default_plan()

    @classmethod
    def find_or_create_from_oauth(
        cls,
        provider: str,
        provider_id: str,
        email: str,
        name: str,
        picture: str | None = None,
    ) -> tuple["User", "UserOAuth"]:
        """Find existing user or create new one from OAuth data.

        Three cases are handled:

        * the provider identity is already known: its record and the owning
          user are refreshed with the supplied data;
        * the identity is new but a user with ``email`` exists: the provider
          is linked to that user;
        * otherwise a new user is created (with an API token) and the
          provider is linked to it.

        Returns:
            ``(user, oauth_provider)``. The session is committed before
            returning.
        """
        from app.models.user_oauth import UserOAuth

        # First, try to find existing OAuth provider
        oauth_provider = UserOAuth.find_by_provider_and_id(
            provider,
            provider_id,
        )

        if oauth_provider:
            # Update existing provider and user info
            user = oauth_provider.user
            oauth_provider.email = email
            oauth_provider.name = name
            oauth_provider.picture = picture
            oauth_provider.updated_at = datetime.now(tz=ZoneInfo("UTC"))
            # Update user info with latest data
            user.update_from_provider(
                {"email": email, "name": name, "picture": picture},
            )
        else:
            # Try to find user by email to link the new provider
            user = cls.find_by_email(email)
            if not user:
                role, plan = cls._initial_role_and_plan()
                # Create new user
                user = cls(
                    email=email,
                    name=name,
                    picture=picture,
                    role=role,
                    plan_id=plan.id,
                    credits=plan.credits,  # Set credits from plan
                )
                user.generate_api_token()  # Generate API token on creation
                db.session.add(user)
                db.session.flush()  # Flush to get user.id

            # Create new OAuth provider
            oauth_provider = UserOAuth.create_or_update(
                user_id=user.id,
                provider=provider,
                provider_id=provider_id,
                email=email,
                name=name,
                picture=picture,
            )

        db.session.commit()
        return user, oauth_provider

    @classmethod
    def create_with_password(
        cls,
        email: str,
        password: str,
        name: str,
    ) -> "User":
        """Create new user with email and password.

        The new user also receives an API token, and the session is
        committed before returning.

        Raises:
            ValueError: If a user with ``email`` already exists.
        """
        # Check if user already exists
        if cls.find_by_email(email):
            raise ValueError("User with this email already exists")

        role, plan = cls._initial_role_and_plan()

        # Create new user
        user = cls(
            email=email,
            name=name,
            role=role,
            plan_id=plan.id,
            credits=plan.credits,  # Set credits from plan
        )
        user.set_password(password)
        user.generate_api_token()  # Generate API token on creation
        db.session.add(user)
        db.session.commit()
        return user

    @classmethod
    def authenticate_with_password(
        cls,
        email: str,
        password: str,
    ) -> Optional["User"]:
        """Authenticate user with email and password.

        Returns:
            The user when the email exists, the password matches and the
            account is active; otherwise ``None``.
        """
        user = cls.find_by_email(email)
        if user and user.check_password(password) and user.is_active:
            return user
        return None