462 lines
16 KiB
Python
462 lines
16 KiB
Python
"""Sound management API endpoints."""
|
|
|
|
from typing import Annotated
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, Query, status
|
|
from sqlmodel.ext.asyncio.session import AsyncSession
|
|
|
|
from app.core.database import get_db, get_session_factory
|
|
from app.core.dependencies import get_current_active_user_flexible
|
|
from app.models.credit_action import CreditActionType
|
|
from app.models.user import User
|
|
from app.repositories.sound import SoundRepository
|
|
from app.services.credit import CreditService, InsufficientCreditsError
|
|
from app.services.extraction import ExtractionInfo, ExtractionService
|
|
from app.services.extraction_processor import extraction_processor
|
|
from app.services.sound_normalizer import NormalizationResults, SoundNormalizerService
|
|
from app.services.sound_scanner import ScanResults, SoundScannerService
|
|
from app.services.vlc_player import VLCPlayerService, get_vlc_player_service
|
|
|
|
router = APIRouter(prefix="/sounds", tags=["sounds"])
|
|
|
|
|
|
async def get_sound_scanner_service(
    session: Annotated[AsyncSession, Depends(get_db)],
) -> SoundScannerService:
    """Build a ``SoundScannerService`` for use as a FastAPI dependency.

    Args:
        session: Async database session injected through ``get_db``.

    Returns:
        A scanner service constructed with ``session``.
    """
    scanner = SoundScannerService(session)
    return scanner
|
|
|
|
|
|
async def get_sound_normalizer_service(
    session: Annotated[AsyncSession, Depends(get_db)],
) -> SoundNormalizerService:
    """Build a ``SoundNormalizerService`` for use as a FastAPI dependency.

    Args:
        session: Async database session injected through ``get_db``.

    Returns:
        A normalizer service constructed with ``session``.
    """
    normalizer = SoundNormalizerService(session)
    return normalizer
|
|
|
|
|
|
async def get_extraction_service(
    session: Annotated[AsyncSession, Depends(get_db)],
) -> ExtractionService:
    """Build an ``ExtractionService`` for use as a FastAPI dependency.

    Args:
        session: Async database session injected through ``get_db``.

    Returns:
        An extraction service constructed with ``session``.
    """
    service = ExtractionService(session)
    return service
|
|
|
|
|
|
def get_vlc_player() -> VLCPlayerService:
    """Return the VLC player service obtained from the session factory.

    The service is looked up via ``get_vlc_player_service`` using the factory
    returned by ``get_session_factory``.
    """
    session_factory = get_session_factory()
    return get_vlc_player_service(session_factory)
|
|
|
|
|
|
def get_credit_service() -> CreditService:
    """Return a ``CreditService`` built from the session factory.

    A new ``CreditService`` is constructed on every call, using the factory
    returned by ``get_session_factory``.
    """
    session_factory = get_session_factory()
    return CreditService(session_factory)
|
|
|
|
|
|
async def get_sound_repository(
    session: Annotated[AsyncSession, Depends(get_db)],
) -> SoundRepository:
    """Build a ``SoundRepository`` for use as a FastAPI dependency.

    Args:
        session: Async database session injected through ``get_db``.

    Returns:
        A sound repository constructed with ``session``.
    """
    repository = SoundRepository(session)
    return repository
|
|
|
|
|
|
# SCAN
|
|
@router.post("/scan")
async def scan_sounds(
    current_user: Annotated[User, Depends(get_current_active_user_flexible)],
    scanner_service: Annotated[SoundScannerService, Depends(get_sound_scanner_service)],
) -> dict[str, ScanResults | str]:
    """Sync the soundboard directory with the database (admin only).

    Args:
        current_user: Authenticated user making the request.
        scanner_service: Service that performs the directory scan.

    Returns:
        A dict with a ``message`` string and the scan ``results``.

    Raises:
        HTTPException: 403 when the caller's role is neither ``admin`` nor
            ``superadmin``; 500 when the scan raises any exception.
    """
    # Reject non-administrators before touching the scanner.
    is_admin = current_user.role in ("admin", "superadmin")
    if not is_admin:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Only administrators can sync sounds",
        )

    try:
        scan_results = await scanner_service.scan_soundboard_directory()
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to sync sounds: {e!s}",
        ) from e

    return {
        "message": "Sound sync completed",
        "results": scan_results,
    }
