feat: Add endpoint and service method to retrieve top sounds by play count with filtering options
Some checks failed
Backend CI / lint (push) Failing after 4m54s
Backend CI / test (push) Failing after 4m24s

This commit is contained in:
JSC
2025-08-11 22:04:42 +02:00
parent 53b6c4bca5
commit c69a45c9b4
3 changed files with 151 additions and 2 deletions

View File

@@ -2,7 +2,7 @@
from typing import Annotated, Any from typing import Annotated, Any
from fastapi import APIRouter, Depends from fastapi import APIRouter, Depends, Query
from app.core.dependencies import get_current_user, get_dashboard_service from app.core.dependencies import get_current_user, get_dashboard_service
from app.models.user import User from app.models.user import User
@@ -27,3 +27,19 @@ async def get_track_statistics(
) -> dict[str, Any]: ) -> dict[str, Any]:
"""Get track statistics.""" """Get track statistics."""
return await dashboard_service.get_track_statistics() return await dashboard_service.get_track_statistics()
@router.get("/top-sounds")
async def get_top_sounds(
_current_user: Annotated[User, Depends(get_current_user)],
dashboard_service: Annotated[DashboardService, Depends(get_dashboard_service)],
sound_type: Annotated[str, Query(description="Sound type filter (SDB, TTS, EXT, or 'all')")],
period: Annotated[str, Query(description="Time period (today, 1_day, 1_week, 1_month, 1_year, all_time)")] = "all_time",
limit: Annotated[int, Query(description="Number of top sounds to return", ge=1, le=100)] = 10,
) -> list[dict[str, Any]]:
"""Get top sounds by play count for a specific period."""
return await dashboard_service.get_top_sounds(
sound_type=sound_type,
period=period,
limit=limit
)

View File

@@ -1,12 +1,15 @@
"""Sound repository for database operations.""" """Sound repository for database operations."""
from datetime import datetime
from enum import Enum from enum import Enum
from sqlalchemy import func from sqlalchemy import func
from sqlmodel import col, select from sqlmodel import col, select
from sqlmodel.ext.asyncio.session import AsyncSession from sqlmodel.ext.asyncio.session import AsyncSession
from app.core.logging import get_logger from app.core.logging import get_logger
from app.models.sound import Sound from app.models.sound import Sound
from app.models.sound_played import SoundPlayed
from app.repositories.base import BaseRepository from app.repositories.base import BaseRepository
logger = get_logger(__name__) logger = get_logger(__name__)
@@ -224,3 +227,71 @@ class SoundRepository(BaseRepository[Sound]):
except Exception: except Exception:
logger.exception("Failed to get track statistics") logger.exception("Failed to get track statistics")
raise raise
async def get_top_sounds(
self,
sound_type: str,
date_filter: datetime | None = None,
limit: int = 10,
) -> list[dict]:
"""Get top sounds by play count for a specific type and period using SoundPlayed records."""
try:
# Join SoundPlayed with Sound and count plays within the period
statement = (
select(
Sound.id,
Sound.name,
Sound.type,
Sound.duration,
Sound.created_at,
func.count(SoundPlayed.id).label("play_count")
)
.select_from(SoundPlayed)
.join(Sound, SoundPlayed.sound_id == Sound.id)
)
# Apply sound type filter
if sound_type != "all":
statement = statement.where(Sound.type == sound_type.upper())
# Apply date filter if provided
if date_filter:
statement = statement.where(SoundPlayed.created_at >= date_filter)
# Group by sound and order by play count descending
statement = (
statement
.group_by(
Sound.id,
Sound.name,
Sound.type,
Sound.duration,
Sound.created_at
)
.order_by(func.count(SoundPlayed.id).desc())
.limit(limit)
)
result = await self.session.exec(statement)
rows = result.all()
# Convert to dictionaries with the play count from the period
return [
{
"id": row.id,
"name": row.name,
"type": row.type,
"play_count": row.play_count,
"duration": row.duration,
"created_at": row.created_at,
}
for row in rows
]
except Exception:
logger.exception(
"Failed to get top sounds: type=%s, date_filter=%s, limit=%s",
sound_type,
date_filter,
limit,
)
raise

View File

@@ -1,5 +1,6 @@
"""Dashboard service for statistics and analytics.""" """Dashboard service for statistics and analytics."""
from datetime import UTC, datetime, timedelta
from typing import Any from typing import Any
from app.core.logging import get_logger from app.core.logging import get_logger
@@ -44,3 +45,64 @@ class DashboardService:
except Exception: except Exception:
logger.exception("Failed to get track statistics") logger.exception("Failed to get track statistics")
raise raise
async def get_top_sounds(
self,
sound_type: str,
period: str = "all_time",
limit: int = 10,
) -> list[dict[str, Any]]:
"""Get top sounds by play count for a specific period."""
try:
# Calculate the date filter based on period
date_filter = self._get_date_filter(period)
# Get top sounds from repository
top_sounds = await self.sound_repository.get_top_sounds(
sound_type=sound_type,
date_filter=date_filter,
limit=limit,
)
return [
{
"id": sound["id"],
"name": sound["name"],
"type": sound["type"],
"play_count": sound["play_count"],
"duration": sound["duration"],
"created_at": (
sound["created_at"].isoformat()
if sound["created_at"]
else None
),
}
for sound in top_sounds
]
except Exception:
logger.exception(
"Failed to get top sounds for type=%s, period=%s",
sound_type,
period,
)
raise
def _get_date_filter(self, period: str) -> datetime | None:
"""Calculate the date filter based on the period."""
now = datetime.now(UTC)
match period:
case "today":
return now.replace(hour=0, minute=0, second=0, microsecond=0)
case "1_day":
return now - timedelta(days=1)
case "1_week":
return now - timedelta(weeks=1)
case "1_month":
return now - timedelta(days=30)
case "1_year":
return now - timedelta(days=365)
case "all_time":
return None
case _:
return None # Default to all time for unknown periods