feat(auth): implement user plans and credits system with related endpoints

This commit is contained in:
JSC
2025-06-29 16:40:54 +02:00
parent 52c60db811
commit 91648a858e
10 changed files with 334 additions and 63 deletions

View File

@@ -1,6 +1,7 @@
"""Database models."""
from .plan import Plan
from .user import User
from .user_oauth import UserOAuth
__all__ = ["User", "UserOAuth"]
__all__ = ["Plan", "User", "UserOAuth"]

58
app/models/plan.py Normal file
View File

@@ -0,0 +1,58 @@
"""Plan model for user subscription plans."""
from sqlalchemy import Column, Integer, String, Text
from sqlalchemy.orm import relationship
from app.database import db
class Plan(db.Model):
"""Plan model for user subscription plans."""
__tablename__ = "plans"
id = Column(Integer, primary_key=True)
code = Column(String(50), unique=True, nullable=False, index=True)
name = Column(String(100), nullable=False)
description = Column(Text)
credits = Column(Integer, default=0, nullable=False)
max_credits = Column(Integer, default=0, nullable=False)
# Relationship with users
users = relationship("User", back_populates="plan", lazy="dynamic")
def __repr__(self) -> str:
"""String representation of Plan."""
return f"<Plan {self.code}: {self.name}>"
@classmethod
def find_by_code(cls, code: str) -> "Plan | None":
"""Find plan by code."""
return cls.query.filter_by(code=code).first()
@classmethod
def get_default_plan(cls) -> "Plan":
"""Get the default plan (free)."""
plan = cls.find_by_code("free")
if not plan:
raise ValueError("Default 'free' plan not found in database")
return plan
@classmethod
def get_pro_plan(cls) -> "Plan":
"""Get the pro plan."""
plan = cls.find_by_code("pro")
if not plan:
raise ValueError("'pro' plan not found in database")
return plan
def to_dict(self) -> dict:
"""Convert plan to dictionary."""
return {
"id": self.id,
"code": self.code,
"name": self.name,
"description": self.description,
"credits": self.credits,
"max_credits": self.max_credits,
}

View File

@@ -5,13 +5,14 @@ from datetime import datetime
from typing import Optional, TYPE_CHECKING
from werkzeug.security import check_password_hash, generate_password_hash
from sqlalchemy import String, DateTime
from sqlalchemy import String, DateTime, Integer, ForeignKey
from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.database import db
if TYPE_CHECKING:
from app.models.user_oauth import UserOAuth
from app.models.plan import Plan
class User(db.Model):
@@ -35,6 +36,12 @@ class User(db.Model):
# User status
is_active: Mapped[bool] = mapped_column(nullable=False, default=True)
# Plan relationship
plan_id: Mapped[int] = mapped_column(Integer, ForeignKey("plans.id"), nullable=False)
# User credits (populated from plan credits on creation)
credits: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
# API token for programmatic access
api_token: Mapped[Optional[str]] = mapped_column(String(255), nullable=True)
api_token_expires_at: Mapped[Optional[datetime]] = mapped_column(DateTime, nullable=True)
@@ -51,6 +58,7 @@ class User(db.Model):
oauth_providers: Mapped[list["UserOAuth"]] = relationship(
"UserOAuth", back_populates="user", cascade="all, delete-orphan"
)
plan: Mapped["Plan"] = relationship("Plan", back_populates="users")
def __repr__(self) -> str:
"""String representation of User."""
@@ -69,6 +77,8 @@ class User(db.Model):
"api_token": self.api_token,
"api_token_expires_at": self.api_token_expires_at.isoformat() if self.api_token_expires_at else None,
"providers": [provider.provider for provider in self.oauth_providers],
"plan": self.plan.to_dict() if self.plan else None,
"credits": self.credits,
"created_at": self.created_at.isoformat(),
"updated_at": self.updated_at.isoformat(),
}
@@ -159,6 +169,7 @@ class User(db.Model):
) -> tuple["User", "UserOAuth"]:
"""Find existing user or create new one from OAuth data."""
from app.models.user_oauth import UserOAuth
from app.models.plan import Plan
# First, try to find existing OAuth provider
oauth_provider = UserOAuth.find_by_provider_and_id(provider, provider_id)
@@ -178,16 +189,24 @@ class User(db.Model):
user = cls.find_by_email(email)
if not user:
# Check if this is the first user (admin)
# Check if this is the first user (admin with pro plan)
user_count = cls.query.count()
role = "admin" if user_count == 0 else "user"
# Assign plan: first user gets pro, others get free
if user_count == 0:
plan = Plan.get_pro_plan()
else:
plan = Plan.get_default_plan()
# Create new user
user = cls(
email=email,
name=name,
picture=picture,
role=role,
plan_id=plan.id,
credits=plan.credits, # Set credits from plan
)
user.generate_api_token() # Generate API token on creation
db.session.add(user)
@@ -209,20 +228,30 @@ class User(db.Model):
@classmethod
def create_with_password(cls, email: str, password: str, name: str) -> "User":
"""Create new user with email and password."""
from app.models.plan import Plan
# Check if user already exists
existing_user = cls.find_by_email(email)
if existing_user:
raise ValueError("User with this email already exists")
# Check if this is the first user (admin)
# Check if this is the first user (admin with pro plan)
user_count = cls.query.count()
role = "admin" if user_count == 0 else "user"
# Assign plan: first user gets pro, others get free
if user_count == 0:
plan = Plan.get_pro_plan()
else:
plan = Plan.get_default_plan()
# Create new user
user = cls(
email=email,
name=name,
role=role,
plan_id=plan.id,
credits=plan.credits, # Set credits from plan
)
user.set_password(password)
user.generate_api_token() # Generate API token on creation