75 lines
2.6 KiB
Python
75 lines
2.6 KiB
Python
"""JWT token service for handling access and refresh tokens."""
|
|
|
|
import os
|
|
from datetime import datetime, timedelta, timezone
|
|
from typing import Any
|
|
|
|
import jwt
|
|
|
|
|
|
class TokenService:
|
|
"""Service for handling JWT tokens."""
|
|
|
|
def __init__(self) -> None:
|
|
"""Initialize the token service."""
|
|
self.secret_key = os.environ.get("JWT_SECRET_KEY", "jwt-secret-key")
|
|
self.algorithm = "HS256"
|
|
self.access_token_expire_minutes = 15
|
|
self.refresh_token_expire_days = 7
|
|
|
|
def generate_access_token(self, user_data: dict[str, Any]) -> str:
|
|
"""Generate an access token for the user."""
|
|
payload = {
|
|
"user_id": user_data["id"],
|
|
"email": user_data["email"],
|
|
"name": user_data["name"],
|
|
"type": "access",
|
|
"exp": datetime.now(timezone.utc) + timedelta(
|
|
minutes=self.access_token_expire_minutes
|
|
),
|
|
"iat": datetime.now(timezone.utc),
|
|
}
|
|
return jwt.encode(payload, self.secret_key, algorithm=self.algorithm)
|
|
|
|
def generate_refresh_token(self, user_data: dict[str, Any]) -> str:
|
|
"""Generate a refresh token for the user."""
|
|
payload = {
|
|
"user_id": user_data["id"],
|
|
"type": "refresh",
|
|
"exp": datetime.now(timezone.utc) + timedelta(
|
|
days=self.refresh_token_expire_days
|
|
),
|
|
"iat": datetime.now(timezone.utc),
|
|
}
|
|
return jwt.encode(payload, self.secret_key, algorithm=self.algorithm)
|
|
|
|
def verify_token(self, token: str) -> dict[str, Any] | None:
|
|
"""Verify and decode a JWT token."""
|
|
try:
|
|
payload = jwt.decode(
|
|
token, self.secret_key, algorithms=[self.algorithm]
|
|
)
|
|
return payload
|
|
except jwt.ExpiredSignatureError:
|
|
return None
|
|
except jwt.InvalidTokenError:
|
|
return None
|
|
|
|
def is_access_token(self, payload: dict[str, Any]) -> bool:
|
|
"""Check if the token payload is for an access token."""
|
|
return payload.get("type") == "access"
|
|
|
|
def is_refresh_token(self, payload: dict[str, Any]) -> bool:
|
|
"""Check if the token payload is for a refresh token."""
|
|
return payload.get("type") == "refresh"
|
|
|
|
def get_user_from_access_token(self, token: str) -> dict[str, Any] | None:
|
|
"""Extract user data from access token."""
|
|
payload = self.verify_token(token)
|
|
if payload and self.is_access_token(payload):
|
|
return {
|
|
"id": payload["user_id"],
|
|
"email": payload["email"],
|
|
"name": payload["name"],
|
|
}
|
|
return None |