|
|
|
|
|
|
@router.post("/scan/custom")
async def scan_custom_directory(
    directory: str,
    current_user: Annotated[User, Depends(get_current_active_user_flexible)],
    scanner_service: Annotated[SoundScannerService, Depends(get_sound_scanner_service)],
    sound_type: str = "SDB",
) -> dict[str, ScanResults | str]:
    """Sync an arbitrary directory with the database (admin only).

    Args:
        directory: Directory path forwarded to the scanner service.
        current_user: Authenticated user making the request.
        scanner_service: Service that performs the directory scan.
        sound_type: Sound type forwarded to the scanner; defaults to ``"SDB"``.

    Returns:
        A dict with a ``message`` string and the scan ``results``.

    Raises:
        HTTPException: 403 when the caller is not an administrator; 400 when
            the scanner raises ``ValueError``; 500 for any other exception.
    """
    # Reject non-administrators before touching the scanner.
    is_admin = current_user.role in ("admin", "superadmin")
    if not is_admin:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Only administrators can sync sounds",
        )

    try:
        scan_results = await scanner_service.scan_directory(directory, sound_type)
    except ValueError as e:
        # The scanner signals bad input with ValueError; surface it as a client error.
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(e),
        ) from e
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to sync directory: {e!s}",
        ) from e

    return {
        "message": f"Sync of directory '{directory}' completed",
        "results": scan_results,
    }
|
|
|
|
|
|
# NORMALIZE
|
|
@router.post("/normalize/all")
async def normalize_all_sounds(
    current_user: Annotated[User, Depends(get_current_active_user_flexible)],
    normalizer_service: Annotated[
        SoundNormalizerService, Depends(get_sound_normalizer_service),
    ],
    force: Annotated[bool, Query(  # noqa: FBT002
        description="Force normalization of already normalized sounds",
    )] = False,
    one_pass: Annotated[bool | None, Query(
        description="Use one-pass normalization (overrides config)",
    )] = None,
) -> dict[str, NormalizationResults | str]:
    """Run normalization across all sounds (admin only).

    Args:
        current_user: Authenticated user making the request.
        normalizer_service: Service that performs the normalization.
        force: Forwarded to the service as ``force``.
        one_pass: Forwarded to the service as ``one_pass``; ``None`` by default.

    Returns:
        A dict with a ``message`` string and the normalization ``results``.

    Raises:
        HTTPException: 403 when the caller is not an administrator; 500 when
            the normalizer raises any exception.
    """
    # Reject non-administrators before starting any work.
    is_admin = current_user.role in ("admin", "superadmin")
    if not is_admin:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Only administrators can normalize sounds",
        )

    try:
        outcome = await normalizer_service.normalize_all_sounds(
            force=force,
            one_pass=one_pass,
        )
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to normalize sounds: {e!s}",
        ) from e

    return {
        "message": "Sound normalization completed",
        "results": outcome,
    }
