Files
sdb-back/app/routes/auth.py

384 lines
12 KiB
Python

"""Authentication routes."""
from flask import Blueprint, jsonify, url_for
from flask_jwt_extended import (
create_access_token,
get_jwt_identity,
jwt_required,
)
from app import auth_service
from app.services.decorators import get_current_user, require_auth
bp = Blueprint("auth", __name__)
@bp.route("/login/<provider>")
def login_oauth(provider):
    """Start the OAuth login flow for ``provider``.

    Builds the absolute URL of this blueprint's ``callback`` endpoint for the
    same provider and passes it to ``auth_service.redirect_to_login``; that
    call's return value is returned unchanged.
    """
    callback_url = url_for(
        "auth.callback",
        provider=provider,
        _external=True,
    )
    return auth_service.redirect_to_login(provider, callback_url)
@bp.route("/callback/<provider>")
def callback(provider):
    """Handle the OAuth callback from ``provider`` and redirect to the frontend.

    Delegates to ``auth_service.handle_callback``. When that response has
    status 200, the browser is redirected to the frontend dashboard and every
    ``Set-Cookie`` header of the auth response is copied onto the redirect.
    Any other status redirects to the frontend login page with
    ``error=oauth_failed``. If anything raises, the exception text (spaces
    replaced by underscores, double quotes removed) is sent to the login page
    in the ``error`` query parameter.

    The error text is percent-encoded before being placed in the URL so that
    characters such as ``&``, ``#``, ``=`` or line breaks in an exception
    message cannot truncate the value or inject extra query parameters.
    """
    from urllib.parse import quote

    from flask import make_response, redirect

    # NOTE(review): the frontend origin is hard-coded to a local development
    # address; presumably this should come from configuration - confirm.
    frontend_base = "http://localhost:3000"

    try:
        auth_response = auth_service.handle_callback(provider)

        # Anything other than a 200 from the auth service is a failed login.
        if auth_response.status_code != 200:
            return redirect(f"{frontend_base}/login?error=oauth_failed")

        # Success: forward the cookies set by the auth service on the redirect.
        redirect_response = make_response(
            redirect(f"{frontend_base}/dashboard"),
        )
        for cookie in auth_response.headers.getlist("Set-Cookie"):
            redirect_response.headers.add("Set-Cookie", cookie)
        return redirect_response
    except Exception as e:
        error_msg = str(e).replace(" ", "_").replace('"', "")
        # Percent-encode so the message is a single, well-formed query value.
        encoded_error = quote(error_msg, safe="")
        return redirect(f"{frontend_base}/login?error={encoded_error}")
@bp.route("/providers")
def providers():
    """Return the OAuth providers reported by the auth service.

    The result is a dict with a single ``providers`` key holding whatever
    ``auth_service.get_available_providers`` returns.
    """
    available = auth_service.get_available_providers()
    return {"providers": available}
@bp.route("/register", methods=["POST"])
def register():
    """Register a new user with email, password and name.

    Reads a JSON body with the keys ``email``, ``password`` and ``name``.
    Returns a 400 error dict when the body is missing or is not a JSON
    object, when any of the three fields is missing or is not a string, when
    the email contains no ``@`` or no ``.``, or when the password is shorter
    than 6 characters. Otherwise the result of
    ``auth_service.register_with_password`` is returned unchanged.
    """
    from flask import request

    data = request.get_json()
    if not data:
        return {"error": "No data provided"}, 400
    # A JSON array/string/number has no ``.get``; reject it instead of
    # failing with an AttributeError (HTTP 500).
    if not isinstance(data, dict):
        return {"error": "Request body must be a JSON object"}, 400

    email = data.get("email")
    password = data.get("password")
    name = data.get("name")

    if not email or not password or not name:
        return {"error": "Email, password, and name are required"}, 400
    # The checks below use ``in`` and ``len``, which raise or give
    # meaningless answers for non-string JSON values (numbers, lists, ...).
    if not all(isinstance(value, str) for value in (email, password, name)):
        return {"error": "Email, password, and name must be strings"}, 400

    # Basic email validation
    if "@" not in email or "." not in email:
        return {"error": "Invalid email format"}, 400

    # Basic password validation
    if len(password) < 6:
        return {"error": "Password must be at least 6 characters long"}, 400

    return auth_service.register_with_password(email, password, name)
