Files
sdb2-backend/app/api/v1/sounds.py
JSC 6068599a47
All checks were successful
Backend CI / lint (push) Successful in 9m49s
Backend CI / test (push) Successful in 6m15s
Refactor test cases for improved readability and consistency
- Adjusted function signatures in various test files to enhance clarity by aligning parameters.
- Updated patching syntax for better readability across test cases.
- Improved formatting and spacing in test assertions and mock setups.
- Ensured consistent use of async/await patterns in async test functions.
- Enhanced comments for better understanding of test intentions.
2025-08-01 20:53:30 +02:00

220 lines
7.2 KiB
Python

"""Sound management API endpoints."""
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException, status
from sqlmodel.ext.asyncio.session import AsyncSession
from app.core.database import get_db, get_session_factory
from app.core.dependencies import get_current_active_user_flexible
from app.models.credit_action import CreditActionType
from app.models.user import User
from app.repositories.sound import SoundRepository
from app.services.credit import CreditService, InsufficientCreditsError
from app.services.extraction import ExtractionInfo, ExtractionService
from app.services.extraction_processor import extraction_processor
from app.services.vlc_player import VLCPlayerService, get_vlc_player_service
# Router collecting the sound endpoints; every route is served under "/sounds".
router = APIRouter(prefix="/sounds", tags=["sounds"])
async def get_extraction_service(
    session: Annotated[AsyncSession, Depends(get_db)],
) -> ExtractionService:
    """Provide an ``ExtractionService`` built on the injected database session.

    Used as a FastAPI dependency; ``session`` is supplied by ``get_db``.
    """
    return ExtractionService(session)
def get_vlc_player() -> VLCPlayerService:
    """Provide the VLC player service, handing it the session factory."""
    session_factory = get_session_factory()
    return get_vlc_player_service(session_factory)
def get_credit_service() -> CreditService:
    """Provide a ``CreditService`` constructed from the session factory."""
    session_factory = get_session_factory()
    return CreditService(session_factory)
async def get_sound_repository(
    session: Annotated[AsyncSession, Depends(get_db)],
) -> SoundRepository:
    """Provide a ``SoundRepository`` built on the injected database session.

    Used as a FastAPI dependency; ``session`` is supplied by ``get_db``.
    """
    return SoundRepository(session)
# EXTRACT
@router.post("/extract")
async def create_extraction(
    url: str,
    current_user: Annotated[User, Depends(get_current_active_user_flexible)],
    extraction_service: Annotated[ExtractionService, Depends(get_extraction_service)],
) -> dict[str, ExtractionInfo | str]:
    """Create a new extraction job for a URL and queue it for processing.

    Args:
        url: The URL to extract from.
        current_user: The authenticated user, resolved by the auth dependency.
        extraction_service: Service used to create the extraction.

    Returns:
        A dict holding a confirmation ``message`` and the created
        ``extraction`` info.

    Raises:
        HTTPException: 401 when the user has no ID, 400 when a ``ValueError``
            is raised while creating/queueing the extraction, 500 for any
            other unexpected failure.
    """
    # Validate outside the ``try`` block: ``HTTPException`` derives from
    # ``Exception``, so raising it inside would be caught by the generic
    # handler below and reported as a 500 instead of a 401.
    if current_user.id is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="User ID not available",
        )

    try:
        extraction_info = await extraction_service.create_extraction(
            url,
            current_user.id,
        )
        # Queue the extraction for background processing
        await extraction_processor.queue_extraction(extraction_info["id"])
    except HTTPException:
        # Keep deliberate HTTP errors intact, matching the other handlers.
        raise
    except ValueError as e:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(e),
        ) from e
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to create extraction: {e!s}",
        ) from e
    else:
        return {
            "message": "Extraction queued successfully",
            "extraction": extraction_info,
        }
@router.get("/extract/{extraction_id}")
async def get_extraction(
    extraction_id: int,
    current_user: Annotated[User, Depends(get_current_active_user_flexible)],  # noqa: ARG001
    extraction_service: Annotated[ExtractionService, Depends(get_extraction_service)],
) -> ExtractionInfo:
    """Look up a single extraction by its ID.

    Args:
        extraction_id: Identifier of the extraction to fetch.
        current_user: Unused in the body; declared so the route requires
            authentication.
        extraction_service: Service used to perform the lookup.

    Returns:
        The extraction info returned by the service.

    Raises:
        HTTPException: 404 when the service returns nothing for the ID,
            500 when the lookup itself fails unexpectedly.
    """
    try:
        record = await extraction_service.get_extraction_by_id(extraction_id)
    except HTTPException:
        # HTTP errors coming from the lookup pass through untouched.
        raise
    except Exception as exc:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to get extraction: {exc!s}",
        ) from exc

    if not record:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Extraction {extraction_id} not found",
        )
    return record
