- Implemented SoundRepository for database operations related to sounds, including methods for retrieving, creating, updating, and deleting sound records.
- Developed SoundScannerService to scan directories for audio files, calculate their metadata, and synchronize with the database.
- Added support for various audio file formats and integrated ffmpeg for audio duration extraction.
- Created comprehensive tests for sound API endpoints and sound scanner service to ensure functionality and error handling.
- Updated dependencies to include ffmpeg-python for audio processing.
515 lines
17 KiB
Python
515 lines
17 KiB
Python
"""Tests for sound API endpoints."""
|
|
|
|
from unittest.mock import patch
|
|
|
|
import pytest
|
|
from httpx import ASGITransport, AsyncClient
|
|
|
|
from app.models.user import User
|
|
from app.services.sound_scanner import ScanResults
|
|
|
|
|
|
class TestSoundEndpoints:
    """Test sound API endpoints.

    Covers ``POST /api/v1/sounds/scan`` and ``POST /api/v1/sounds/scan/custom``.
    The scanner service methods are replaced with mocks in every test that
    expects the request to reach the service, so no directory is scanned.
    """

    @pytest.mark.asyncio
    async def test_scan_sounds_success(
        self,
        authenticated_admin_client: AsyncClient,
        admin_user: User,
    ) -> None:
        """Test successful sound scanning.

        The mocked scanner reports one added, one updated and one deleted
        file; the response must echo those counters and per-file entries.
        """
        # Mock the scanner service to return successful results
        mock_results: ScanResults = {
            "scanned": 5,
            "added": 3,
            "updated": 1,
            "deleted": 1,
            "skipped": 0,
            "errors": 0,
            "files": [
                {
                    "filename": "test1.mp3",
                    "status": "added",
                    "reason": None,
                    "name": "Test1",
                    "duration": 5000,
                    "size": 1024,
                    "id": 1,
                    "error": None,
                    "changes": None,
                },
                {
                    "filename": "test2.mp3",
                    "status": "updated",
                    "reason": "file was modified",
                    "name": "Test2",
                    "duration": 7500,
                    "size": 2048,
                    "id": 2,
                    "error": None,
                    "changes": ["hash", "duration", "size"],
                },
                {
                    "filename": "test3.mp3",
                    "status": "deleted",
                    "reason": "file no longer exists",
                    "name": "Test3",
                    "duration": 3000,
                    "size": 512,
                    "id": 3,
                    "error": None,
                    "changes": None,
                },
            ],
        }

        with patch(
            "app.services.sound_scanner.SoundScannerService.scan_soundboard_directory"
        ) as mock_scan:
            mock_scan.return_value = mock_results

            response = await authenticated_admin_client.post("/api/v1/sounds/scan")

        assert response.status_code == 200
        data = response.json()

        assert "message" in data
        assert "Sound sync completed" in data["message"]
        assert "results" in data

        results = data["results"]
        assert results["scanned"] == 5
        assert results["added"] == 3
        assert results["updated"] == 1
        assert results["deleted"] == 1
        assert results["skipped"] == 0
        assert results["errors"] == 0
        assert len(results["files"]) == 3

        # Check file details
        assert results["files"][0]["filename"] == "test1.mp3"
        assert results["files"][0]["status"] == "added"
        assert results["files"][1]["status"] == "updated"
        assert results["files"][2]["status"] == "deleted"

    @pytest.mark.asyncio
    async def test_scan_sounds_unauthenticated(self, client: AsyncClient) -> None:
        """Test scanning sounds without authentication."""
        response = await client.post("/api/v1/sounds/scan")

        assert response.status_code == 401
        data = response.json()
        assert "Could not validate credentials" in data["detail"]

    @pytest.mark.asyncio
    async def test_scan_sounds_non_admin(
        self,
        test_app,
        test_user: User,
    ) -> None:
        """Test scanning sounds with non-admin user.

        The current-user dependency is overridden to return ``test_user``
        with role ``"user"``; the endpoint is expected to answer 403.
        """
        from app.core.dependencies import get_current_active_user_flexible

        # Override the dependency to return regular user
        async def override_get_current_user():
            test_user.role = "user"
            return test_user

        test_app.dependency_overrides[get_current_active_user_flexible] = (
            override_get_current_user
        )
        try:
            # Create API token for regular user
            headers = {"API-TOKEN": "test_api_token"}

            async with AsyncClient(
                transport=ASGITransport(app=test_app),
                base_url="http://test",
            ) as client:
                response = await client.post("/api/v1/sounds/scan", headers=headers)

            assert response.status_code == 403
            data = response.json()
            assert "Only administrators can sync sounds" in data["detail"]
        finally:
            # Clean up override even when the request or an assertion fails,
            # so it cannot leak into tests that reuse the same app object.
            test_app.dependency_overrides.pop(get_current_active_user_flexible, None)

    @pytest.mark.asyncio
    async def test_scan_sounds_admin_user(
        self,
        test_app,
        admin_user: User,
    ) -> None:
        """Test scanning sounds with admin user.

        The current-user dependency is overridden to return ``admin_user``
        and the scanner is mocked; the endpoint is expected to answer 200.
        """
        from app.core.dependencies import get_current_active_user_flexible

        mock_results: ScanResults = {
            "scanned": 1,
            "added": 1,
            "updated": 0,
            "deleted": 0,
            "skipped": 0,
            "errors": 0,
            "files": [],
        }

        # Override the dependency to return admin user
        async def override_get_current_user():
            return admin_user

        test_app.dependency_overrides[get_current_active_user_flexible] = (
            override_get_current_user
        )
        try:
            headers = {"API-TOKEN": "admin_api_token"}

            with patch(
                "app.services.sound_scanner.SoundScannerService.scan_soundboard_directory"
            ) as mock_scan:
                mock_scan.return_value = mock_results

                async with AsyncClient(
                    transport=ASGITransport(app=test_app),
                    base_url="http://test",
                ) as client:
                    response = await client.post(
                        "/api/v1/sounds/scan", headers=headers
                    )

            assert response.status_code == 200
            data = response.json()
            assert "Sound sync completed" in data["message"]
        finally:
            # Clean up override even when the request or an assertion fails,
            # so it cannot leak into tests that reuse the same app object.
            test_app.dependency_overrides.pop(get_current_active_user_flexible, None)

    @pytest.mark.asyncio
    async def test_scan_sounds_service_error(
        self,
        authenticated_admin_client: AsyncClient,
        admin_user: User,
    ) -> None:
        """Test scanning sounds when service raises an error.

        The mocked scanner raises a generic ``Exception``; the response is
        expected to be a 500 whose detail includes the exception text.
        """
        with patch(
            "app.services.sound_scanner.SoundScannerService.scan_soundboard_directory"
        ) as mock_scan:
            mock_scan.side_effect = Exception("Directory not found")

            response = await authenticated_admin_client.post("/api/v1/sounds/scan")

        assert response.status_code == 500
        data = response.json()
        assert "Failed to sync sounds" in data["detail"]
        assert "Directory not found" in data["detail"]

    @pytest.mark.asyncio
    async def test_scan_custom_directory_success(
        self,
        authenticated_admin_client: AsyncClient,
        admin_user: User,
    ) -> None:
        """Test successful custom directory scanning.

        Also checks that the ``directory`` and ``sound_type`` query
        parameters are forwarded to the mocked ``scan_directory``.
        """
        mock_results: ScanResults = {
            "scanned": 2,
            "added": 2,
            "updated": 0,
            "deleted": 0,
            "skipped": 0,
            "errors": 0,
            "files": [
                {
                    "filename": "custom1.wav",
                    "status": "added",
                    "reason": None,
                    "name": "Custom1",
                    "duration": 4000,
                    "size": 800,
                    "id": 10,
                    "error": None,
                    "changes": None,
                },
                {
                    "filename": "custom2.wav",
                    "status": "added",
                    "reason": None,
                    "name": "Custom2",
                    "duration": 6000,
                    "size": 1200,
                    "id": 11,
                    "error": None,
                    "changes": None,
                },
            ],
        }

        with patch(
            "app.services.sound_scanner.SoundScannerService.scan_directory"
        ) as mock_scan:
            mock_scan.return_value = mock_results

            response = await authenticated_admin_client.post(
                "/api/v1/sounds/scan/custom",
                params={"directory": "/custom/path", "sound_type": "CUSTOM"},
            )

        assert response.status_code == 200
        data = response.json()

        assert "Sync of directory '/custom/path' completed" in data["message"]
        assert "results" in data

        results = data["results"]
        assert results["scanned"] == 2
        assert results["added"] == 2
        assert len(results["files"]) == 2

        # Verify the service was called with correct parameters
        mock_scan.assert_called_once_with("/custom/path", "CUSTOM")

    @pytest.mark.asyncio
    async def test_scan_custom_directory_default_type(
        self,
        authenticated_admin_client: AsyncClient,
        admin_user: User,
    ) -> None:
        """Test custom directory scanning with default sound type."""
        mock_results: ScanResults = {
            "scanned": 1,
            "added": 1,
            "updated": 0,
            "deleted": 0,
            "skipped": 0,
            "errors": 0,
            "files": [],
        }

        with patch(
            "app.services.sound_scanner.SoundScannerService.scan_directory"
        ) as mock_scan:
            mock_scan.return_value = mock_results

            response = await authenticated_admin_client.post(
                "/api/v1/sounds/scan/custom",
                params={"directory": "/another/path"},  # No sound_type param
            )

        assert response.status_code == 200

        # Verify the service was called with default SDB type
        mock_scan.assert_called_once_with("/another/path", "SDB")

    @pytest.mark.asyncio
    async def test_scan_custom_directory_invalid_path(
        self,
        authenticated_admin_client: AsyncClient,
        admin_user: User,
    ) -> None:
        """Test custom directory scanning with invalid path.

        The mocked scanner raises ``ValueError``; the endpoint is expected
        to answer 400 rather than the 500 used for generic exceptions.
        """
        with patch(
            "app.services.sound_scanner.SoundScannerService.scan_directory"
        ) as mock_scan:
            mock_scan.side_effect = ValueError(
                "Directory does not exist: /invalid/path"
            )

            response = await authenticated_admin_client.post(
                "/api/v1/sounds/scan/custom", params={"directory": "/invalid/path"}
            )

        assert response.status_code == 400
        data = response.json()
        assert "Directory does not exist" in data["detail"]

    @pytest.mark.asyncio
    async def test_scan_custom_directory_unauthenticated(
        self, client: AsyncClient
    ) -> None:
        """Test custom directory scanning without authentication."""
        response = await client.post(
            "/api/v1/sounds/scan/custom", params={"directory": "/some/path"}
        )

        assert response.status_code == 401
        data = response.json()
        assert "Could not validate credentials" in data["detail"]

    @pytest.mark.asyncio
    async def test_scan_custom_directory_non_admin(
        self,
        test_app,
        test_user: User,
    ) -> None:
        """Test custom directory scanning with non-admin user.

        The current-user dependency is overridden to return ``test_user``
        with role ``"user"``; the endpoint is expected to answer 403.
        """
        from app.core.dependencies import get_current_active_user_flexible

        # Override the dependency to return regular user
        async def override_get_current_user():
            test_user.role = "user"
            return test_user

        test_app.dependency_overrides[get_current_active_user_flexible] = (
            override_get_current_user
        )
        try:
            headers = {"API-TOKEN": "test_api_token"}

            async with AsyncClient(
                transport=ASGITransport(app=test_app),
                base_url="http://test",
            ) as client:
                response = await client.post(
                    "/api/v1/sounds/scan/custom",
                    headers=headers,
                    params={"directory": "/some/path"},
                )

            assert response.status_code == 403
            data = response.json()
            assert "Only administrators can sync sounds" in data["detail"]
        finally:
            # Clean up override even when the request or an assertion fails,
            # so it cannot leak into tests that reuse the same app object.
            test_app.dependency_overrides.pop(get_current_active_user_flexible, None)

    @pytest.mark.asyncio
    async def test_scan_custom_directory_service_error(
        self,
        authenticated_admin_client: AsyncClient,
        admin_user: User,
    ) -> None:
        """Test custom directory scanning when service raises an error."""
        with patch(
            "app.services.sound_scanner.SoundScannerService.scan_directory"
        ) as mock_scan:
            mock_scan.side_effect = Exception("Permission denied")

            response = await authenticated_admin_client.post(
                "/api/v1/sounds/scan/custom", params={"directory": "/restricted/path"}
            )

        assert response.status_code == 500
        data = response.json()
        assert "Failed to sync directory" in data["detail"]
        assert "Permission denied" in data["detail"]

    @pytest.mark.asyncio
    async def test_scan_results_with_errors(
        self,
        authenticated_admin_client: AsyncClient,
        admin_user: User,
    ) -> None:
        """Test scanning with some errors in results.

        A scan that reports a per-file error is still expected to return
        200; the failing file must appear in ``files`` with its error text.
        """
        mock_results: ScanResults = {
            "scanned": 3,
            "added": 1,
            "updated": 0,
            "deleted": 0,
            "skipped": 1,
            "errors": 1,
            "files": [
                {
                    "filename": "good.mp3",
                    "status": "added",
                    "reason": None,
                    "name": "Good",
                    "duration": 5000,
                    "size": 1024,
                    "id": 1,
                    "error": None,
                    "changes": None,
                },
                {
                    "filename": "unchanged.mp3",
                    "status": "skipped",
                    "reason": "file unchanged",
                    "name": "Unchanged",
                    "duration": 3000,
                    "size": 512,
                    "id": 2,
                    "error": None,
                    "changes": None,
                },
                {
                    "filename": "bad.mp3",
                    "status": "error",
                    "reason": None,
                    "name": None,
                    "duration": None,
                    "size": None,
                    "id": None,
                    "error": "Invalid audio format",
                    "changes": None,
                },
            ],
        }

        with patch(
            "app.services.sound_scanner.SoundScannerService.scan_soundboard_directory"
        ) as mock_scan:
            mock_scan.return_value = mock_results

            response = await authenticated_admin_client.post("/api/v1/sounds/scan")

        assert response.status_code == 200
        data = response.json()

        results = data["results"]
        assert results["errors"] == 1
        assert results["added"] == 1
        assert results["skipped"] == 1

        # Check error file details
        error_file = next(f for f in results["files"] if f["status"] == "error")
        assert error_file["filename"] == "bad.mp3"
        assert error_file["error"] == "Invalid audio format"
        assert error_file["id"] is None

    @pytest.mark.asyncio
    async def test_endpoint_response_structure(
        self,
        authenticated_admin_client: AsyncClient,
        admin_user: User,
    ) -> None:
        """Test that endpoint response has correct structure.

        Uses an all-zero scan result so only the shape and types of the
        payload are checked, not specific values.
        """
        mock_results: ScanResults = {
            "scanned": 0,
            "added": 0,
            "updated": 0,
            "deleted": 0,
            "skipped": 0,
            "errors": 0,
            "files": [],
        }

        with patch(
            "app.services.sound_scanner.SoundScannerService.scan_soundboard_directory"
        ) as mock_scan:
            mock_scan.return_value = mock_results

            response = await authenticated_admin_client.post("/api/v1/sounds/scan")

        assert response.status_code == 200
        data = response.json()

        # Check top-level structure
        assert isinstance(data, dict)
        assert "message" in data
        assert "results" in data
        assert isinstance(data["message"], str)
        assert isinstance(data["results"], dict)

        # Check results structure
        results = data["results"]
        required_fields = [
            "scanned",
            "added",
            "updated",
            "deleted",
            "skipped",
            "errors",
            "files",
        ]
        for field in required_fields:
            assert field in results
            # "files" is the only list; every other field is an integer counter.
            if field == "files":
                assert isinstance(results[field], list)
            else:
                assert isinstance(results[field], int)
|