@bp.route("/login", methods=["POST"])
def login():
    """Log a user in with email and password.

    Reads a JSON body with the keys ``email`` and ``password``. Returns a 400
    error dict when the body is missing or is not a JSON object, or when
    either field is missing or is not a string. Otherwise the result of
    ``auth_service.login_with_password`` is returned unchanged.
    """
    from flask import request

    data = request.get_json()
    if not data:
        return {"error": "No data provided"}, 400
    # A JSON array/string/number has no ``.get``; reject it instead of
    # failing with an AttributeError (HTTP 500).
    if not isinstance(data, dict):
        return {"error": "Request body must be a JSON object"}, 400

    email = data.get("email")
    password = data.get("password")

    if not email or not password:
        return {"error": "Email and password are required"}, 400
    # Only strings are meaningful credentials; refuse other JSON types here
    # rather than passing them on to the auth service.
    if not isinstance(email, str) or not isinstance(password, str):
        return {"error": "Email and password must be strings"}, 400

    return auth_service.login_with_password(email, password)
@bp.route("/logout")
def logout():
    """Log the current user out by delegating to ``auth_service.logout``."""
    logout_response = auth_service.logout()
    return logout_response
@bp.route("/refresh", methods=["POST"])
@jwt_required(refresh=True)
def refresh():
    """Issue a fresh access token for the identity in the refresh token.

    A new access token is created for the identity returned by
    ``get_jwt_identity`` and attached to the JSON response through
    ``set_access_cookies``.
    """
    from flask_jwt_extended import set_access_cookies

    identity = get_jwt_identity()
    access_token = create_access_token(identity=identity)

    refreshed = jsonify({"message": "Token refreshed"})
    set_access_cookies(refreshed, access_token)
    return refreshed
@bp.route("/link/<provider>")
@jwt_required()
def link_provider(provider):
    """Start an OAuth flow that links ``provider`` to the current account.

    Works like the plain OAuth login, except that the redirect URI points at
    this blueprint's ``link_callback`` endpoint.
    """
    link_callback_url = url_for(
        "auth.link_callback",
        provider=provider,
        _external=True,
    )
    return auth_service.redirect_to_login(provider, link_callback_url)
@bp.route("/link/callback/<provider>")
@jwt_required()
def link_callback(provider):
    """Finish an OAuth flow that attaches ``provider`` to the current user.

    Responses:
        401 - no JWT identity is available.
        404 - the identity does not match a stored user.
        400 - the provider is not configured, the provider returned no user
              id, or any exception was raised (its text becomes the error).
        409 - the provider account is already linked to a different user.
        success - a dict with a confirmation ``message``.
    """
    try:
        user_id = get_jwt_identity()
        if not user_id:
            return {"error": "User not authenticated"}, 401

        # Load the account the provider will be attached to.
        from app.models.user import User

        account = User.query.get(user_id)
        if not account:
            return {"error": "User not found"}, 404

        # Resolve the provider implementation through the registry.
        from authlib.integrations.flask_client import OAuth

        from app.services.oauth_providers.registry import OAuthProviderRegistry

        # NOTE(review): a fresh OAuth() is created on every request and is
        # not bound to the Flask app here - confirm the registry expects that.
        provider_client = OAuthProviderRegistry(OAuth()).get_provider(provider)
        if not provider_client:
            return {"error": f"OAuth provider '{provider}' not configured"}, 400

        # NOTE(review): both arguments are None; presumably the provider reads
        # the authorization code from the request context - verify.
        token = provider_client.exchange_code_for_token(None, None)
        profile = provider_client.normalize_user_data(
            provider_client.get_user_info(token),
        )
        if not profile.get("id"):
            return {"error": "Failed to get user information from provider"}, 400

        # Refuse to link a provider identity that belongs to someone else.
        from app.models.user_oauth import UserOAuth

        already_linked = UserOAuth.find_by_provider_and_id(
            provider,
            profile["id"],
        )
        if already_linked and already_linked.user_id != account.id:
            return {
                "error": "This provider account is already linked to another user",
            }, 409

        # Create or refresh the link for the current user.
        UserOAuth.create_or_update(
            user_id=account.id,
            provider=provider,
            provider_id=profile["id"],
            email=profile["email"],
            name=profile["name"],
            picture=profile.get("picture"),
        )
        return {"message": f"{provider.title()} account linked successfully"}
    except Exception as exc:
        return {"error": str(exc)}, 400