|
|
|
|
|
|
@router.post("/normalize/type/{sound_type}")
async def normalize_sounds_by_type(
    sound_type: str,
    current_user: Annotated[User, Depends(get_current_active_user_flexible)],
    normalizer_service: Annotated[
        SoundNormalizerService, Depends(get_sound_normalizer_service),
    ],
    force: Annotated[bool, Query(  # noqa: FBT002
        description="Force normalization of already normalized sounds",
    )] = False,
    one_pass: Annotated[bool | None, Query(
        description="Use one-pass normalization (overrides config)",
    )] = None,
) -> dict[str, NormalizationResults | str]:
    """Run normalization for every sound of one type (admin only).

    Args:
        sound_type: One of ``SDB``, ``TTS`` or ``EXT`` (case-sensitive).
        current_user: Authenticated user making the request.
        normalizer_service: Service that performs the normalization.
        force: Forwarded to the service as ``force``.
        one_pass: Forwarded to the service as ``one_pass``; ``None`` by default.

    Returns:
        A dict with a ``message`` string and the normalization ``results``.

    Raises:
        HTTPException: 403 when the caller is not an administrator; 400 when
            ``sound_type`` is not an accepted value; 500 when the normalizer
            raises any exception.
    """
    # Reject non-administrators before validating anything else.
    is_admin = current_user.role in ("admin", "superadmin")
    if not is_admin:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Only administrators can normalize sounds",
        )

    # Only the known sound type codes are accepted.
    valid_types = ("SDB", "TTS", "EXT")
    if sound_type not in valid_types:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Invalid sound type. Must be one of: {', '.join(valid_types)}",
        )

    try:
        outcome = await normalizer_service.normalize_sounds_by_type(
            sound_type=sound_type,
            force=force,
            one_pass=one_pass,
        )
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to normalize {sound_type} sounds: {e!s}",
        ) from e

    return {
        "message": f"Normalization of {sound_type} sounds completed",
        "results": outcome,
    }
|
|
|
|
|
|
@router.post("/normalize/{sound_id}")
async def normalize_sound_by_id(
    sound_id: int,
    current_user: Annotated[User, Depends(get_current_active_user_flexible)],
    normalizer_service: Annotated[
        SoundNormalizerService, Depends(get_sound_normalizer_service),
    ],
    force: Annotated[bool, Query(  # noqa: FBT002
        description="Force normalization of already normalized sound",
    )] = False,
    one_pass: Annotated[bool | None, Query(
        description="Use one-pass normalization (overrides config)",
    )] = None,
) -> dict[str, str]:
    """Normalize a single sound looked up by its ID (admin only).

    Args:
        sound_id: Identifier of the sound to normalize.
        current_user: Authenticated user making the request.
        normalizer_service: Service that performs the normalization; its
            ``sound_repo`` is used to look up the sound.
        force: Forwarded to the service as ``force``.
        one_pass: Forwarded to the service as ``one_pass``; ``None`` by default.

    Returns:
        A dict with ``message``, ``status``, ``reason`` and
        ``normalized_filename``; falsy ``reason``/``normalized_filename``
        values are replaced by an empty string.

    Raises:
        HTTPException: 403 when the caller is not an administrator; 404 when
            no sound has the given ID; 500 when the service reports an
            ``"error"`` status or any other exception is raised.
    """
    # Reject non-administrators before any lookup.
    is_admin = current_user.role in ("admin", "superadmin")
    if not is_admin:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Only administrators can normalize sounds",
        )

    try:
        sound = await normalizer_service.sound_repo.get_by_id(sound_id)
        if not sound:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Sound with ID {sound_id} not found",
            )

        result = await normalizer_service.normalize_sound(
            sound=sound,
            force=force,
            one_pass=one_pass,
        )

        # The service reports failure through the result payload, not an exception.
        if result["status"] == "error":
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=f"Failed to normalize sound: {result['error']}",
            )

        # Built inside the try so a malformed result is still reported as a 500.
        payload = {
            "message": f"Sound normalization {result['status']}: {sound.filename}",
            "status": result["status"],
            "reason": result["reason"] or "",
            "normalized_filename": result["normalized_filename"] or "",
        }
    except HTTPException:
        # Deliberate HTTP errors above must reach the client unchanged.
        raise
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to normalize sound: {e!s}",
        ) from e

    return payload
