Files
sdb-back/app/services/decorators.py
2025-06-28 18:30:30 +02:00

148 lines
4.4 KiB
Python

"""Authentication decorators and middleware."""
from functools import wraps
from typing import Any
from flask import jsonify, request
from flask_jwt_extended import get_jwt, get_jwt_identity, jwt_required
def require_auth(f):
"""Decorator to require authentication for routes."""
return jwt_required()(f)
def get_current_user() -> dict[str, Any] | None:
"""Helper function to get current user from JWT token."""
try:
current_user_id = get_jwt_identity()
if not current_user_id:
return None
claims = get_jwt()
is_active = claims.get("is_active", True)
# Check if user is active
if not is_active:
return None
return {
"id": current_user_id,
"email": claims.get("email", ""),
"name": claims.get("name", ""),
"picture": claims.get("picture"),
"role": claims.get("role", "user"),
"is_active": is_active,
"provider": claims.get("provider", "unknown"),
"providers": claims.get("providers", []),
}
except Exception:
return None
def require_role(required_role: str):
"""Decorator to require specific role for routes."""
def decorator(f):
@wraps(f)
@jwt_required()
def wrapper(*args, **kwargs):
user = get_current_user()
if not user:
return jsonify({"error": "Authentication required"}), 401
if user.get("role") != required_role:
return jsonify({"error": f"Access denied. {required_role.title()} role required"}), 403
return f(*args, **kwargs)
return wrapper
return decorator
def require_admin(f):
"""Decorator to require admin role for routes."""
return require_role("admin")(f)
def require_user_or_admin(f):
"""Decorator to require user or admin role for routes."""
@wraps(f)
@jwt_required()
def wrapper(*args, **kwargs):
user = get_current_user()
if not user:
return jsonify({"error": "Authentication required"}), 401
if user.get("role") not in ["user", "admin"]:
return jsonify({"error": "Access denied"}), 403
return f(*args, **kwargs)
return wrapper
def get_user_from_api_token() -> dict[str, Any] | None:
"""Get user from API token in request headers."""
try:
# Check for API token in Authorization header
auth_header = request.headers.get("Authorization")
if not auth_header:
return None
# Expected format: "Bearer <token>" or "Token <token>"
parts = auth_header.split()
if len(parts) != 2 or parts[0].lower() not in ["bearer", "token"]:
return None
api_token = parts[1]
# Import here to avoid circular imports
from app.models.user import User
user = User.find_by_api_token(api_token)
if user and user.is_active:
return {
"id": str(user.id),
"email": user.email,
"name": user.name,
"picture": user.picture,
"role": user.role,
"is_active": user.is_active,
"provider": "api_token",
"providers": [p.provider for p in user.oauth_providers] + ["api_token"],
}
return None
except Exception:
return None
def require_api_token(f):
"""Decorator to require API token authentication for routes."""
@wraps(f)
def wrapper(*args, **kwargs):
user = get_user_from_api_token()
if not user:
return jsonify({"error": "Valid API token required"}), 401
return f(*args, **kwargs)
return wrapper
def require_auth_or_api_token(f):
"""Decorator to accept either JWT or API token authentication."""
@wraps(f)
def wrapper(*args, **kwargs):
# Try JWT authentication first
try:
user = get_current_user()
if user:
return f(*args, **kwargs)
except Exception:
pass
# Try API token authentication
user = get_user_from_api_token()
if user:
return f(*args, **kwargs)
return jsonify({"error": "Authentication required (JWT or API token)"}), 401
return wrapper