"""Authentication routes.""" from flask import Blueprint, jsonify, url_for from flask_jwt_extended import ( create_access_token, get_jwt_identity, jwt_required, ) from app import auth_service from app.services.decorators import get_current_user, require_auth bp = Blueprint("auth", __name__) @bp.route("/login/") def login_oauth(provider): """Initiate OAuth login for specified provider.""" redirect_uri = url_for("auth.callback", provider=provider, _external=True) return auth_service.redirect_to_login(provider, redirect_uri) @bp.route("/callback/") def callback(provider): """Handle OAuth callback from specified provider.""" from flask import make_response, redirect try: auth_response = auth_service.handle_callback(provider) # If successful, redirect to frontend dashboard with cookies if auth_response.status_code == 200: redirect_response = make_response( redirect("http://localhost:3000/dashboard"), ) # Copy all cookies from the auth response for cookie in auth_response.headers.getlist("Set-Cookie"): redirect_response.headers.add("Set-Cookie", cookie) return redirect_response # If there was an error, redirect to login with error return redirect("http://localhost:3000/login?error=oauth_failed") except Exception as e: error_msg = str(e).replace(" ", "_").replace('"', "") return redirect(f"http://localhost:3000/login?error={error_msg}") @bp.route("/providers") def providers(): """Get list of available OAuth providers.""" return {"providers": auth_service.get_available_providers()} @bp.route("/register", methods=["POST"]) def register(): """Register new user with email and password.""" from flask import request data = request.get_json() if not data: return {"error": "No data provided"}, 400 email = data.get("email") password = data.get("password") name = data.get("name") if not email or not password or not name: return {"error": "Email, password, and name are required"}, 400 # Basic email validation if "@" not in email or "." not in email: return {"error": "Invalid email format"}, 400 # Basic password validation if len(password) < 6: return {"error": "Password must be at least 6 characters long"}, 400 return auth_service.register_with_password(email, password, name) @bp.route("/login", methods=["POST"]) def login(): """Login user with email and password.""" from flask import request data = request.get_json() if not data: return {"error": "No data provided"}, 400 email = data.get("email") password = data.get("password") if not email or not password: return {"error": "Email and password are required"}, 400 return auth_service.login_with_password(email, password) @bp.route("/logout") def logout(): """Logout current user.""" return auth_service.logout() @bp.route("/refresh", methods=["POST"]) @jwt_required(refresh=True) def refresh(): """Refresh access token using refresh token.""" current_user_id = get_jwt_identity() # Create new access token new_access_token = create_access_token(identity=current_user_id) response = jsonify({"message": "Token refreshed"}) # Set new access token cookie from flask_jwt_extended import set_access_cookies set_access_cookies(response, new_access_token) return response @bp.route("/link/") @jwt_required() def link_provider(provider): """Link a new OAuth provider to current user account.""" redirect_uri = url_for( "auth.link_callback", provider=provider, _external=True, ) return auth_service.redirect_to_login(provider, redirect_uri) @bp.route("/link/callback/") @jwt_required() def link_callback(provider): """Handle OAuth callback for linking new provider.""" try: from app.services.oauth_linking_service import OAuthLinkingService current_user_id = get_jwt_identity() if not current_user_id: return {"error": "User not authenticated"}, 401 result = OAuthLinkingService.link_provider_to_user( provider, current_user_id, ) return result except ValueError as e: error_str = str(e) if "not found" in error_str: return {"error": error_str}, 404 if "not configured" in error_str: return {"error": error_str}, 400 if "already linked" in error_str: return {"error": error_str}, 409 return {"error": error_str}, 400 except Exception as e: return {"error": str(e)}, 400 @bp.route("/unlink/", methods=["DELETE"]) @jwt_required() def unlink_provider(provider): """Unlink an OAuth provider from current user account.""" try: from app.services.oauth_linking_service import OAuthLinkingService current_user_id = get_jwt_identity() if not current_user_id: return {"error": "User not authenticated"}, 401 result = OAuthLinkingService.unlink_provider_from_user( provider, current_user_id, ) return result except ValueError as e: error_str = str(e) if "not found" in error_str: return {"error": error_str}, 404 if "Cannot unlink" in error_str: return {"error": error_str}, 400 if "not linked" in error_str: return {"error": error_str}, 404 return {"error": error_str}, 400 except Exception as e: return {"error": str(e)}, 400 @bp.route("/regenerate-api-token", methods=["POST"]) @jwt_required() def regenerate_api_token(): """Regenerate API token for current user.""" current_user_id = get_jwt_identity() if not current_user_id: return {"error": "User not authenticated"}, 401 from app.database import db from app.models.user import User user = User.query.get(current_user_id) if not user: return {"error": "User not found"}, 404 # Generate new API token new_token = user.generate_api_token() db.session.commit() return { "message": "API token regenerated successfully", "api_token": new_token, "expires_at": ( user.api_token_expires_at.isoformat() if user.api_token_expires_at else None ), } @bp.route("/me") @require_auth def me(): """Get current user information.""" user = get_current_user() return {"user": user} @bp.route("/profile", methods=["PATCH"]) @require_auth def update_profile(): """Update current user profile information.""" from flask import request from app.database import db from app.models.user import User data = request.get_json() if not data: return {"error": "No data provided"}, 400 user_data = get_current_user() if not user_data: return {"error": "User not authenticated"}, 401 user = User.query.get(int(user_data["id"])) if not user: return {"error": "User not found"}, 404 # Update allowed fields if "name" in data: name = data["name"].strip() if not name: return {"error": "Name cannot be empty"}, 400 if len(name) > 100: return {"error": "Name too long (max 100 characters)"}, 400 user.name = name try: db.session.commit() # Return fresh user data from database updated_user = { "id": str(user.id), "email": user.email, "name": user.name, "picture": user.picture, "role": user.role, "is_active": user.is_active, "provider": "password", # This endpoint is only for password users "providers": [p.provider for p in user.oauth_providers], "plan": user.plan.to_dict() if user.plan else None, "credits": user.credits, } return {"message": "Profile updated successfully", "user": updated_user} except Exception as e: db.session.rollback() return {"error": f"Failed to update profile: {e!s}"}, 500 @bp.route("/password", methods=["PUT"]) @require_auth def change_password(): """Change or set user password.""" from flask import request from werkzeug.security import check_password_hash from app.database import db from app.models.user import User data = request.get_json() if not data: return {"error": "No data provided"}, 400 user_data = get_current_user() if not user_data: return {"error": "User not authenticated"}, 401 user = User.query.get(int(user_data["id"])) if not user: return {"error": "User not found"}, 404 new_password = data.get("new_password") current_password = data.get("current_password") if not new_password: return {"error": "New password is required"}, 400 # Password validation if len(new_password) < 6: return {"error": "Password must be at least 6 characters long"}, 400 # Check authentication method: if user logged in via password, require current password # If user logged in via OAuth, they can change password without current password current_auth_method = user_data.get("provider", "unknown") if user.password_hash and current_auth_method == "password": # User has a password AND logged in via password, require current password for verification if not current_password: return { "error": "Current password is required to change password", }, 400 if not check_password_hash(user.password_hash, current_password): return {"error": "Current password is incorrect"}, 400 # If user logged in via OAuth (google, github, etc.), they can change password without current password # Set the new password try: user.set_password(new_password) db.session.commit() return {"message": "Password updated successfully"} except Exception as e: db.session.rollback() return {"error": f"Failed to update password: {e!s}"}, 500