|
|
|
|
|
|
# EXTRACT
|
|
@router.post("/extract")
async def create_extraction(
    url: str,
    current_user: Annotated[User, Depends(get_current_active_user_flexible)],
    extraction_service: Annotated[ExtractionService, Depends(get_extraction_service)],
) -> dict[str, ExtractionInfo | str]:
    """Create a new extraction job for a URL and queue it for processing.

    Args:
        url: URL to extract from; forwarded to the extraction service.
        current_user: Authenticated user who will own the extraction.
        extraction_service: Service that creates the extraction record.

    Returns:
        A dict with a ``message`` string and the created ``extraction`` info.

    Raises:
        HTTPException: 401 when the current user has no ID; 400 when the
            service raises ``ValueError``; 500 for any other exception.
    """
    # This guard lives outside the try block on purpose: HTTPException is an
    # Exception subclass, so raising it inside would be caught by the generic
    # handler below and reported to the client as a 500 instead of a 401.
    if current_user.id is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="User ID not available",
        )

    try:
        extraction_info = await extraction_service.create_extraction(
            url, current_user.id,
        )

        # Queue the extraction for background processing
        await extraction_processor.queue_extraction(extraction_info["id"])
    except HTTPException:
        # Propagate deliberate HTTP errors unchanged, as sibling endpoints do.
        raise
    except ValueError as e:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(e),
        ) from e
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to create extraction: {e!s}",
        ) from e
    else:
        return {
            "message": "Extraction queued successfully",
            "extraction": extraction_info,
        }
|
|
|
|
|
|
@router.get("/extract/status")
async def get_extraction_processor_status(
    current_user: Annotated[User, Depends(get_current_active_user_flexible)],
) -> dict:
    """Report the extraction processor's status (admin only).

    Args:
        current_user: Authenticated user making the request.

    Returns:
        Whatever ``extraction_processor.get_status()`` returns.

    Raises:
        HTTPException: 403 when the caller's role is neither ``admin`` nor
            ``superadmin``.
    """
    # Processor internals are restricted to administrators.
    is_admin = current_user.role in ("admin", "superadmin")
    if not is_admin:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Only administrators can view processor status",
        )

    processor_status = extraction_processor.get_status()
    return processor_status
|
|
|
|
|
|
@router.get("/extract/{extraction_id}")
async def get_extraction(
    extraction_id: int,
    current_user: Annotated[User, Depends(get_current_active_user_flexible)],  # noqa: ARG001
    extraction_service: Annotated[ExtractionService, Depends(get_extraction_service)],
) -> ExtractionInfo:
    """Fetch a single extraction by its ID.

    Args:
        extraction_id: Identifier of the extraction to fetch.
        current_user: Declared only so the endpoint requires authentication;
            the value is not read here.
        extraction_service: Service used to look up the extraction.

    Returns:
        The extraction info returned by the service.

    Raises:
        HTTPException: 404 when the service returns a falsy result; 500 when
            the lookup raises any other exception.
    """
    try:
        extraction_info = await extraction_service.get_extraction_by_id(extraction_id)
        found = bool(extraction_info)
        if not found:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Extraction {extraction_id} not found",
            )
    except HTTPException:
        # The 404 above must reach the client unchanged.
        raise
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to get extraction: {e!s}",
        ) from e

    return extraction_info
|
|
|
|
|
|
@router.get("/extract")
async def get_user_extractions(
    current_user: Annotated[User, Depends(get_current_active_user_flexible)],
    extraction_service: Annotated[ExtractionService, Depends(get_extraction_service)],
) -> dict[str, list[ExtractionInfo]]:
    """Get all extractions for the current user.

    Args:
        current_user: Authenticated user whose extractions are listed.
        extraction_service: Service used to query the extractions.

    Returns:
        A dict with a single ``extractions`` key holding the service's result.

    Raises:
        HTTPException: 401 when the current user has no ID; 500 when the
            service raises any other exception.
    """
    # This guard lives outside the try block on purpose: HTTPException is an
    # Exception subclass, so raising it inside would be caught by the generic
    # handler below and reported to the client as a 500 instead of a 401.
    if current_user.id is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="User ID not available",
        )

    try:
        extractions = await extraction_service.get_user_extractions(current_user.id)
    except HTTPException:
        # Propagate deliberate HTTP errors unchanged, as sibling endpoints do.
        raise
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to get extractions: {e!s}",
        ) from e
    else:
        return {
            "extractions": extractions,
        }
