- Implemented VLCService to handle sound playback using VLC. - Added routes for soundboard management including play, stop, and status. - Introduced admin routes for sound normalization and scanning. - Updated user model and services to accommodate new functionalities. - Enhanced error handling and logging throughout the application. - Updated dependencies to include python-vlc for sound playback capabilities.
211 lines
5.9 KiB
Python
211 lines
5.9 KiB
Python
"""Authentication decorators and middleware."""
|
|
|
|
from functools import wraps
|
|
from typing import Any
|
|
|
|
from flask import jsonify, request
|
|
from flask_jwt_extended import get_jwt_identity, verify_jwt_in_request
|
|
|
|
|
|
def get_user_from_jwt() -> dict[str, Any] | None:
|
|
"""Helper function to get current user from JWT token."""
|
|
try:
|
|
# Try to verify JWT token in request - this sets up the context
|
|
verify_jwt_in_request()
|
|
|
|
current_user_id = get_jwt_identity()
|
|
if not current_user_id:
|
|
return None
|
|
|
|
# Query database for user data instead of using JWT claims
|
|
from app.models.user import User
|
|
|
|
user = User.query.get(int(current_user_id))
|
|
if not user or not user.is_active:
|
|
return None
|
|
|
|
# Build comprehensive providers list
|
|
providers = [p.provider for p in user.oauth_providers]
|
|
if user.password_hash:
|
|
providers.append("password")
|
|
if user.api_token:
|
|
providers.append("api_token")
|
|
|
|
return {
|
|
"id": str(user.id),
|
|
"email": user.email,
|
|
"name": user.name,
|
|
"picture": user.picture,
|
|
"role": user.role,
|
|
"is_active": user.is_active,
|
|
"provider": "jwt",
|
|
"providers": providers,
|
|
"plan": user.plan.to_dict() if user.plan else None,
|
|
"credits": user.credits,
|
|
}
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
def get_user_from_api_token() -> dict[str, Any] | None:
|
|
"""Get user from API token in request headers."""
|
|
try:
|
|
auth_header = request.headers.get("Authorization")
|
|
if not auth_header:
|
|
return None
|
|
|
|
parts = auth_header.split()
|
|
if len(parts) != 2 or parts[0].lower() not in ["bearer", "token"]:
|
|
return None
|
|
|
|
api_token = parts[1]
|
|
|
|
from app.models.user import User
|
|
|
|
user = User.find_by_api_token(api_token)
|
|
if user and user.is_active:
|
|
# Build comprehensive providers list
|
|
providers = [p.provider for p in user.oauth_providers]
|
|
if user.password_hash:
|
|
providers.append("password")
|
|
if user.api_token:
|
|
providers.append("api_token")
|
|
|
|
return {
|
|
"id": str(user.id),
|
|
"email": user.email,
|
|
"name": user.name,
|
|
"picture": user.picture,
|
|
"role": user.role,
|
|
"is_active": user.is_active,
|
|
"provider": "api_token",
|
|
"providers": providers,
|
|
"plan": user.plan.to_dict() if user.plan else None,
|
|
"credits": user.credits,
|
|
}
|
|
|
|
return None
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
def get_current_user() -> dict[str, Any] | None:
    """Resolve the requesting user, preferring JWT over API token.

    Returns:
        The user dict from ``get_user_from_jwt()`` when it yields one;
        otherwise whatever ``get_user_from_api_token()`` returns (a user
        dict or ``None``).
    """
    # `or` short-circuits: the API-token lookup only runs when JWT gave nothing.
    return get_user_from_jwt() or get_user_from_api_token()
|
|
|
|
|
|
def require_auth(f):
    """Wrap a view so it only runs for an authenticated request.

    Authentication is whatever ``get_current_user()`` accepts (JWT or API
    token). When no user is resolved, the wrapped view is not called and a
    JSON error body with HTTP status 401 is returned instead.
    """

    @wraps(f)
    def wrapper(*args, **kwargs):
        if get_current_user():
            return f(*args, **kwargs)

        rejection = jsonify(
            {"error": "Authentication required (JWT or API token)"},
        )
        return rejection, 401

    return wrapper
|
|
|
|
|
|
def require_role(required_role: str):
    """Build a decorator that restricts a view to users with one exact role.

    Args:
        required_role: Value the user's ``"role"`` entry must equal.

    Returns:
        A decorator. The decorated view responds with a JSON error and
        status 401 when no user is authenticated, status 403 when the user's
        role differs from ``required_role``, and otherwise returns the
        wrapped view's result.
    """

    def decorator(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            current = get_current_user()
            if not current:
                return jsonify({"error": "Authentication required"}), 401

            if current.get("role") == required_role:
                return f(*args, **kwargs)

            denial = {
                "error": f"Access denied. {required_role.title()} role required",
            }
            return jsonify(denial), 403

        return wrapper

    return decorator
|
|
|
|
|
|
def require_admin(f):
    """Wrap a view so it only runs for users whose role is ``"admin"``.

    This is ``require_role("admin")`` applied to ``f``, so the two decorators
    cannot drift apart. The responses are the same as before: a JSON error
    with status 401 when no user is authenticated, and
    ``{"error": "Access denied. Admin role required"}`` with status 403 when
    the user's role is not ``"admin"``.
    """
    return require_role("admin")(f)
|
|
|
|
|
|
def require_credits(credits_needed: int):
    """Build a decorator that charges ``credits_needed`` credits per call.

    The wrapped view runs only when the request is authenticated (via
    ``get_current_user()``), the user row exists and is active, and the
    user's balance covers the cost. The deduction is committed *before* the
    view runs.

    Args:
        credits_needed: Number of credits deducted for each authorized call.

    Returns:
        A decorator. The decorated view returns the wrapped view's result,
        or a ``(json_response, status)`` tuple: 401 when unauthenticated or
        the user is missing/inactive, 402 when the balance is insufficient.
    """

    def decorator(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            # Imported lazily, as elsewhere in this module.
            from app.database import db
            from app.models.user import User

            def insufficient(available):
                # 402 = Payment Required.
                message = (
                    f"Insufficient credits. Required: {credits_needed}, "
                    f"Available: {available}"
                )
                return jsonify({"error": message}), 402

            user_data = get_current_user()
            if not user_data:
                return jsonify({"error": "Authentication required"}), 401

            # get_current_user() returns a plain dict; load the model row to
            # read and update the balance.
            user = User.query.get(int(user_data["id"]))
            if not user or not user.is_active:
                return jsonify({"error": "User not found or inactive"}), 401

            # Cheap early rejection with the balance we just read.
            if user.credits < credits_needed:
                return insufficient(user.credits)

            # Deduct with one conditional UPDATE instead of a Python-side
            # read-modify-write. With `user.credits -= n`, two concurrent
            # requests could both pass the check above and both write back,
            # spending more credits than the user has. The WHERE clause makes
            # the database enforce the balance at write time.
            charged = User.query.filter(
                User.id == user.id,
                User.credits >= credits_needed,
            ).update(
                {User.credits: User.credits - credits_needed},
                synchronize_session=False,
            )
            db.session.commit()

            if not charged:
                # A concurrent request spent the credits between the check
                # and the UPDATE; report the balance as it is now.
                db.session.refresh(user)
                return insufficient(user.credits)

            # NOTE(review): credits are charged before the view runs and are
            # not refunded if it raises or returns an error response.
            # Confirm that this is the intended billing policy.
            return f(*args, **kwargs)

        return wrapper

    return decorator
|