@router.get("/extract")
async def get_user_extractions(
    current_user: Annotated[User, Depends(get_current_active_user_flexible)],
    extraction_service: Annotated[ExtractionService, Depends(get_extraction_service)],
) -> dict[str, list[ExtractionInfo]]:
    """Get all extractions for the current user.

    Args:
        current_user: The authenticated user, resolved by the auth dependency.
        extraction_service: Service used to list the user's extractions.

    Returns:
        A dict with a single ``extractions`` key holding the list returned
        by the service.

    Raises:
        HTTPException: 401 when the user has no ID, 500 when listing the
            extractions fails unexpectedly.
    """
    # Validate outside the ``try`` block: ``HTTPException`` derives from
    # ``Exception``, so raising it inside would be caught by the generic
    # handler below and reported as a 500 instead of a 401.
    if current_user.id is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="User ID not available",
        )

    try:
        extractions = await extraction_service.get_user_extractions(current_user.id)
    except HTTPException:
        # Keep deliberate HTTP errors intact, matching the other handlers.
        raise
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to get extractions: {e!s}",
        ) from e
    else:
        return {
            "extractions": extractions,
        }
# VLC PLAYER
@router.post("/play/{sound_id}")
async def play_sound_with_vlc(
    sound_id: int,
    current_user: Annotated[User, Depends(get_current_active_user_flexible)],
    vlc_player: Annotated[VLCPlayerService, Depends(get_vlc_player)],
    sound_repo: Annotated[SoundRepository, Depends(get_sound_repository)],
    credit_service: Annotated[CreditService, Depends(get_credit_service)],
) -> dict[str, str | int | bool]:
    """Play a sound using VLC subprocess (requires 1 credit).

    Args:
        sound_id: Identifier of the sound to play.
        current_user: The authenticated user whose credits are charged.
        vlc_player: Service that launches VLC for playback.
        sound_repo: Repository used to load the sound.
        credit_service: Service used to validate and deduct credits.

    Returns:
        A dict describing the playback: ``message``, ``sound_id``,
        ``sound_name``, ``success`` and ``credits_deducted``.

    Raises:
        HTTPException: 401 when the user has no ID, 404 when the sound does
            not exist, 402 when the user lacks credits, 500 when VLC fails
            to launch or any other unexpected error occurs.
    """
    # Same guard as the other user-scoped endpoints: never hand a missing
    # user ID to the credit service. Done before the ``try`` so the 401 is
    # returned directly.
    if current_user.id is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="User ID not available",
        )

    try:
        # Get the sound
        sound = await sound_repo.get_by_id(sound_id)
        if not sound:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Sound with ID {sound_id} not found",
            )

        # Check and validate credits before playing
        try:
            await credit_service.validate_and_reserve_credits(
                current_user.id,
                CreditActionType.VLC_PLAY_SOUND,
            )
        except InsufficientCreditsError as e:
            raise HTTPException(
                status_code=status.HTTP_402_PAYMENT_REQUIRED,
                detail=(
                    f"Insufficient credits: {e.required} required, "
                    f"{e.available} available"
                ),
            ) from e

        # Play the sound using VLC
        success = await vlc_player.play_sound(sound)

        # Report the outcome to the credit service; it receives the
        # ``success`` flag together with metadata identifying the sound.
        await credit_service.deduct_credits(
            current_user.id,
            CreditActionType.VLC_PLAY_SOUND,
            success=success,
            metadata={"sound_id": sound_id, "sound_name": sound.name},
        )

        if not success:
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="Failed to launch VLC for sound playback",
            )
    except HTTPException:
        # Let the specific 404/402/500 responses raised above through.
        raise
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to play sound: {e!s}",
        ) from e
    else:
        return {
            "message": f"Sound '{sound.name}' is now playing via VLC",
            "sound_id": sound_id,
            "sound_name": sound.name,
            "success": True,
            "credits_deducted": 1,
        }
@router.post("/stop")
async def stop_all_vlc_instances(
    current_user: Annotated[User, Depends(get_current_active_user_flexible)],  # noqa: ARG001
    vlc_player: Annotated[VLCPlayerService, Depends(get_vlc_player)],
) -> dict:
    """Stop every running VLC instance.

    Args:
        current_user: Unused in the body; declared so the route requires
            authentication.
        vlc_player: Service that performs the stop operation.

    Returns:
        Whatever dict the VLC player service reports for the stop operation.

    Raises:
        HTTPException: 500 when stopping the instances fails.
    """
    try:
        outcome = await vlc_player.stop_all_vlc_instances()
    except Exception as exc:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to stop VLC instances: {exc!s}",
        ) from exc
    else:
        return outcome