feat: add SoundRepository and SoundScannerService for audio file management

- Implemented SoundRepository for database operations related to sounds, including methods for retrieving, creating, updating, and deleting sound records.
- Developed SoundScannerService to scan directories for audio files, calculate their metadata, and synchronize with the database.
- Added support for various audio file formats and integrated ffmpeg for audio duration extraction.
- Created comprehensive tests for sound API endpoints and sound scanner service to ensure functionality and error handling.
- Updated dependencies to include ffmpeg-python for audio processing.
This commit is contained in:
JSC
2025-07-27 23:34:17 +02:00
parent cb20746f84
commit 36949a1f1c
9 changed files with 1346 additions and 1 deletions

View File

@@ -0,0 +1,514 @@
"""Tests for sound API endpoints."""
from unittest.mock import patch
import pytest
from httpx import ASGITransport, AsyncClient
from app.models.user import User
from app.services.sound_scanner import ScanResults
class TestSoundEndpoints:
"""Test sound API endpoints."""
@pytest.mark.asyncio
async def test_scan_sounds_success(
self,
authenticated_admin_client: AsyncClient,
admin_user: User,
):
"""Test successful sound scanning."""
# Mock the scanner service to return successful results
mock_results: ScanResults = {
"scanned": 5,
"added": 3,
"updated": 1,
"deleted": 1,
"skipped": 0,
"errors": 0,
"files": [
{
"filename": "test1.mp3",
"status": "added",
"reason": None,
"name": "Test1",
"duration": 5000,
"size": 1024,
"id": 1,
"error": None,
"changes": None,
},
{
"filename": "test2.mp3",
"status": "updated",
"reason": "file was modified",
"name": "Test2",
"duration": 7500,
"size": 2048,
"id": 2,
"error": None,
"changes": ["hash", "duration", "size"],
},
{
"filename": "test3.mp3",
"status": "deleted",
"reason": "file no longer exists",
"name": "Test3",
"duration": 3000,
"size": 512,
"id": 3,
"error": None,
"changes": None,
},
],
}
with patch(
"app.services.sound_scanner.SoundScannerService.scan_soundboard_directory"
) as mock_scan:
mock_scan.return_value = mock_results
response = await authenticated_admin_client.post("/api/v1/sounds/scan")
assert response.status_code == 200
data = response.json()
assert "message" in data
assert "Sound sync completed" in data["message"]
assert "results" in data
results = data["results"]
assert results["scanned"] == 5
assert results["added"] == 3
assert results["updated"] == 1
assert results["deleted"] == 1
assert results["skipped"] == 0
assert results["errors"] == 0
assert len(results["files"]) == 3
# Check file details
assert results["files"][0]["filename"] == "test1.mp3"
assert results["files"][0]["status"] == "added"
assert results["files"][1]["status"] == "updated"
assert results["files"][2]["status"] == "deleted"
@pytest.mark.asyncio
async def test_scan_sounds_unauthenticated(self, client: AsyncClient):
"""Test scanning sounds without authentication."""
response = await client.post("/api/v1/sounds/scan")
assert response.status_code == 401
data = response.json()
assert "Could not validate credentials" in data["detail"]
@pytest.mark.asyncio
async def test_scan_sounds_non_admin(
self,
test_app,
test_user: User,
):
"""Test scanning sounds with non-admin user."""
from app.core.dependencies import get_current_active_user_flexible
# Override the dependency to return regular user
async def override_get_current_user():
test_user.role = "user"
return test_user
test_app.dependency_overrides[get_current_active_user_flexible] = (
override_get_current_user
)
# Create API token for regular user
headers = {"API-TOKEN": "test_api_token"}
async with AsyncClient(
transport=ASGITransport(app=test_app),
base_url="http://test",
) as client:
response = await client.post("/api/v1/sounds/scan", headers=headers)
assert response.status_code == 403
data = response.json()
assert "Only administrators can sync sounds" in data["detail"]
# Clean up override
test_app.dependency_overrides.pop(get_current_active_user_flexible, None)
@pytest.mark.asyncio
async def test_scan_sounds_admin_user(
self,
test_app,
admin_user: User,
):
"""Test scanning sounds with admin user."""
from app.core.dependencies import get_current_active_user_flexible
mock_results: ScanResults = {
"scanned": 1,
"added": 1,
"updated": 0,
"deleted": 0,
"skipped": 0,
"errors": 0,
"files": [],
}
# Override the dependency to return admin user
async def override_get_current_user():
return admin_user
test_app.dependency_overrides[get_current_active_user_flexible] = (
override_get_current_user
)
headers = {"API-TOKEN": "admin_api_token"}
with patch(
"app.services.sound_scanner.SoundScannerService.scan_soundboard_directory"
) as mock_scan:
mock_scan.return_value = mock_results
async with AsyncClient(
transport=ASGITransport(app=test_app),
base_url="http://test",
) as client:
response = await client.post("/api/v1/sounds/scan", headers=headers)
assert response.status_code == 200
data = response.json()
assert "Sound sync completed" in data["message"]
# Clean up override
test_app.dependency_overrides.pop(get_current_active_user_flexible, None)
@pytest.mark.asyncio
async def test_scan_sounds_service_error(
self,
authenticated_admin_client: AsyncClient,
admin_user: User,
):
"""Test scanning sounds when service raises an error."""
with patch(
"app.services.sound_scanner.SoundScannerService.scan_soundboard_directory"
) as mock_scan:
mock_scan.side_effect = Exception("Directory not found")
response = await authenticated_admin_client.post("/api/v1/sounds/scan")
assert response.status_code == 500
data = response.json()
assert "Failed to sync sounds" in data["detail"]
assert "Directory not found" in data["detail"]
@pytest.mark.asyncio
async def test_scan_custom_directory_success(
self,
authenticated_admin_client: AsyncClient,
admin_user: User,
):
"""Test successful custom directory scanning."""
mock_results: ScanResults = {
"scanned": 2,
"added": 2,
"updated": 0,
"deleted": 0,
"skipped": 0,
"errors": 0,
"files": [
{
"filename": "custom1.wav",
"status": "added",
"reason": None,
"name": "Custom1",
"duration": 4000,
"size": 800,
"id": 10,
"error": None,
"changes": None,
},
{
"filename": "custom2.wav",
"status": "added",
"reason": None,
"name": "Custom2",
"duration": 6000,
"size": 1200,
"id": 11,
"error": None,
"changes": None,
},
],
}
with patch(
"app.services.sound_scanner.SoundScannerService.scan_directory"
) as mock_scan:
mock_scan.return_value = mock_results
response = await authenticated_admin_client.post(
"/api/v1/sounds/scan/custom",
params={"directory": "/custom/path", "sound_type": "CUSTOM"},
)
assert response.status_code == 200
data = response.json()
assert "Sync of directory '/custom/path' completed" in data["message"]
assert "results" in data
results = data["results"]
assert results["scanned"] == 2
assert results["added"] == 2
assert len(results["files"]) == 2
# Verify the service was called with correct parameters
mock_scan.assert_called_once_with("/custom/path", "CUSTOM")
@pytest.mark.asyncio
async def test_scan_custom_directory_default_type(
self,
authenticated_admin_client: AsyncClient,
admin_user: User,
):
"""Test custom directory scanning with default sound type."""
mock_results: ScanResults = {
"scanned": 1,
"added": 1,
"updated": 0,
"deleted": 0,
"skipped": 0,
"errors": 0,
"files": [],
}
with patch(
"app.services.sound_scanner.SoundScannerService.scan_directory"
) as mock_scan:
mock_scan.return_value = mock_results
response = await authenticated_admin_client.post(
"/api/v1/sounds/scan/custom",
params={"directory": "/another/path"}, # No sound_type param
)
assert response.status_code == 200
# Verify the service was called with default SDB type
mock_scan.assert_called_once_with("/another/path", "SDB")
@pytest.mark.asyncio
async def test_scan_custom_directory_invalid_path(
self,
authenticated_admin_client: AsyncClient,
admin_user: User,
):
"""Test custom directory scanning with invalid path."""
with patch(
"app.services.sound_scanner.SoundScannerService.scan_directory"
) as mock_scan:
mock_scan.side_effect = ValueError(
"Directory does not exist: /invalid/path"
)
response = await authenticated_admin_client.post(
"/api/v1/sounds/scan/custom", params={"directory": "/invalid/path"}
)
assert response.status_code == 400
data = response.json()
assert "Directory does not exist" in data["detail"]
@pytest.mark.asyncio
async def test_scan_custom_directory_unauthenticated(self, client: AsyncClient):
"""Test custom directory scanning without authentication."""
response = await client.post(
"/api/v1/sounds/scan/custom", params={"directory": "/some/path"}
)
assert response.status_code == 401
data = response.json()
assert "Could not validate credentials" in data["detail"]
@pytest.mark.asyncio
async def test_scan_custom_directory_non_admin(
self,
test_app,
test_user: User,
):
"""Test custom directory scanning with non-admin user."""
from app.core.dependencies import get_current_active_user_flexible
# Override the dependency to return regular user
async def override_get_current_user():
test_user.role = "user"
return test_user
test_app.dependency_overrides[get_current_active_user_flexible] = (
override_get_current_user
)
headers = {"API-TOKEN": "test_api_token"}
async with AsyncClient(
transport=ASGITransport(app=test_app),
base_url="http://test",
) as client:
response = await client.post(
"/api/v1/sounds/scan/custom",
headers=headers,
params={"directory": "/some/path"},
)
assert response.status_code == 403
data = response.json()
assert "Only administrators can sync sounds" in data["detail"]
# Clean up override
test_app.dependency_overrides.pop(get_current_active_user_flexible, None)
@pytest.mark.asyncio
async def test_scan_custom_directory_service_error(
self,
authenticated_admin_client: AsyncClient,
admin_user: User,
):
"""Test custom directory scanning when service raises an error."""
with patch(
"app.services.sound_scanner.SoundScannerService.scan_directory"
) as mock_scan:
mock_scan.side_effect = Exception("Permission denied")
response = await authenticated_admin_client.post(
"/api/v1/sounds/scan/custom", params={"directory": "/restricted/path"}
)
assert response.status_code == 500
data = response.json()
assert "Failed to sync directory" in data["detail"]
assert "Permission denied" in data["detail"]
@pytest.mark.asyncio
async def test_scan_results_with_errors(
self,
authenticated_admin_client: AsyncClient,
admin_user: User,
):
"""Test scanning with some errors in results."""
mock_results: ScanResults = {
"scanned": 3,
"added": 1,
"updated": 0,
"deleted": 0,
"skipped": 1,
"errors": 1,
"files": [
{
"filename": "good.mp3",
"status": "added",
"reason": None,
"name": "Good",
"duration": 5000,
"size": 1024,
"id": 1,
"error": None,
"changes": None,
},
{
"filename": "unchanged.mp3",
"status": "skipped",
"reason": "file unchanged",
"name": "Unchanged",
"duration": 3000,
"size": 512,
"id": 2,
"error": None,
"changes": None,
},
{
"filename": "bad.mp3",
"status": "error",
"reason": None,
"name": None,
"duration": None,
"size": None,
"id": None,
"error": "Invalid audio format",
"changes": None,
},
],
}
with patch(
"app.services.sound_scanner.SoundScannerService.scan_soundboard_directory"
) as mock_scan:
mock_scan.return_value = mock_results
response = await authenticated_admin_client.post("/api/v1/sounds/scan")
assert response.status_code == 200
data = response.json()
results = data["results"]
assert results["errors"] == 1
assert results["added"] == 1
assert results["skipped"] == 1
# Check error file details
error_file = next(f for f in results["files"] if f["status"] == "error")
assert error_file["filename"] == "bad.mp3"
assert error_file["error"] == "Invalid audio format"
assert error_file["id"] is None
@pytest.mark.asyncio
async def test_endpoint_response_structure(
self,
authenticated_admin_client: AsyncClient,
admin_user: User,
):
"""Test that endpoint response has correct structure."""
mock_results: ScanResults = {
"scanned": 0,
"added": 0,
"updated": 0,
"deleted": 0,
"skipped": 0,
"errors": 0,
"files": [],
}
with patch(
"app.services.sound_scanner.SoundScannerService.scan_soundboard_directory"
) as mock_scan:
mock_scan.return_value = mock_results
response = await authenticated_admin_client.post("/api/v1/sounds/scan")
assert response.status_code == 200
data = response.json()
# Check top-level structure
assert isinstance(data, dict)
assert "message" in data
assert "results" in data
assert isinstance(data["message"], str)
assert isinstance(data["results"], dict)
# Check results structure
results = data["results"]
required_fields = [
"scanned",
"added",
"updated",
"deleted",
"skipped",
"errors",
"files",
]
for field in required_fields:
assert field in results
if field == "files":
assert isinstance(results[field], list)
else:
assert isinstance(results[field], int)

