- Implemented VLCService to handle sound playback using VLC. - Added routes for soundboard management including play, stop, and status. - Introduced admin routes for sound normalization and scanning. - Updated user model and services to accommodate new functionalities. - Enhanced error handling and logging throughout the application. - Updated dependencies to include python-vlc for sound playback capabilities.
236 lines
7.9 KiB
Python
236 lines
7.9 KiB
Python
"""Authentication service for multiple OAuth providers."""
|
|
|
|
from typing import Any
|
|
|
|
from authlib.integrations.flask_client import OAuth
|
|
from flask import Flask, jsonify
|
|
from flask_jwt_extended import (
|
|
get_jwt_identity,
|
|
jwt_required,
|
|
set_access_cookies,
|
|
set_refresh_cookies,
|
|
unset_jwt_cookies,
|
|
)
|
|
|
|
from app.models.user import User
|
|
from app.services.oauth_providers.registry import OAuthProviderRegistry
|
|
from app.services.token_service import TokenService
|
|
|
|
|
|
class AuthService:
    """Service for handling multiple OAuth providers authentication.

    Besides the OAuth flow, the service also covers e-mail/password
    registration and login, reading the current user from JWT claims, and
    logout. Successful logins return a Flask JSON response that carries the
    access and refresh tokens as JWT cookies.
    """

    def __init__(self, app: Flask | None = None) -> None:
        """Initialize the authentication service.

        Args:
            app: Optional Flask application. When given, ``init_app`` is
                called immediately; otherwise call it later.
        """
        self.oauth = OAuth()
        # Stays None until init_app() binds the service to an application.
        self.provider_registry: OAuthProviderRegistry | None = None
        self.token_service = TokenService()
        if app is not None:
            self.init_app(app)

    def init_app(self, app: Flask) -> None:
        """Initialize the service with Flask app.

        Args:
            app: The Flask application the OAuth client is bound to.
        """
        self.oauth.init_app(app)

        # The registry is built on top of the now-initialized OAuth client.
        self.provider_registry = OAuthProviderRegistry(self.oauth)

    def _get_provider(self, provider_name: str) -> Any:
        """Return the provider registered under ``provider_name``.

        Raises:
            RuntimeError: If ``init_app`` has not been called yet, or if the
                registry has no provider under that name.
        """
        if self.provider_registry is None:
            # Without this guard the lookup below fails with an opaque
            # AttributeError on None.
            msg = "AuthService is not initialized; call init_app() first"
            raise RuntimeError(msg)

        provider = self.provider_registry.get_provider(provider_name)
        if not provider:
            msg = f"OAuth provider '{provider_name}' not configured"
            raise RuntimeError(msg)
        return provider

    @staticmethod
    def _error_response(message: str, status_code: int) -> Any:
        """Build a JSON ``{"error": message}`` response with ``status_code``."""
        response = jsonify({"error": message})
        response.status_code = status_code
        return response

    def _build_auth_response(
        self,
        jwt_user_data: dict[str, Any],
        message: str,
    ) -> Any:
        """Issue JWT tokens for a user and attach them to a JSON response.

        Args:
            jwt_user_data: User payload handed to the token service and
                echoed back under the ``"user"`` key.
            message: Text returned under the ``"message"`` key.

        Returns:
            The JSON response with access and refresh cookies set.
        """
        access_token = self.token_service.generate_access_token(jwt_user_data)
        refresh_token = self.token_service.generate_refresh_token(jwt_user_data)

        response = jsonify({"message": message, "user": jwt_user_data})
        set_access_cookies(response, access_token)
        set_refresh_cookies(response, refresh_token)
        return response

    def redirect_to_login(self, provider_name: str, redirect_uri: str) -> Any:
        """Redirect to OAuth provider login.

        Args:
            provider_name: Name the provider is registered under.
            redirect_uri: Callback URI handed to the provider's client.

        Returns:
            Whatever the provider client's ``authorize_redirect`` returns.

        Raises:
            RuntimeError: If the service is not initialized or the provider
                is not configured.
        """
        client = self._get_provider(provider_name).get_client()
        return client.authorize_redirect(redirect_uri)

    def get_login_url(self, provider_name: str, redirect_uri: str) -> str:
        """Generate OAuth provider login URL (for testing or manual use).

        Args:
            provider_name: Name the provider is registered under.
            redirect_uri: Callback URI to embed in the authorization URL.

        Raises:
            RuntimeError: If the service is not initialized or the provider
                is not configured.
        """
        return self._get_provider(provider_name).get_authorization_url(redirect_uri)

    def handle_callback(self, provider_name: str) -> Any:
        """Handle OAuth callback and exchange code for token.

        Args:
            provider_name: Name the provider is registered under.

        Returns:
            A "Login successful" JSON response with JWT cookies set, or a
            401 JSON error response when the user account is not active.

        Raises:
            RuntimeError: If the service is not initialized or the provider
                is not configured.
            ValueError: If the provider yields no user data or no user id.
        """
        provider = self._get_provider(provider_name)

        # NOTE(review): both arguments are passed as None; presumably the
        # provider reads the authorization code from the current request.
        # Confirm against the provider implementation.
        token = provider.exchange_code_for_token(None, None)
        raw_user_info = provider.get_user_info(token)
        user_data = provider.normalize_user_data(raw_user_info)

        if not user_data or not user_data.get("id"):
            msg = f"Failed to get user information from {provider.display_name}"
            raise ValueError(msg)

        # Find or create user in database
        user, oauth_provider = User.find_or_create_from_oauth(
            provider=provider_name,
            provider_id=user_data["id"],
            email=user_data["email"],
            name=user_data["name"],
            picture=user_data.get("picture"),
        )

        # Inactive accounts must not receive tokens.
        if not user.is_active:
            return self._error_response("Account is disabled", 401)

        jwt_user_data = user.to_dict()
        # Override provider for OAuth login
        jwt_user_data["provider"] = oauth_provider.provider

        return self._build_auth_response(jwt_user_data, "Login successful")

    def get_available_providers(self) -> dict[str, Any]:
        """Get list of available OAuth providers.

        Returns:
            A mapping of provider key to ``{"name", "display_name"}``; empty
            when the registry has not been initialized.
        """
        if not self.provider_registry:
            return {}

        providers = self.provider_registry.get_available_providers()
        return {
            name: {"name": provider.name, "display_name": provider.display_name}
            for name, provider in providers.items()
        }

    @jwt_required()
    def get_current_user(self) -> dict[str, Any] | None:
        """Get current user from JWT token.

        Returns:
            A dict assembled from the JWT identity and claims, or ``None``
            when the token carries no (truthy) identity.
        """
        from flask_jwt_extended import get_jwt

        current_user_id = get_jwt_identity()
        claims = get_jwt()

        if not current_user_id:
            return None

        return {
            "id": current_user_id,
            "email": claims.get("email", ""),
            "name": claims.get("name", ""),
            "picture": claims.get("picture"),
            "role": claims.get("role", "user"),
            "is_active": claims.get("is_active", True),
            "provider": claims.get("provider", "unknown"),
            "providers": claims.get("providers", []),
            # Plan information is taken from the JWT claims as-is.
            "plan": claims.get("plan"),
            "credits": claims.get("credits"),
        }

    def register_with_password(
        self,
        email: str,
        password: str,
        name: str,
    ) -> Any:
        """Register new user with email and password.

        Args:
            email: E-mail address of the new account.
            password: Password for the new account.
            name: Display name of the new account.

        Returns:
            A "Registration successful" JSON response with JWT cookies set,
            or a 400 JSON error response carrying the ``ValueError`` message
            raised by ``User.create_with_password``.
        """
        # Only user creation is guarded: a ValueError raised later (token
        # generation, serialization) is an internal fault and must not be
        # reported to the client as a 400 validation error.
        try:
            user = User.create_with_password(email, password, name)
        except ValueError as e:
            return self._error_response(str(e), 400)

        jwt_user_data = user.to_dict()
        # Override provider for password registration
        jwt_user_data["provider"] = "password"

        return self._build_auth_response(jwt_user_data, "Registration successful")

    def login_with_password(self, email: str, password: str) -> Any:
        """Login user with email and password.

        Args:
            email: E-mail address of the account.
            password: Password to verify.

        Returns:
            A "Login successful" JSON response with JWT cookies set, or a
            401 JSON error response when authentication yields no user.
        """
        user = User.authenticate_with_password(email, password)
        if not user:
            return self._error_response(
                "Invalid email, password or disabled account",
                401,
            )

        jwt_user_data = user.to_dict()
        # Override provider for password login
        jwt_user_data["provider"] = "password"

        return self._build_auth_response(jwt_user_data, "Login successful")

    def logout(self) -> Any:
        """Clear authentication cookies.

        Returns:
            A JSON confirmation response with the JWT cookies unset.
        """
        response = jsonify({"message": "Logged out successfully"})
        unset_jwt_cookies(response)
        return response
|