refactor(decorators): simplify require_admin decorator by reusing require_role

This commit is contained in:
JSC
2025-07-04 19:13:33 +02:00
parent 5c29fa1a4c
commit 4375718c2f
3 changed files with 107 additions and 92 deletions

View File

@@ -1,10 +1,9 @@
"""Sound played tracking model."""
from datetime import datetime
from typing import List, Optional
from app.database import db
from sqlalchemy import DateTime, ForeignKey, Integer, String
from sqlalchemy import DateTime, ForeignKey, Integer, func, text
from sqlalchemy.orm import Mapped, mapped_column, relationship
@@ -17,16 +16,15 @@ class SoundPlayed(db.Model):
# Foreign keys
user_id: Mapped[int] = mapped_column(
Integer, ForeignKey("users.id"), nullable=False
Integer, ForeignKey("users.id"), nullable=False,
)
sound_id: Mapped[int] = mapped_column(
Integer, ForeignKey("sounds.id"), nullable=False
Integer, ForeignKey("sounds.id"), nullable=False,
)
# Timestamp
played_at: Mapped[datetime] = mapped_column(
DateTime, default=datetime.utcnow, nullable=False
DateTime, default=datetime.utcnow, nullable=False,
)
# Relationships
@@ -34,8 +32,11 @@ class SoundPlayed(db.Model):
sound: Mapped["Sound"] = relationship("Sound", backref="play_history")
def __repr__(self) -> str:
"""String representation of SoundPlayed."""
return f"<SoundPlayed user_id={self.user_id} sound_id={self.sound_id} at={self.played_at}>"
"""Return string representation of SoundPlayed."""
return (
f"<SoundPlayed user_id={self.user_id} sound_id={self.sound_id} "
f"at={self.played_at}>"
)
def to_dict(self) -> dict:
"""Convert sound played record to dictionary."""
@@ -62,6 +63,7 @@ class SoundPlayed(db.Model):
cls,
user_id: int,
sound_id: int,
*,
commit: bool = True,
) -> "SoundPlayed":
"""Create a new sound played record."""
@@ -77,8 +79,8 @@ class SoundPlayed(db.Model):
@classmethod
def get_user_plays(
cls, user_id: int, limit: int = 50, offset: int = 0
) -> List["SoundPlayed"]:
cls, user_id: int, limit: int = 50, offset: int = 0,
) -> list["SoundPlayed"]:
"""Get recent plays for a specific user."""
return (
cls.query.filter_by(user_id=user_id)
@@ -90,8 +92,8 @@ class SoundPlayed(db.Model):
@classmethod
def get_sound_plays(
cls, sound_id: int, limit: int = 50, offset: int = 0
) -> List["SoundPlayed"]:
cls, sound_id: int, limit: int = 50, offset: int = 0,
) -> list["SoundPlayed"]:
"""Get recent plays for a specific sound."""
return (
cls.query.filter_by(sound_id=sound_id)
@@ -103,8 +105,8 @@ class SoundPlayed(db.Model):
@classmethod
def get_recent_plays(
cls, limit: int = 100, offset: int = 0
) -> List["SoundPlayed"]:
cls, limit: int = 100, offset: int = 0,
) -> list["SoundPlayed"]:
"""Get recent plays across all users and sounds."""
return (
cls.query.order_by(cls.played_at.desc())
@@ -125,10 +127,10 @@ class SoundPlayed(db.Model):
@classmethod
def get_popular_sounds(
cls, limit: int = 10, days: int | None = None
) -> List[dict]:
cls, limit: int = 10, days: int | None = None,
) -> list[dict]:
"""Get most popular sounds with play counts."""
from sqlalchemy import func, text
from app.models.sound import Sound
query = (
db.session.query(
@@ -150,14 +152,16 @@ class SoundPlayed(db.Model):
# Get sound details
popular_sounds = []
for result in results:
from app.models.sound import Sound
sound = Sound.query.get(result.sound_id)
if sound:
popular_sounds.append({
"sound": sound.to_dict(),
"play_count": result.play_count,
"last_played": result.last_played.isoformat() if result.last_played else None,
"last_played": (
result.last_played.isoformat()
if result.last_played
else None
),
})
return popular_sounds
@@ -165,7 +169,7 @@ class SoundPlayed(db.Model):
@classmethod
def get_user_stats(cls, user_id: int) -> dict:
"""Get comprehensive stats for a user."""
from sqlalchemy import func
from app.models.sound import Sound
total_plays = cls.query.filter_by(user_id=user_id).count()
@@ -199,8 +203,6 @@ class SoundPlayed(db.Model):
favorite_sound = None
if favorite_query:
from app.models.sound import Sound
sound = Sound.query.get(favorite_query.sound_id)
if sound:
favorite_sound = {
@@ -224,6 +226,10 @@ class SoundPlayed(db.Model):
"total_plays": total_plays,
"unique_sounds": unique_sounds,
"favorite_sound": favorite_sound,
"first_play": first_play.played_at.isoformat() if first_play else None,
"last_play": last_play.played_at.isoformat() if last_play else None,
"first_play": (
first_play.played_at.isoformat() if first_play else None
),
"last_play": (
last_play.played_at.isoformat() if last_play else None
),
}

View File

@@ -1,10 +1,15 @@
"""Soundboard routes."""
from flask import Blueprint, jsonify, request
from app.models.sound import Sound, SoundType
from app.models.sound_played import SoundPlayed
from app.services.decorators import (
get_current_user,
require_auth,
require_credits,
)
from app.services.vlc_service import vlc_service
from app.services.decorators import require_auth, get_current_user
bp = Blueprint("soundboard", __name__, url_prefix="/api/soundboard")
@@ -40,6 +45,7 @@ def get_sounds():
@bp.route("/sounds/<int:sound_id>/play", methods=["POST"])
@require_auth
@require_credits(1)
def play_sound(sound_id: int):
"""Play a specific sound."""
try:
@@ -61,9 +67,10 @@ def play_sound(sound_id: int):
if success:
return jsonify({"message": "Sound playing", "sound_id": sound_id})
else:
return jsonify(
{"error": "Sound not found or cannot be played"}
), 404
return (
jsonify({"error": "Sound not found or cannot be played"}),
404,
)
except Exception as e:
return jsonify({"error": str(e)}), 500
@@ -78,15 +85,18 @@ def stop_all_sounds():
# Wait a moment and check if any are still playing
import time
time.sleep(0.2)
# If there are still instances, force stop them
if vlc_service.get_playing_count() > 0:
stopped_count = vlc_service.force_stop_all()
return jsonify({
"message": f"Force stopped {stopped_count} sounds",
"forced": True
})
return jsonify(
{
"message": f"Force stopped {stopped_count} sounds",
"forced": True,
}
)
return jsonify({"message": "All sounds stopped"})
except Exception as e:
@@ -99,10 +109,12 @@ def force_stop_all_sounds():
"""Force stop all sounds with aggressive cleanup."""
try:
stopped_count = vlc_service.force_stop_all()
return jsonify({
"message": f"Force stopped {stopped_count} sound instances",
"stopped_count": stopped_count
})
return jsonify(
{
"message": f"Force stopped {stopped_count} sound instances",
"stopped_count": stopped_count,
}
)
except Exception as e:
return jsonify({"error": str(e)}), 500
@@ -118,11 +130,13 @@ def get_status():
with vlc_service.lock:
processes = []
for process_id, process in vlc_service.processes.items():
processes.append({
"id": process_id,
"pid": process.pid,
"running": process.poll() is None,
})
processes.append(
{
"id": process_id,
"pid": process.pid,
"running": process.poll() is None,
}
)
return jsonify(
{
@@ -144,13 +158,17 @@ def get_play_history():
per_page = min(int(request.args.get("per_page", 50)), 100)
offset = (page - 1) * per_page
recent_plays = SoundPlayed.get_recent_plays(limit=per_page, offset=offset)
recent_plays = SoundPlayed.get_recent_plays(
limit=per_page, offset=offset
)
return jsonify({
"plays": [play.to_dict() for play in recent_plays],
"page": page,
"per_page": per_page,
})
return jsonify(
{
"plays": [play.to_dict() for play in recent_plays],
"page": page,
"per_page": per_page,
}
)
except Exception as e:
return jsonify({"error": str(e)}), 500
@@ -169,14 +187,18 @@ def get_my_play_history():
per_page = min(int(request.args.get("per_page", 50)), 100)
offset = (page - 1) * per_page
user_plays = SoundPlayed.get_user_plays(user_id=user_id, limit=per_page, offset=offset)
user_plays = SoundPlayed.get_user_plays(
user_id=user_id, limit=per_page, offset=offset
)
return jsonify({
"plays": [play.to_dict() for play in user_plays],
"page": page,
"per_page": per_page,
"user_id": user_id,
})
return jsonify(
{
"plays": [play.to_dict() for play in user_plays],
"page": page,
"per_page": per_page,
"user_id": user_id,
}
)
except Exception as e:
return jsonify({"error": str(e)}), 500
@@ -209,10 +231,12 @@ def get_popular_sounds():
popular_sounds = SoundPlayed.get_popular_sounds(limit=limit, days=days)
return jsonify({
"popular_sounds": popular_sounds,
"limit": limit,
"days": days,
})
return jsonify(
{
"popular_sounds": popular_sounds,
"limit": limit,
"days": days,
}
)
except Exception as e:
return jsonify({"error": str(e)}), 500

View File

@@ -148,22 +148,7 @@ def require_role(required_role: str):
def require_admin(f):
"""Decorator to require admin role for routes."""
@wraps(f)
def wrapper(*args, **kwargs):
user = get_current_user()
if not user:
return jsonify({"error": "Authentication required"}), 401
if user.get("role") != "admin":
return (
jsonify({"error": "Access denied. Admin role required"}),
403,
)
return f(*args, **kwargs)
return wrapper
return require_role("admin")(f)
def require_credits(credits_needed: int):