View File

@@ -114,6 +114,19 @@ async def authenticated_client(
yield client
@pytest_asyncio.fixture
async def authenticated_admin_client(
test_app: FastAPI, admin_cookies: dict[str, str],
) -> AsyncGenerator[AsyncClient, None]:
"""Create a test HTTP client with admin authentication cookies."""
async with AsyncClient(
transport=ASGITransport(app=test_app),
base_url="http://test",
cookies=admin_cookies,
) as client:
yield client
@pytest_asyncio.fixture
async def test_plan(test_session: AsyncSession) -> Plan:
"""Create a test plan."""
@@ -307,3 +320,17 @@ async def auth_cookies(test_user: User) -> dict[str, str]:
access_token = JWTUtils.create_access_token(token_data)
return {"access_token": access_token}
@pytest_asyncio.fixture
async def admin_cookies(admin_user: User) -> dict[str, str]:
"""Create admin authentication cookies with JWT token."""
token_data = {
"sub": str(admin_user.id),
"email": admin_user.email,
"role": admin_user.role,
}
access_token = JWTUtils.create_access_token(token_data)
return {"access_token": access_token}

View File

@@ -0,0 +1,298 @@
"""Tests for sound scanner service."""
import tempfile
from pathlib import Path
from unittest.mock import AsyncMock, Mock, patch
import pytest
from sqlmodel.ext.asyncio.session import AsyncSession
from app.models.sound import Sound
from app.services.sound_scanner import SoundScannerService
class TestSoundScannerService:
"""Test sound scanner service."""
@pytest.fixture
def mock_session(self):
"""Create a mock session."""
return Mock(spec=AsyncSession)
@pytest.fixture
def scanner_service(self, mock_session):
"""Create a scanner service with mock session."""
return SoundScannerService(mock_session)
def test_init(self, scanner_service):
"""Test scanner service initialization."""
assert scanner_service.session is not None
assert scanner_service.sound_repo is not None
assert len(scanner_service.supported_extensions) > 0
assert ".mp3" in scanner_service.supported_extensions
assert ".wav" in scanner_service.supported_extensions
def test_get_file_hash(self, scanner_service):
"""Test file hash calculation."""
# Create a temporary file
with tempfile.NamedTemporaryFile(mode="w", delete=False) as f:
f.write("test content")
temp_path = Path(f.name)
try:
hash_value = scanner_service.get_file_hash(temp_path)
assert len(hash_value) == 32 # MD5 hash length
assert isinstance(hash_value, str)
finally:
temp_path.unlink()
def test_get_file_size(self, scanner_service):
"""Test file size calculation."""
# Create a temporary file
with tempfile.NamedTemporaryFile(mode="w", delete=False) as f:
f.write("test content for size calculation")
temp_path = Path(f.name)
try:
size = scanner_service.get_file_size(temp_path)
assert size > 0
assert isinstance(size, int)
finally:
temp_path.unlink()
def test_extract_name_from_filename(self, scanner_service):
"""Test name extraction from filename."""
test_cases = [
("hello_world.mp3", "Hello World"),
("my-awesome-sound.wav", "My Awesome Sound"),
("TEST_FILE_NAME.opus", "Test File Name"),
("single.mp3", "Single"),
("multiple_words_here.flac", "Multiple Words Here"),
]
for filename, expected_name in test_cases:
result = scanner_service.extract_name_from_filename(filename)
assert result == expected_name
@patch("app.services.sound_scanner.ffmpeg.probe")
def test_get_audio_duration_success(self, mock_probe, scanner_service):
"""Test successful audio duration extraction."""
mock_probe.return_value = {
"format": {"duration": "123.456"}
}
temp_path = Path("/fake/path/test.mp3")
duration = scanner_service.get_audio_duration(temp_path)
assert duration == 123456 # 123.456 seconds * 1000 = 123456 ms
mock_probe.assert_called_once_with(str(temp_path))
@patch("app.services.sound_scanner.ffmpeg.probe")
def test_get_audio_duration_failure(self, mock_probe, scanner_service):
"""Test audio duration extraction failure."""
mock_probe.side_effect = Exception("FFmpeg error")
temp_path = Path("/fake/path/test.mp3")
duration = scanner_service.get_audio_duration(temp_path)
assert duration == 0
mock_probe.assert_called_once_with(str(temp_path))
@pytest.mark.asyncio
async def test_scan_directory_nonexistent(self, scanner_service):
"""Test scanning a non-existent directory."""
with pytest.raises(ValueError, match="Directory does not exist"):
await scanner_service.scan_directory("/non/existent/path")
@pytest.mark.asyncio
async def test_scan_directory_not_directory(self, scanner_service):
"""Test scanning a path that is not a directory."""
# Create a temporary file
with tempfile.NamedTemporaryFile() as f:
with pytest.raises(ValueError, match="Path is not a directory"):
await scanner_service.scan_directory(f.name)
@pytest.mark.asyncio
async def test_sync_audio_file_unchanged(self, scanner_service):
"""Test syncing file that is unchanged."""
# Existing sound with same hash as file
existing_sound = Sound(
id=1,
type="SDB",
name="Test Sound",
filename="test.mp3",
duration=120000, # 120 seconds = 120000 ms
size=1024,
hash="same_hash",
)
# Mock file operations to return same hash
scanner_service.get_file_hash = Mock(return_value="same_hash")
scanner_service.get_audio_duration = Mock(return_value=120000)
scanner_service.get_file_size = Mock(return_value=1024)
# Create a temporary file
with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as f:
temp_path = Path(f.name)
try:
results = {
"scanned": 0, "added": 0, "updated": 0, "deleted": 0,
"skipped": 0, "errors": 0, "files": []
}
await scanner_service._sync_audio_file(
temp_path, "SDB", existing_sound, results
)
assert results["skipped"] == 1
assert results["added"] == 0
assert results["updated"] == 0
assert len(results["files"]) == 1
assert results["files"][0]["status"] == "skipped"
assert results["files"][0]["reason"] == "file unchanged"
finally:
temp_path.unlink()
@pytest.mark.asyncio
async def test_sync_audio_file_new(self, scanner_service):
"""Test syncing a new audio file."""
created_sound = Sound(
id=1,
type="SDB",
name="Test Sound",
filename="test.mp3",
duration=120000, # 120 seconds = 120000 ms
size=1024,
hash="test_hash",
)
scanner_service.sound_repo.create = AsyncMock(return_value=created_sound)
# Mock file operations
scanner_service.get_file_hash = Mock(return_value="test_hash")
scanner_service.get_audio_duration = Mock(return_value=120000) # Duration in ms
scanner_service.get_file_size = Mock(return_value=1024)
# Create a temporary file
with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as f:
temp_path = Path(f.name)
try:
results = {
"scanned": 0, "added": 0, "updated": 0, "deleted": 0,
"skipped": 0, "errors": 0, "files": []
}
await scanner_service._sync_audio_file(temp_path, "SDB", None, results)
assert results["added"] == 1
assert results["skipped"] == 0
assert results["updated"] == 0
assert len(results["files"]) == 1
assert results["files"][0]["status"] == "added"
# Verify sound_repo.create was called with correct data
call_args = scanner_service.sound_repo.create.call_args[0][0]
assert call_args["type"] == "SDB"
assert call_args["filename"] == temp_path.name
assert call_args["duration"] == 120000 # Duration in ms
assert call_args["size"] == 1024
assert call_args["hash"] == "test_hash"
assert call_args["is_deletable"] is False # SDB sounds are not deletable
finally:
temp_path.unlink()
@pytest.mark.asyncio
async def test_sync_audio_file_updated(self, scanner_service):
"""Test syncing a file that was modified (different hash)."""
# Existing sound with different hash than file
existing_sound = Sound(
id=1,
type="SDB",
name="Old Sound",
filename="test.mp3",
duration=60000, # Old duration
size=512, # Old size
hash="old_hash", # Old hash
)
scanner_service.sound_repo.update = AsyncMock(return_value=existing_sound)
# Mock file operations to return new values
scanner_service.get_file_hash = Mock(return_value="new_hash")
scanner_service.get_audio_duration = Mock(return_value=120000) # New duration
scanner_service.get_file_size = Mock(return_value=1024) # New size
# Create a temporary file
with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as f:
temp_path = Path(f.name)
try:
results = {
"scanned": 0, "added": 0, "updated": 0, "deleted": 0,
"skipped": 0, "errors": 0, "files": []
}
await scanner_service._sync_audio_file(
temp_path, "SDB", existing_sound, results
)
assert results["updated"] == 1
assert results["added"] == 0
assert results["skipped"] == 0
assert len(results["files"]) == 1
assert results["files"][0]["status"] == "updated"
assert results["files"][0]["reason"] == "file was modified"
# Verify sound_repo.update was called with correct data
call_args = scanner_service.sound_repo.update.call_args[0][1] # update_data
assert call_args["duration"] == 120000
assert call_args["size"] == 1024
assert call_args["hash"] == "new_hash"
# Name is extracted from temp filename, should be capitalized
assert call_args["name"].endswith("mp3") is False # Should be cleaned
finally:
temp_path.unlink()
@pytest.mark.asyncio
async def test_sync_audio_file_custom_type(self, scanner_service):
"""Test syncing file with custom type."""
created_sound = Sound(
id=1,
type="CUSTOM",
name="Test Sound",
filename="test.mp3",
duration=60000, # 60 seconds = 60000 ms
size=2048,
hash="custom_hash",
)
scanner_service.sound_repo.create = AsyncMock(return_value=created_sound)
# Mock file operations
scanner_service.get_file_hash = Mock(return_value="custom_hash")
scanner_service.get_audio_duration = Mock(return_value=60000) # Duration in ms
scanner_service.get_file_size = Mock(return_value=2048)
# Create a temporary file
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
temp_path = Path(f.name)
try:
results = {
"scanned": 0, "added": 0, "updated": 0, "deleted": 0,
"skipped": 0, "errors": 0, "files": []
}
await scanner_service._sync_audio_file(temp_path, "CUSTOM", None, results)
assert results["added"] == 1
assert results["skipped"] == 0
assert len(results["files"]) == 1
assert results["files"][0]["status"] == "added"
# Verify sound_repo.create was called with correct data for custom type
call_args = scanner_service.sound_repo.create.call_args[0][0]
assert call_args["type"] == "CUSTOM"
assert call_args["filename"] == temp_path.name
assert call_args["duration"] == 60000 # Duration in ms
assert call_args["size"] == 2048
assert call_args["hash"] == "custom_hash"
assert call_args["is_deletable"] is False # All sounds are set to not deletable
finally:
temp_path.unlink()