"""Authentication decorators and middleware.""" from functools import wraps from typing import Any from flask import jsonify, request from flask_jwt_extended import get_jwt, get_jwt_identity, jwt_required def require_auth(f): """Decorator to require authentication for routes.""" return jwt_required()(f) def get_current_user() -> dict[str, Any] | None: """Helper function to get current user from JWT token.""" try: current_user_id = get_jwt_identity() if not current_user_id: return None claims = get_jwt() is_active = claims.get("is_active", True) # Check if user is active if not is_active: return None return { "id": current_user_id, "email": claims.get("email", ""), "name": claims.get("name", ""), "picture": claims.get("picture"), "role": claims.get("role", "user"), "is_active": is_active, "provider": claims.get("provider", "unknown"), "providers": claims.get("providers", []), } except Exception: return None def require_role(required_role: str): """Decorator to require specific role for routes.""" def decorator(f): @wraps(f) @jwt_required() def wrapper(*args, **kwargs): user = get_current_user() if not user: return jsonify({"error": "Authentication required"}), 401 if user.get("role") != required_role: return jsonify({"error": f"Access denied. {required_role.title()} role required"}), 403 return f(*args, **kwargs) return wrapper return decorator def require_admin(f): """Decorator to require admin role for routes.""" return require_role("admin")(f) def require_user_or_admin(f): """Decorator to require user or admin role for routes.""" @wraps(f) @jwt_required() def wrapper(*args, **kwargs): user = get_current_user() if not user: return jsonify({"error": "Authentication required"}), 401 if user.get("role") not in ["user", "admin"]: return jsonify({"error": "Access denied"}), 403 return f(*args, **kwargs) return wrapper def get_user_from_api_token() -> dict[str, Any] | None: """Get user from API token in request headers.""" try: # Check for API token in Authorization header auth_header = request.headers.get("Authorization") if not auth_header: return None # Expected format: "Bearer " or "Token " parts = auth_header.split() if len(parts) != 2 or parts[0].lower() not in ["bearer", "token"]: return None api_token = parts[1] # Import here to avoid circular imports from app.models.user import User user = User.find_by_api_token(api_token) if user and user.is_active: return { "id": str(user.id), "email": user.email, "name": user.name, "picture": user.picture, "role": user.role, "is_active": user.is_active, "provider": "api_token", "providers": [p.provider for p in user.oauth_providers] + ["api_token"], } return None except Exception: return None def require_api_token(f): """Decorator to require API token authentication for routes.""" @wraps(f) def wrapper(*args, **kwargs): user = get_user_from_api_token() if not user: return jsonify({"error": "Valid API token required"}), 401 return f(*args, **kwargs) return wrapper def require_auth_or_api_token(f): """Decorator to accept either JWT or API token authentication.""" @wraps(f) def wrapper(*args, **kwargs): # Try JWT authentication first try: user = get_current_user() if user: return f(*args, **kwargs) except Exception: pass # Try API token authentication user = get_user_from_api_token() if user: return f(*args, **kwargs) return jsonify({"error": "Authentication required (JWT or API token)"}), 401 return wrapper