feat(auth): enhance user authentication by querying user data from the database and simplifying access token generation

This commit is contained in:
JSC
2025-07-01 02:41:25 +02:00
parent a7210a8d50
commit e63c7a0767
2 changed files with 23 additions and 28 deletions

View File

@@ -17,23 +17,31 @@ def get_user_from_jwt() -> dict[str, Any] | None:
if not current_user_id: if not current_user_id:
return None return None
claims = get_jwt() # Query database for user data instead of using JWT claims
is_active = claims.get("is_active", True) from app.models.user import User
if not is_active: user = User.query.get(int(current_user_id))
if not user or not user.is_active:
return None return None
# Build comprehensive providers list
providers = [p.provider for p in user.oauth_providers]
if user.password_hash:
providers.append("password")
if user.api_token:
providers.append("api_token")
return { return {
"id": current_user_id, "id": str(user.id),
"email": claims.get("email", ""), "email": user.email,
"name": claims.get("name", ""), "name": user.name,
"picture": claims.get("picture"), "picture": user.picture,
"role": claims.get("role", "user"), "role": user.role,
"is_active": is_active, "is_active": user.is_active,
"provider": claims.get("provider", "unknown"), "provider": "jwt",
"providers": claims.get("providers", []), "providers": providers,
"plan": claims.get("plan"), "plan": user.plan.to_dict() if user.plan else None,
"credits": claims.get("credits"), "credits": user.credits,
} }
except Exception: except Exception:
return None return None

View File

@@ -10,20 +10,7 @@ class TokenService:
def generate_access_token(self, user_data: dict[str, Any]) -> str: def generate_access_token(self, user_data: dict[str, Any]) -> str:
"""Generate an access token for the user.""" """Generate an access token for the user."""
return create_access_token( return create_access_token(identity=user_data["id"])
identity=user_data["id"],
additional_claims={
"email": user_data["email"],
"name": user_data["name"],
"picture": user_data.get("picture"),
"role": user_data.get("role"),
"is_active": user_data.get("is_active"),
"provider": user_data.get("provider"),
"providers": user_data.get("providers", []),
"plan": user_data.get("plan"),
"credits": user_data.get("credits"),
},
)
def generate_refresh_token(self, user_data: dict[str, Any]) -> str: def generate_refresh_token(self, user_data: dict[str, Any]) -> str:
"""Generate a refresh token for the user.""" """Generate a refresh token for the user."""