@bp.route("/unlink/<provider>", methods=["DELETE"])
@jwt_required()
def unlink_provider(provider):
    """Unlink the OAuth ``provider`` from the current user's account.

    Responses:
        401 - no JWT identity is available.
        404 - the user does not exist, or the provider is not linked to it.
        400 - the user has at most one linked provider, or an exception was
              raised (its text becomes the error message).
        success - a dict with a confirmation ``message``.

    On any exception the database session is rolled back before the error is
    returned, so a failed delete/commit does not leave pending state behind.
    """
    # Imported before the ``try`` so ``db`` is always bound in the handler.
    from app.database import db
    from app.models.user import User

    try:
        current_user_id = get_jwt_identity()
        if not current_user_id:
            return {"error": "User not authenticated"}, 401

        user = User.query.get(current_user_id)
        if not user:
            return {"error": "User not found"}, 404

        # Check if user has more than one provider (prevent locking out).
        # NOTE(review): this ignores whether the user also has a password
        # set - confirm that password users should be blocked here too.
        if len(user.oauth_providers) <= 1:
            return {"error": "Cannot unlink last authentication provider"}, 400

        # Find and remove the provider
        oauth_provider = user.get_provider(provider)
        if not oauth_provider:
            return {
                "error": f"Provider '{provider}' not linked to this account",
            }, 404

        db.session.delete(oauth_provider)
        db.session.commit()
        return {"message": f"{provider.title()} account unlinked successfully"}
    except Exception as e:
        # Match the other write endpoints in this module: discard the failed
        # transaction before reporting the error.
        db.session.rollback()
        return {"error": str(e)}, 400
@bp.route("/regenerate-api-token", methods=["POST"])
@jwt_required()
def regenerate_api_token():
    """Create a new API token for the current user and return it.

    Responds with 401 when no JWT identity is available and 404 when the
    identity does not match a stored user. On success the new token is
    committed and returned together with its expiry as an ISO-8601 string,
    or ``None`` when the user has no expiry set.
    """
    from app.database import db
    from app.models.user import User

    identity = get_jwt_identity()
    if not identity:
        return {"error": "User not authenticated"}, 401

    account = User.query.get(identity)
    if not account:
        return {"error": "User not found"}, 404

    # Generate the token on the model, then persist it.
    fresh_token = account.generate_api_token()
    db.session.commit()

    expiry = account.api_token_expires_at
    if expiry:
        expires_at = expiry.isoformat()
    else:
        expires_at = None

    return {
        "message": "API token regenerated successfully",
        "api_token": fresh_token,
        "expires_at": expires_at,
    }
@bp.route("/me")
@require_auth
def me():
    """Return the current user, as given by ``get_current_user``, under ``user``."""
    return {"user": get_current_user()}
