"""User model for authentication.""" import secrets from datetime import datetime from typing import Optional, TYPE_CHECKING from werkzeug.security import check_password_hash, generate_password_hash from sqlalchemy import String, DateTime, Integer, ForeignKey from sqlalchemy.orm import Mapped, mapped_column, relationship from app.database import db if TYPE_CHECKING: from app.models.user_oauth import UserOAuth from app.models.plan import Plan class User(db.Model): """User model for storing user information.""" __tablename__ = "users" id: Mapped[int] = mapped_column(primary_key=True) # Primary user information (can be updated from any connected provider) email: Mapped[str] = mapped_column(String(255), nullable=False, unique=True) name: Mapped[str] = mapped_column(String(255), nullable=False) picture: Mapped[Optional[str]] = mapped_column(String(500), nullable=True) # Password authentication (optional - users can use OAuth instead) password_hash: Mapped[Optional[str]] = mapped_column(String(255), nullable=True) # Role-based access control role: Mapped[str] = mapped_column(String(50), nullable=False, default="user") # User status is_active: Mapped[bool] = mapped_column(nullable=False, default=True) # Plan relationship plan_id: Mapped[int] = mapped_column(Integer, ForeignKey("plans.id"), nullable=False) # User credits (populated from plan credits on creation) credits: Mapped[int] = mapped_column(Integer, nullable=False, default=0) # API token for programmatic access api_token: Mapped[Optional[str]] = mapped_column(String(255), nullable=True) api_token_expires_at: Mapped[Optional[datetime]] = mapped_column(DateTime, nullable=True) # Timestamps created_at: Mapped[datetime] = mapped_column( DateTime, default=datetime.utcnow, nullable=False ) updated_at: Mapped[datetime] = mapped_column( DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False ) # Relationships oauth_providers: Mapped[list["UserOAuth"]] = relationship( "UserOAuth", back_populates="user", cascade="all, delete-orphan" ) plan: Mapped["Plan"] = relationship("Plan", back_populates="users") def __repr__(self) -> str: """String representation of User.""" provider_count = len(self.oauth_providers) return f"" def to_dict(self) -> dict: """Convert user to dictionary.""" # Build comprehensive providers list providers = [provider.provider for provider in self.oauth_providers] if self.password_hash: providers.append("password") if self.api_token: providers.append("api_token") return { "id": str(self.id), "email": self.email, "name": self.name, "picture": self.picture, "role": self.role, "is_active": self.is_active, "api_token": self.api_token, "api_token_expires_at": self.api_token_expires_at.isoformat() if self.api_token_expires_at else None, "providers": providers, "plan": self.plan.to_dict() if self.plan else None, "credits": self.credits, "created_at": self.created_at.isoformat(), "updated_at": self.updated_at.isoformat(), } def get_provider(self, provider_name: str) -> Optional["UserOAuth"]: """Get specific OAuth provider for this user.""" for provider in self.oauth_providers: if provider.provider == provider_name: return provider return None def has_provider(self, provider_name: str) -> bool: """Check if user has specific OAuth provider connected.""" return self.get_provider(provider_name) is not None def update_from_provider(self, provider_data: dict) -> None: """Update user info from provider data (email, name, picture).""" self.email = provider_data.get("email", self.email) self.name = provider_data.get("name", self.name) self.picture = provider_data.get("picture", self.picture) self.updated_at = datetime.utcnow() db.session.commit() def set_password(self, password: str) -> None: """Hash and set user password.""" self.password_hash = generate_password_hash(password) self.updated_at = datetime.utcnow() def check_password(self, password: str) -> bool: """Check if provided password matches user's password.""" if not self.password_hash: return False return check_password_hash(self.password_hash, password) def has_password(self) -> bool: """Check if user has a password set.""" return self.password_hash is not None def generate_api_token(self) -> str: """Generate a new API token for the user.""" self.api_token = secrets.token_urlsafe(32) self.api_token_expires_at = None # No expiration by default self.updated_at = datetime.utcnow() return self.api_token def is_api_token_valid(self) -> bool: """Check if the user's API token is valid (exists and not expired).""" if not self.api_token: return False if self.api_token_expires_at is None: return True # No expiration return datetime.utcnow() < self.api_token_expires_at def revoke_api_token(self) -> None: """Revoke the user's API token.""" self.api_token = None self.api_token_expires_at = None self.updated_at = datetime.utcnow() def activate(self) -> None: """Activate the user account.""" self.is_active = True self.updated_at = datetime.utcnow() def deactivate(self) -> None: """Deactivate the user account.""" self.is_active = False self.updated_at = datetime.utcnow() @classmethod def find_by_email(cls, email: str) -> Optional["User"]: """Find user by email address.""" return cls.query.filter_by(email=email).first() @classmethod def find_by_api_token(cls, api_token: str) -> Optional["User"]: """Find user by API token if token is valid.""" user = cls.query.filter_by(api_token=api_token).first() if user and user.is_api_token_valid(): return user return None @classmethod def find_or_create_from_oauth( cls, provider: str, provider_id: str, email: str, name: str, picture: Optional[str] = None ) -> tuple["User", "UserOAuth"]: """Find existing user or create new one from OAuth data.""" from app.models.user_oauth import UserOAuth from app.models.plan import Plan # First, try to find existing OAuth provider oauth_provider = UserOAuth.find_by_provider_and_id(provider, provider_id) if oauth_provider: # Update existing provider and user info user = oauth_provider.user oauth_provider.email = email oauth_provider.name = name oauth_provider.picture = picture oauth_provider.updated_at = datetime.utcnow() # Update user info with latest data user.update_from_provider({"email": email, "name": name, "picture": picture}) else: # Try to find user by email to link the new provider user = cls.find_by_email(email) if not user: # Check if this is the first user (admin with pro plan) user_count = cls.query.count() role = "admin" if user_count == 0 else "user" # Assign plan: first user gets pro, others get free if user_count == 0: plan = Plan.get_pro_plan() else: plan = Plan.get_default_plan() # Create new user user = cls( email=email, name=name, picture=picture, role=role, plan_id=plan.id, credits=plan.credits, # Set credits from plan ) user.generate_api_token() # Generate API token on creation db.session.add(user) db.session.flush() # Flush to get user.id # Create new OAuth provider oauth_provider = UserOAuth.create_or_update( user_id=user.id, provider=provider, provider_id=provider_id, email=email, name=name, picture=picture, ) db.session.commit() return user, oauth_provider @classmethod def create_with_password(cls, email: str, password: str, name: str) -> "User": """Create new user with email and password.""" from app.models.plan import Plan # Check if user already exists existing_user = cls.find_by_email(email) if existing_user: raise ValueError("User with this email already exists") # Check if this is the first user (admin with pro plan) user_count = cls.query.count() role = "admin" if user_count == 0 else "user" # Assign plan: first user gets pro, others get free if user_count == 0: plan = Plan.get_pro_plan() else: plan = Plan.get_default_plan() # Create new user user = cls( email=email, name=name, role=role, plan_id=plan.id, credits=plan.credits, # Set credits from plan ) user.set_password(password) user.generate_api_token() # Generate API token on creation db.session.add(user) db.session.commit() return user @classmethod def authenticate_with_password(cls, email: str, password: str) -> Optional["User"]: """Authenticate user with email and password.""" user = cls.find_by_email(email) if user and user.check_password(password) and user.is_active: return user return None