|
|
|
|
|
|
# VLC PLAYER
|
|
@router.post("/vlc/play/{sound_id}")
async def play_sound_with_vlc(
    sound_id: int,
    current_user: Annotated[User, Depends(get_current_active_user_flexible)],
    vlc_player: Annotated[VLCPlayerService, Depends(get_vlc_player)],
    sound_repo: Annotated[SoundRepository, Depends(get_sound_repository)],
    credit_service: Annotated[CreditService, Depends(get_credit_service)],
) -> dict[str, str | int | bool]:
    """Play a sound through the VLC player service, charging credits for it.

    The flow is: look up the sound, reserve credits, ask the player to play
    it, then call ``deduct_credits`` with the playback outcome.

    Args:
        sound_id: Identifier of the sound to play.
        current_user: Authenticated user whose credits are used.
        vlc_player: Service that launches playback.
        sound_repo: Repository used to look up the sound.
        credit_service: Service that reserves and deducts credits.

    Returns:
        A dict with ``message``, ``sound_id``, ``sound_name``, ``success``
        (always ``True`` here) and ``credits_deducted`` (always ``1`` here).

    Raises:
        HTTPException: 404 when the sound does not exist; 402 when the credit
            reservation raises ``InsufficientCreditsError``; 500 when the
            player reports failure or any other exception is raised.
    """
    try:
        sound = await sound_repo.get_by_id(sound_id)
        if not sound:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Sound with ID {sound_id} not found",
            )

        # Credits are reserved before playback. The inner try is scoped to this
        # call alone so only a failed reservation is reported as 402.
        try:
            await credit_service.validate_and_reserve_credits(
                current_user.id,
                CreditActionType.VLC_PLAY_SOUND,
            )
        except InsufficientCreditsError as e:
            raise HTTPException(
                status_code=status.HTTP_402_PAYMENT_REQUIRED,
                detail=(
                    f"Insufficient credits: {e.required} required, "
                    f"{e.available} available"
                ),
            ) from e

        played = await vlc_player.play_sound(sound)

        # The credit service is told the outcome whether or not playback worked.
        await credit_service.deduct_credits(
            current_user.id,
            CreditActionType.VLC_PLAY_SOUND,
            success=played,
            metadata={"sound_id": sound_id, "sound_name": sound.name},
        )

        if not played:
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="Failed to launch VLC for sound playback",
            )
    except HTTPException:
        # Deliberate HTTP errors above must reach the client unchanged.
        raise
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to play sound: {e!s}",
        ) from e

    return {
        "message": f"Sound '{sound.name}' is now playing via VLC",
        "sound_id": sound_id,
        "sound_name": sound.name,
        "success": True,
        "credits_deducted": 1,
    }
|
|
|
|
|
|
|
|
@router.post("/vlc/stop-all")
async def stop_all_vlc_instances(
    current_user: Annotated[User, Depends(get_current_active_user_flexible)],  # noqa: ARG001
    vlc_player: Annotated[VLCPlayerService, Depends(get_vlc_player)],
) -> dict:
    """Ask the VLC player service to stop every running VLC instance.

    Args:
        current_user: Declared only so the endpoint requires authentication;
            the value is not read here.
        vlc_player: Service that stops the instances.

    Returns:
        Whatever ``vlc_player.stop_all_vlc_instances()`` returns.

    Raises:
        HTTPException: 500 when the service raises any exception.
    """
    try:
        outcome = await vlc_player.stop_all_vlc_instances()
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to stop VLC instances: {e!s}",
        ) from e

    return outcome
|