@bp.route("/profile", methods=["PATCH"])
@require_auth
def update_profile():
    """Update the current user's profile; only ``name`` can be changed.

    Responses:
        400 - the body is missing or not a JSON object, or ``name`` is not a
              string, is blank after stripping, or exceeds 100 characters.
        401 - ``get_current_user`` returned nothing.
        404 - the current user's id does not match a stored user.
        500 - the commit failed; the session is rolled back first.
        success - a confirmation ``message`` plus the updated ``user`` dict.
    """
    from flask import request

    from app.database import db
    from app.models.user import User

    data = request.get_json()
    if not data:
        return {"error": "No data provided"}, 400
    # ``data["name"]`` below needs a mapping; a JSON array or scalar would
    # otherwise raise and surface as HTTP 500.
    if not isinstance(data, dict):
        return {"error": "Request body must be a JSON object"}, 400

    user_data = get_current_user()
    if not user_data:
        return {"error": "User not authenticated"}, 401

    user = User.query.get(int(user_data["id"]))
    if not user:
        return {"error": "User not found"}, 404

    # Update allowed fields
    if "name" in data:
        raw_name = data["name"]
        # ``null`` or a number has no ``.strip()``; report it as bad input.
        if not isinstance(raw_name, str):
            return {"error": "Name must be a string"}, 400
        name = raw_name.strip()
        if not name:
            return {"error": "Name cannot be empty"}, 400
        if len(name) > 100:
            return {"error": "Name too long (max 100 characters)"}, 400
        user.name = name

    try:
        db.session.commit()
        # Return fresh user data from database
        updated_user = {
            "id": str(user.id),
            "email": user.email,
            "name": user.name,
            "picture": user.picture,
            "role": user.role,
            "is_active": user.is_active,
            "provider": "password",  # This endpoint is only for password users
            "providers": [p.provider for p in user.oauth_providers],
            "plan": user.plan.to_dict() if user.plan else None,
            "credits": user.credits,
        }
        return {"message": "Profile updated successfully", "user": updated_user}
    except Exception as e:
        db.session.rollback()
        return {"error": f"Failed to update profile: {e!s}"}, 500
@bp.route("/password", methods=["PUT"])
@require_auth
def change_password():
    """Change or set the current user's password.

    Reads ``new_password`` and, optionally, ``current_password`` from the
    JSON body. The current password is verified only when the user already
    has a password hash AND the current session's ``provider`` is
    ``"password"``; sessions with any other provider may set a new password
    without supplying the old one.

    Responses:
        400 - the body is missing or not a JSON object; ``new_password`` is
              missing, not a string, or shorter than 6 characters; or the
              required ``current_password`` is missing, not a string, or
              does not match.
        401 - ``get_current_user`` returned nothing.
        404 - the current user's id does not match a stored user.
        500 - storing the password failed; the session is rolled back first.
        success - a dict with a confirmation ``message``.
    """
    from flask import request
    from werkzeug.security import check_password_hash

    from app.database import db
    from app.models.user import User

    data = request.get_json()
    if not data:
        return {"error": "No data provided"}, 400
    # A JSON array/string/number has no ``.get``; reject it instead of
    # failing with an AttributeError (HTTP 500).
    if not isinstance(data, dict):
        return {"error": "Request body must be a JSON object"}, 400

    user_data = get_current_user()
    if not user_data:
        return {"error": "User not authenticated"}, 401

    user = User.query.get(int(user_data["id"]))
    if not user:
        return {"error": "User not found"}, 404

    new_password = data.get("new_password")
    current_password = data.get("current_password")

    if not new_password:
        return {"error": "New password is required"}, 400
    # ``len`` raises on numbers and would count elements of a list, so only
    # accept real strings as passwords.
    if not isinstance(new_password, str):
        return {"error": "New password must be a string"}, 400

    # Password validation
    if len(new_password) < 6:
        return {"error": "Password must be at least 6 characters long"}, 400

    # The current password is required only for password-authenticated
    # sessions; sessions from another provider skip this verification.
    current_auth_method = user_data.get("provider", "unknown")
    if user.password_hash and current_auth_method == "password":
        if not current_password:
            return {
                "error": "Current password is required to change password",
            }, 400
        # Only a string can be checked against the stored hash.
        if not isinstance(current_password, str):
            return {"error": "Current password must be a string"}, 400
        if not check_password_hash(user.password_hash, current_password):
            return {"error": "Current password is incorrect"}, 400

    # Set the new password
    try:
        user.set_password(new_password)
        db.session.commit()
        return {"message": "Password updated successfully"}
    except Exception as e:
        db.session.rollback()
        return {"error": f"Failed to update password: {e!s}"}, 500