feat: Add sorting by name for soundboard sounds and improve socket emission logging

This commit is contained in:
JSC
2025-07-18 21:10:08 +02:00
parent 010f18bff4
commit d0bda6c930
2 changed files with 234 additions and 4 deletions

View File

@@ -1,6 +1,17 @@
"""Main routes for the application.""" """Main routes for the application."""
from flask import Blueprint from datetime import datetime, timedelta
from zoneinfo import ZoneInfo
from flask import Blueprint, request
from sqlalchemy import desc, func
from app.database import db
from app.models.playlist import Playlist
from app.models.sound import Sound
from app.models.sound_played import SoundPlayed
from app.models.user import User
from app.services.decorators import require_auth
bp = Blueprint("main", __name__) bp = Blueprint("main", __name__)
@@ -9,3 +20,213 @@ bp = Blueprint("main", __name__)
def health() -> dict[str, str]: def health() -> dict[str, str]:
"""Health check endpoint.""" """Health check endpoint."""
return {"status": "ok"} return {"status": "ok"}
def get_period_filter(period: str) -> datetime | None:
"""Get the start date for the specified period."""
now = datetime.now(tz=ZoneInfo("UTC"))
if period == "today":
return now.replace(hour=0, minute=0, second=0, microsecond=0)
if period == "week":
return now - timedelta(days=7)
if period == "month":
return now - timedelta(days=30)
if period == "year":
return now - timedelta(days=365)
if period == "all":
return None
# Default to all time
return None
@bp.route("/dashboard/stats")
@require_auth
def dashboard_stats() -> dict:
"""Get dashboard statistics."""
# Count soundboard sounds (type = SDB)
soundboard_count = Sound.query.filter_by(type="SDB").count()
# Count tracks (type = STR)
track_count = Sound.query.filter_by(type="STR").count()
# Count playlists
playlist_count = Playlist.query.count()
# Calculate total size of all sounds (original + normalized)
total_size_result = db.session.query(
func.sum(Sound.size).label("original_size"),
func.sum(Sound.normalized_size).label("normalized_size"),
).first()
original_size = getattr(total_size_result, "original_size", 0) or 0
normalized_size = getattr(total_size_result, "normalized_size", 0) or 0
total_size = original_size + normalized_size
return {
"soundboard_sounds": soundboard_count,
"tracks": track_count,
"playlists": playlist_count,
"total_size": total_size,
"original_size": original_size,
"normalized_size": normalized_size,
}
@bp.route("/dashboard/top-sounds")
@require_auth
def top_sounds() -> dict:
"""Get top played sounds for a specific period."""
period = request.args.get("period", "all")
limit = int(request.args.get("limit", 5))
period_start = get_period_filter(period)
# Base query for soundboard sounds with play counts
query = (
db.session.query(
Sound.id,
Sound.name,
Sound.filename,
Sound.thumbnail,
Sound.type,
func.count(SoundPlayed.id).label("play_count"),
)
.outerjoin(SoundPlayed, Sound.id == SoundPlayed.sound_id)
.filter(Sound.type == "SDB") # Only soundboard sounds
.group_by(
Sound.id,
Sound.name,
Sound.filename,
Sound.thumbnail,
Sound.type,
)
)
# Apply period filter if specified
if period_start:
query = query.filter(SoundPlayed.played_at >= period_start)
# Order by play count and limit results
results = query.order_by(desc("play_count")).limit(limit).all()
# Convert to list of dictionaries
top_sounds_list = [
{
"id": result.id,
"name": result.name,
"filename": result.filename,
"thumbnail": result.thumbnail,
"type": result.type,
"play_count": result.play_count,
}
for result in results
]
return {
"period": period,
"sounds": top_sounds_list,
}
@bp.route("/dashboard/top-tracks")
@require_auth
def top_tracks() -> dict:
"""Get top played tracks for a specific period."""
period = request.args.get("period", "all")
limit = int(request.args.get("limit", 10))
period_start = get_period_filter(period)
# Base query for tracks with play counts
query = (
db.session.query(
Sound.id,
Sound.name,
Sound.filename,
Sound.thumbnail,
Sound.type,
func.count(SoundPlayed.id).label("play_count"),
)
.outerjoin(SoundPlayed, Sound.id == SoundPlayed.sound_id)
.filter(Sound.type == "STR") # Only tracks
.group_by(
Sound.id,
Sound.name,
Sound.filename,
Sound.thumbnail,
Sound.type,
)
)
# Apply period filter if specified
if period_start:
query = query.filter(SoundPlayed.played_at >= period_start)
# Order by play count and limit results
results = query.order_by(desc("play_count")).limit(limit).all()
# Convert to list of dictionaries
top_tracks_list = [
{
"id": result.id,
"name": result.name,
"filename": result.filename,
"thumbnail": result.thumbnail,
"type": result.type,
"play_count": result.play_count,
}
for result in results
]
return {
"period": period,
"tracks": top_tracks_list,
}
@bp.route("/dashboard/top-users")
@require_auth
def top_users() -> dict:
"""Get top users by play count for a specific period."""
period = request.args.get("period", "all")
limit = int(request.args.get("limit", 10))
period_start = get_period_filter(period)
# Base query for users with play counts
query = (
db.session.query(
User.id,
User.name,
User.email,
User.picture,
func.count(SoundPlayed.id).label("play_count"),
)
.outerjoin(SoundPlayed, User.id == SoundPlayed.user_id)
.group_by(User.id, User.name, User.email, User.picture)
)
# Apply period filter if specified
if period_start:
query = query.filter(SoundPlayed.played_at >= period_start)
# Order by play count and limit results
results = query.order_by(desc("play_count")).limit(limit).all()
# Convert to list of dictionaries
top_users_list = [
{
"id": result.id,
"name": result.name,
"email": result.email,
"picture": result.picture,
"play_count": result.play_count,
}
for result in results
]
return {
"period": period,
"users": top_users_list,
}

View File

@@ -29,6 +29,9 @@ def get_sounds():
# Get sounds from database # Get sounds from database
sounds = Sound.find_by_type(sound_type) sounds = Sound.find_by_type(sound_type)
# Order by name
sounds = sorted(sounds, key=lambda s: s.name.lower())
# Convert to dict format # Convert to dict format
sounds_data = [sound.to_dict() for sound in sounds] sounds_data = [sound.to_dict() for sound in sounds]
@@ -65,12 +68,18 @@ def play_sound(sound_id: int):
# Emit sound_changed event to all connected clients # Emit sound_changed event to all connected clients
try: try:
from app.services.socketio_service import SocketIOService from app.services.socketio_service import SocketIOService
SocketIOService.emit_sound_play_count_changed(sound_id, sound.play_count)
SocketIOService.emit_sound_play_count_changed(
sound_id, sound.play_count
)
except Exception as e: except Exception as e:
# Don't fail the request if socket emission fails # Don't fail the request if socket emission fails
import logging import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
logger.warning(f"Failed to emit sound_play_count_changed event: {e}") logger.warning(
f"Failed to emit sound_play_count_changed event: {e}"
)
return jsonify({"message": "Sound playing", "sound_id": sound_id}) return jsonify({"message": "Sound playing", "sound_id": sound_id})
return ( return (