"""Authentication service for multiple OAuth providers."""

from typing import Any

from authlib.integrations.flask_client import OAuth
from flask import Flask, jsonify
from flask_jwt_extended import (
    get_jwt_identity,
    jwt_required,
    set_access_cookies,
    set_refresh_cookies,
    unset_jwt_cookies,
)

from app.models.user import User
from app.services.oauth_providers.registry import OAuthProviderRegistry
from app.services.token_service import TokenService


class AuthService:
    """Service handling OAuth-provider and email/password authentication.

    The service can be bound to a Flask app at construction time or later
    through :meth:`init_app`. The provider registry only exists after
    :meth:`init_app` has run.
    """

    def __init__(self, app: Flask | None = None) -> None:
        """Initialize the authentication service.

        Args:
            app: Optional Flask application. When given, :meth:`init_app`
                is called immediately.
        """
        self.oauth = OAuth()
        # Stays None until init_app() is called.
        self.provider_registry: OAuthProviderRegistry | None = None
        self.token_service = TokenService()

        if app:
            self.init_app(app)

    def init_app(self, app: Flask) -> None:
        """Bind the OAuth client to ``app`` and create the provider registry."""
        self.oauth.init_app(app)
        self.provider_registry = OAuthProviderRegistry(self.oauth)

    def _require_provider(self, provider_name: str) -> Any:
        """Return the registered provider called ``provider_name``.

        Raises:
            RuntimeError: If the service has not been initialized with
                :meth:`init_app`, or the registry returns no provider for
                ``provider_name``.
        """
        if self.provider_registry is None:
            # Without this guard the lookup below would fail with an opaque
            # AttributeError on None.
            msg = "AuthService is not initialized; call init_app() first"
            raise RuntimeError(msg)

        provider = self.provider_registry.get_provider(provider_name)
        if not provider:
            msg = f"OAuth provider '{provider_name}' not configured"
            raise RuntimeError(msg)
        return provider

    def _build_auth_response(
        self,
        jwt_user_data: dict[str, Any],
        message: str,
    ) -> Any:
        """Issue access/refresh tokens and return a JSON response carrying them.

        Args:
            jwt_user_data: User payload passed to the token service and
                echoed back under the ``"user"`` key.
            message: Text placed under the ``"message"`` key.

        Returns:
            The JSON response with access and refresh JWT cookies set.
        """
        access_token = self.token_service.generate_access_token(jwt_user_data)
        refresh_token = self.token_service.generate_refresh_token(
            jwt_user_data,
        )

        response = jsonify({"message": message, "user": jwt_user_data})
        set_access_cookies(response, access_token)
        set_refresh_cookies(response, refresh_token)
        return response

    def redirect_to_login(self, provider_name: str, redirect_uri: str) -> Any:
        """Redirect to the OAuth provider's login page.

        Args:
            provider_name: Name of the provider to look up in the registry.
            redirect_uri: Callback URI handed to the provider client.

        Returns:
            Whatever the provider client's ``authorize_redirect`` returns.

        Raises:
            RuntimeError: If the service is not initialized or the provider
                is not configured.
        """
        client = self._require_provider(provider_name).get_client()
        return client.authorize_redirect(redirect_uri)

    def get_login_url(self, provider_name: str, redirect_uri: str) -> str:
        """Generate the OAuth provider login URL (for testing or manual use).

        Raises:
            RuntimeError: If the service is not initialized or the provider
                is not configured.
        """
        provider = self._require_provider(provider_name)
        return provider.get_authorization_url(redirect_uri)

    def handle_callback(self, provider_name: str) -> Any:
        """Handle the OAuth callback and log the user in.

        Exchanges the authorization code for a token, fetches and normalizes
        the provider's user info, finds or creates the matching user, and
        returns a response carrying JWT cookies.

        Returns:
            A 401 JSON response when the user account is inactive; otherwise
            the "Login successful" JSON response with JWT cookies set.

        Raises:
            RuntimeError: If the service is not initialized or the provider
                is not configured.
            ValueError: If the normalized user data is empty or has no
                ``"id"``.
        """
        provider = self._require_provider(provider_name)

        # NOTE(review): both arguments are passed as None; presumably the
        # provider reads the code/state from the current request - confirm.
        token = provider.exchange_code_for_token(None, None)
        raw_user_info = provider.get_user_info(token)
        user_data = provider.normalize_user_data(raw_user_info)

        if not (user_data and user_data.get("id")):
            msg = f"Failed to get user information from {provider.display_name}"
            raise ValueError(msg)

        # Find or create the user in the database.
        user, oauth_provider = User.find_or_create_from_oauth(
            provider=provider_name,
            provider_id=user_data["id"],
            email=user_data["email"],
            name=user_data["name"],
            picture=user_data.get("picture"),
        )

        if not user.is_active:
            response = jsonify({"error": "Account is disabled"})
            response.status_code = 401
            return response

        jwt_user_data = user.to_dict()
        # Override the provider with the one used for this OAuth login.
        jwt_user_data["provider"] = oauth_provider.provider

        return self._build_auth_response(jwt_user_data, "Login successful")

    def get_available_providers(self) -> dict[str, Any]:
        """Return the available OAuth providers keyed by name.

        Returns:
            A mapping of provider key to ``{"name", "display_name"}``; an
            empty dict when the registry has not been set up.
        """
        if not self.provider_registry:
            return {}

        providers = self.provider_registry.get_available_providers()
        return {
            name: {"name": provider.name, "display_name": provider.display_name}
            for name, provider in providers.items()
        }

    @jwt_required()
    def get_current_user(self) -> dict[str, Any] | None:
        """Build the current user's data from the JWT identity and claims.

        Returns:
            A dict of user fields read from the JWT claims (with defaults
            for missing claims), or ``None`` when the JWT identity is falsy.
        """
        from flask_jwt_extended import get_jwt

        current_user_id = get_jwt_identity()
        claims = get_jwt()

        if not current_user_id:
            return None

        return {
            "id": current_user_id,
            "email": claims.get("email", ""),
            "name": claims.get("name", ""),
            "picture": claims.get("picture"),
            "role": claims.get("role", "user"),
            "is_active": claims.get("is_active", True),
            "provider": claims.get("provider", "unknown"),
            "providers": claims.get("providers", []),
            "plan": claims.get("plan"),
            "credits": claims.get("credits"),
        }

    def register_with_password(
        self,
        email: str,
        password: str,
        name: str,
    ) -> Any:
        """Register a new user with email and password.

        Returns:
            The "Registration successful" JSON response with JWT cookies
            set, or a 400 JSON response carrying the error text when a
            ``ValueError`` is raised while registering.
        """
        try:
            user = User.create_with_password(email, password, name)

            jwt_user_data = user.to_dict()
            # Override the provider for password registration.
            jwt_user_data["provider"] = "password"

            return self._build_auth_response(
                jwt_user_data,
                "Registration successful",
            )
        except ValueError as e:
            response = jsonify({"error": str(e)})
            response.status_code = 400
            return response

    def login_with_password(self, email: str, password: str) -> Any:
        """Log a user in with email and password.

        Returns:
            A 401 JSON response when authentication yields no user;
            otherwise the "Login successful" JSON response with JWT cookies
            set.
        """
        user = User.authenticate_with_password(email, password)
        if not user:
            response = jsonify(
                {"error": "Invalid email, password or disabled account"},
            )
            response.status_code = 401
            return response

        jwt_user_data = user.to_dict()
        # Override the provider for password login.
        jwt_user_data["provider"] = "password"

        return self._build_auth_response(jwt_user_data, "Login successful")

    def logout(self) -> Any:
        """Return a JSON response that clears the JWT authentication cookies."""
        response = jsonify({"message": "Logged out successfully"})
        unset_jwt_cookies(response)
        return response