Files
sdb2-backend/app/api/v1/admin/extractions.py
JSC 7dee6e320e
Some checks failed
Backend CI / lint (push) Successful in 9m25s
Backend CI / test (push) Failing after 4m48s
Add tests for extraction API endpoints and enhance existing tests
- Implement tests for admin extraction API endpoints including status retrieval, deletion of extractions, and permission checks.
- Add tests for user extraction deletion, ensuring proper handling of permissions and non-existent extractions.
- Enhance sound endpoint tests to include duplicate handling in responses.
- Refactor favorite service tests to utilize mock dependencies for better maintainability and clarity.
- Update sound scanner tests to improve file handling and ensure proper deletion of associated files.
2025-08-25 21:40:31 +02:00

60 lines
2.0 KiB
Python

"""Admin audio extraction API endpoints."""
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException, status
from sqlmodel.ext.asyncio.session import AsyncSession
from app.core.database import get_db
from app.core.dependencies import get_admin_user
from app.models.user import User
from app.services.extraction import ExtractionService
from app.services.extraction_processor import extraction_processor
# Router collecting the endpoints defined in this module. Route paths declared
# on it are relative to the "/extractions" prefix, and each route carries the
# "admin-extractions" tag.
router = APIRouter(prefix="/extractions", tags=["admin-extractions"])
async def get_extraction_service(
    session: Annotated[AsyncSession, Depends(get_db)],
) -> ExtractionService:
    """Build an ``ExtractionService`` bound to the injected database session.

    Intended to be used as a FastAPI dependency: ``session`` is resolved
    through ``get_db`` and handed straight to the service constructor.
    """
    service = ExtractionService(session)
    return service
@router.get("/status")
async def get_extraction_processor_status(
    current_user: Annotated[User, Depends(get_admin_user)],  # noqa: ARG001
) -> dict:
    """Report the current state of the extraction processor. Admin only.

    ``current_user`` is not read; it is declared so that the
    ``get_admin_user`` dependency runs before the handler body does.

    Returns:
        Whatever ``extraction_processor.get_status()`` returns, unmodified.
    """
    processor_status = extraction_processor.get_status()
    return processor_status
@router.delete("/{extraction_id}")
async def delete_extraction(
    extraction_id: int,
    current_user: Annotated[User, Depends(get_admin_user)],  # noqa: ARG001
    extraction_service: Annotated[ExtractionService, Depends(get_extraction_service)],
) -> dict[str, str]:
    """Remove an extraction together with its sound and files. Admin only.

    ``current_user`` is not read; it is declared so that the
    ``get_admin_user`` dependency runs before the handler body does.

    Returns:
        A single-key dict with a confirmation ``message``.

    Raises:
        HTTPException: 404 when the service reports that nothing was deleted;
            500 when the service raises any non-HTTP exception (the original
            error is chained as the cause).
    """
    try:
        # NOTE(review): the second argument is passed as None - presumably
        # this skips a per-user ownership check; confirm against
        # ExtractionService.delete_extraction.
        was_deleted = await extraction_service.delete_extraction(extraction_id, None)
    except HTTPException:
        # HTTP errors coming out of the service pass through untouched
        # instead of being wrapped into a 500.
        raise
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to delete extraction: {e!s}",
        ) from e

    # Guard clause: a falsy result from the service means there was nothing
    # to delete for this id.
    if not was_deleted:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Extraction {extraction_id} not found",
        )

    return {
        "message": f"Extraction {extraction_id} deleted successfully",
    }