- Implement tests for admin extraction API endpoints, including status retrieval, deletion of extractions, and permission checks.
- Add tests for user extraction deletion, ensuring proper handling of permissions and non-existent extractions.
- Enhance sound endpoint tests to include duplicate handling in responses.
- Refactor favorite service tests to utilize mock dependencies for better maintainability and clarity.
- Update sound scanner tests to improve file handling and ensure proper deletion of associated files.
702 lines
28 KiB
Python
702 lines
28 KiB
Python
"""Tests for sound scanner service."""
|
|
|
|
import tempfile
|
|
from pathlib import Path
|
|
from unittest.mock import AsyncMock, Mock, patch
|
|
|
|
import pytest
|
|
from sqlmodel.ext.asyncio.session import AsyncSession
|
|
|
|
from app.models.sound import Sound
|
|
from app.services.sound_scanner import SoundScannerService, SyncContext
|
|
|
|
|
|
class TestSoundScannerService:
|
|
"""Test sound scanner service."""
|
|
|
|
@pytest.fixture
def mock_session(self):
    """Provide a ``Mock`` restricted to the ``AsyncSession`` interface."""
    session_double = Mock(spec=AsyncSession)
    return session_double
|
|
|
|
@pytest.fixture
def scanner_service(self, mock_session):
    """Build the ``SoundScannerService`` under test around the mocked session."""
    service = SoundScannerService(mock_session)
    return service
|
|
|
|
def test_init(self, scanner_service) -> None:
    """A freshly built service exposes its collaborators and known extensions."""
    service = scanner_service
    assert service.session is not None
    assert service.sound_repo is not None

    extensions = service.supported_extensions
    assert len(extensions) > 0
    for suffix in (".mp3", ".wav"):
        assert suffix in extensions
|
|
|
|
def test_get_file_hash(self, scanner_service) -> None:
    """``get_file_hash`` yields a 64-character string for a file on disk."""
    from app.utils.audio import get_file_hash

    # Write a small throwaway file to hash; it is removed in ``finally``.
    with tempfile.NamedTemporaryFile(mode="w", delete=False) as handle:
        handle.write("test content")
    sample_file = Path(handle.name)

    try:
        digest = get_file_hash(sample_file)
        assert isinstance(digest, str)
        assert len(digest) == 64  # SHA-256 hash length
    finally:
        sample_file.unlink()
|
|
|
|
def test_get_file_size(self, scanner_service) -> None:
    """``get_file_size`` reports a positive integer for a non-empty file."""
    from app.utils.audio import get_file_size

    # Write a small throwaway file to measure; it is removed in ``finally``.
    with tempfile.NamedTemporaryFile(mode="w", delete=False) as handle:
        handle.write("test content for size calculation")
    sample_file = Path(handle.name)

    try:
        reported_size = get_file_size(sample_file)
        assert isinstance(reported_size, int)
        assert reported_size > 0
    finally:
        sample_file.unlink()
|
|
|
|
def test_extract_name_from_filename(self, scanner_service) -> None:
    """Filenames map to title-cased names with separators turned into spaces."""
    expected_by_filename = {
        "hello_world.mp3": "Hello World",
        "my-awesome-sound.wav": "My Awesome Sound",
        "TEST_FILE_NAME.opus": "Test File Name",
        "single.mp3": "Single",
        "multiple_words_here.flac": "Multiple Words Here",
    }

    for filename, expected_name in expected_by_filename.items():
        extracted = scanner_service.extract_name_from_filename(filename)
        assert extracted == expected_name
|
|
|
|
@patch("app.utils.audio.ffmpeg.probe")
def test_get_audio_duration_success(self, mock_probe, scanner_service) -> None:
    """A duration probed in seconds comes back as whole milliseconds."""
    from app.utils.audio import get_audio_duration

    audio_path = Path("/fake/path/test.mp3")
    mock_probe.return_value = {"format": {"duration": "123.456"}}

    duration_ms = get_audio_duration(audio_path)

    # 123.456 seconds * 1000 = 123456 ms
    assert duration_ms == 123456
    mock_probe.assert_called_once_with(str(audio_path))
|
|
|
|
@patch("app.utils.audio.ffmpeg.probe")
def test_get_audio_duration_failure(self, mock_probe, scanner_service) -> None:
    """When the probe raises, the reported duration is ``0``."""
    from app.utils.audio import get_audio_duration

    audio_path = Path("/fake/path/test.mp3")
    mock_probe.side_effect = Exception("FFmpeg error")

    duration_ms = get_audio_duration(audio_path)

    assert duration_ms == 0
    mock_probe.assert_called_once_with(str(audio_path))
|
|
|
|
@pytest.mark.asyncio
async def test_scan_directory_nonexistent(self, scanner_service) -> None:
    """Scanning a path that does not exist raises ``ValueError``."""
    missing_dir = "/non/existent/path"
    with pytest.raises(ValueError, match="Directory does not exist"):
        await scanner_service.scan_directory(missing_dir)
|
|
|
|
@pytest.mark.asyncio
async def test_scan_directory_not_directory(self, scanner_service) -> None:
    """Scanning a regular file instead of a directory raises ``ValueError``."""
    # The temporary file only needs to exist for the duration of the call.
    with (
        tempfile.NamedTemporaryFile() as regular_file,
        pytest.raises(ValueError, match="Path is not a directory"),
    ):
        await scanner_service.scan_directory(regular_file.name)
|
|
|
|
@pytest.mark.asyncio
async def test_sync_audio_file_unchanged(self, scanner_service) -> None:
    """A file whose hash and filename both match the stored sound is skipped."""
    with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as handle:
        audio_path = Path(handle.name)

    try:
        # Stored record carrying the same hash the file will report.
        stored = Sound(
            id=1,
            type="SDB",
            name="Test Sound",
            filename="test.mp3",
            duration=120000,  # 120 seconds expressed in ms
            size=1024,
            hash="same_hash",
        )
        # Point the record at the temp file's name so nothing differs.
        stored.filename = audio_path.name

        counters = ("scanned", "added", "updated", "deleted", "skipped", "errors")
        results = {**dict.fromkeys(counters, 0), "files": []}

        context = SyncContext(
            file_path=audio_path,
            sound_type="SDB",
            existing_sound_by_hash=stored,
            existing_sound_by_filename=None,
            file_hash="same_hash",
        )

        # The audio helpers are patched to echo the stored metadata.
        with (
            patch("app.services.sound_scanner.get_file_hash", return_value="same_hash"),
            patch("app.services.sound_scanner.get_audio_duration", return_value=120000),
            patch("app.services.sound_scanner.get_file_size", return_value=1024),
        ):
            await scanner_service._sync_audio_file(context, results)

        assert results["skipped"] == 1
        assert results["added"] == 0
        assert results["updated"] == 0
        assert len(results["files"]) == 1

        entry = results["files"][0]
        assert entry["status"] == "skipped"
        assert entry["reason"] == "file unchanged"
    finally:
        audio_path.unlink()
|
|
|
|
@pytest.mark.asyncio
async def test_sync_audio_file_renamed(self, scanner_service) -> None:
    """A known hash appearing under a new filename is recorded as a rename."""
    # Stored record: same content hash, but an outdated filename.
    stored = Sound(
        id=1,
        type="SDB",
        name="Old Name",
        filename="old_name.mp3",
        duration=120000,
        size=1024,
        hash="same_hash",
    )
    update_mock = AsyncMock(return_value=stored)
    scanner_service.sound_repo.update = update_mock

    # The temp file's generated name plays the role of the new filename.
    with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as handle:
        renamed_path = Path(handle.name)

    try:
        counters = ("scanned", "added", "updated", "deleted", "skipped", "errors")
        results = {**dict.fromkeys(counters, 0), "files": []}

        context = SyncContext(
            file_path=renamed_path,
            sound_type="SDB",
            existing_sound_by_hash=stored,
            existing_sound_by_filename=None,
            file_hash="same_hash",
        )

        # Audio helpers are patched to report the stored metadata.
        with (
            patch("app.services.sound_scanner.get_file_hash", return_value="same_hash"),
            patch("app.services.sound_scanner.get_audio_duration", return_value=120000),
            patch("app.services.sound_scanner.get_file_size", return_value=1024),
        ):
            await scanner_service._sync_audio_file(context, results)

        # The rename must be counted as an update, nothing else.
        assert results["updated"] == 1
        assert results["added"] == 0
        assert results["skipped"] == 0
        assert len(results["files"]) == 1

        entry = results["files"][0]
        assert entry["status"] == "updated"
        assert entry["reason"] == "file was renamed"
        assert entry["changes"] == ["filename", "name"]

        # The repository update must carry the new filename; the second
        # positional argument of ``update`` is the update data.
        update_mock.assert_called_once()
        update_data = update_mock.call_args[0][1]
        assert update_data["filename"] == renamed_path.name
    finally:
        renamed_path.unlink()
|
|
|
|
@pytest.mark.asyncio
async def test_scan_directory_rename_no_delete(self, scanner_service, mock_session) -> None:
    """Regression test: a renamed file is updated in place, never deleted.

    The repository reports one stored sound (``old_name.mp3``) while the
    scanned directory only contains ``new_name.mp3``.  The file hash is
    patched to ``"same_hash"``, so the scan is expected to report a rename
    (``updated``) and must leave ``sound_repo.delete`` uncalled.
    """
    # Stored sound that the scan should recognise under its new filename.
    existing_sound = Sound(
        id=1,
        type="SDB",
        name="Old Name",
        filename="old_name.mp3",
        duration=120000,
        size=1024,
        hash="same_hash",
    )

    # Repository doubles: one known sound, with update/delete observable.
    scanner_service.sound_repo.get_by_type = AsyncMock(return_value=[existing_sound])
    scanner_service.sound_repo.update = AsyncMock()
    scanner_service.sound_repo.delete = AsyncMock()

    with tempfile.TemporaryDirectory() as temp_dir:
        # The "renamed" file.  Its bytes are irrelevant because
        # get_file_hash is patched below; it only has to exist on disk.
        new_file_path = Path(temp_dir) / "new_name.mp3"
        new_file_path.write_bytes(b"test audio content")

        # Patch the audio helpers so the file matches the stored sound's hash.
        with (
            patch("app.services.sound_scanner.get_file_hash", return_value="same_hash"),
            patch("app.services.sound_scanner.get_audio_duration", return_value=120000),
            patch("app.services.sound_scanner.get_file_size", return_value=1024),
        ):
            results = await scanner_service.scan_directory(temp_dir, "SDB")

        # Exactly one renamed file, and, the key assertion, no deletion.
        assert results["updated"] == 1
        assert results["deleted"] == 0
        assert results["added"] == 0
        assert len(results["files"]) == 1

        # The single entry must be flagged as a rename.
        file_result = results["files"][0]
        assert file_result["status"] == "updated"
        assert file_result["reason"] == "file was renamed"

        # update was called, delete was NOT.
        scanner_service.sound_repo.update.assert_called_once()
        scanner_service.sound_repo.delete.assert_not_called()
|
|
|
|
@pytest.mark.asyncio
async def test_scan_directory_duplicate_detection(self, scanner_service, mock_session) -> None:
    """Two files sharing one hash: both are skipped, one flagged as duplicate.

    The repository knows ``original.mp3``; the scanned directory holds that
    file plus ``duplicate.mp3``.  The file hash is patched to
    ``"same_hash"`` for both, so the scan is expected to skip both files and
    report the extra one with the reason ``"duplicate content"``.
    """
    # Stored sound matching ``original.mp3`` by filename and hash.
    existing_sound = Sound(
        id=1,
        type="SDB",
        name="Original Song",
        filename="original.mp3",
        duration=120000,
        size=1024,
        hash="same_hash",
    )

    # Repository doubles.
    scanner_service.sound_repo.get_by_type = AsyncMock(return_value=[existing_sound])
    scanner_service.sound_repo.update = AsyncMock()

    with tempfile.TemporaryDirectory() as temp_dir:
        # Both files only need to exist; the hash is supplied by the patch.
        for filename in ("original.mp3", "duplicate.mp3"):
            (Path(temp_dir) / filename).write_bytes(b"test audio content")

        with (
            patch("app.services.sound_scanner.get_file_hash", return_value="same_hash"),
            patch("app.services.sound_scanner.get_audio_duration", return_value=120000),
            patch("app.services.sound_scanner.get_file_size", return_value=1024),
        ):
            results = await scanner_service.scan_directory(temp_dir, "SDB")

        # Both files are skipped (unchanged original + duplicate); only the
        # extra file counts as a duplicate.
        assert results["skipped"] == 2
        assert results["duplicates"] == 1
        assert results["updated"] == 0
        assert results["added"] == 0
        assert results["deleted"] == 0

        # Locate the duplicate entry.  A skipped entry without a "reason"
        # must fail the assertion below rather than raise KeyError here.
        skipped_files = [entry for entry in results["files"] if entry["status"] == "skipped"]
        duplicate_file = next(
            (entry for entry in skipped_files if "duplicate" in (entry.get("reason") or "")),
            None,
        )
        assert duplicate_file is not None
        assert duplicate_file["reason"] == "duplicate content"
|
|
|
|
@pytest.mark.asyncio
async def test_sync_audio_file_new(self, scanner_service) -> None:
    """A file with no matching stored sound is created and counted as added."""
    created_sound = Sound(
        id=1,
        type="SDB",
        name="Test Sound",
        filename="test.mp3",
        duration=120000,  # 120 seconds expressed in ms
        size=1024,
        hash="test_hash",
    )
    create_mock = AsyncMock(return_value=created_sound)
    scanner_service.sound_repo.create = create_mock

    with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as handle:
        audio_path = Path(handle.name)

    try:
        counters = ("scanned", "added", "updated", "deleted", "skipped", "errors")
        results = {**dict.fromkeys(counters, 0), "files": []}

        # No match by hash or by filename: the file is brand new.
        context = SyncContext(
            file_path=audio_path,
            sound_type="SDB",
            existing_sound_by_hash=None,
            existing_sound_by_filename=None,
            file_hash="test_hash",
        )

        with (
            patch("app.services.sound_scanner.get_file_hash", return_value="test_hash"),
            patch("app.services.sound_scanner.get_audio_duration", return_value=120000),
            patch("app.services.sound_scanner.get_file_size", return_value=1024),
        ):
            await scanner_service._sync_audio_file(context, results)

        assert results["added"] == 1
        assert results["skipped"] == 0
        assert results["updated"] == 0
        assert len(results["files"]) == 1
        assert results["files"][0]["status"] == "added"

        # First positional argument of ``create`` is the new sound's data.
        payload = create_mock.call_args[0][0]
        expected_fields = {
            "type": "SDB",
            "filename": audio_path.name,
            "duration": 120000,  # duration in ms
            "size": 1024,
            "hash": "test_hash",
        }
        for field, expected in expected_fields.items():
            assert payload[field] == expected
        # SDB sounds are not deletable
        assert payload["is_deletable"] is False
    finally:
        audio_path.unlink()
|
|
|
|
@pytest.mark.asyncio
async def test_sync_audio_file_updated(self, scanner_service) -> None:
    """A filename match with a different hash is recorded as a modification."""
    # Stored record whose metadata is stale compared with the file.
    stale_sound = Sound(
        id=1,
        type="SDB",
        name="Old Sound",
        filename="test.mp3",
        duration=60000,  # old duration
        size=512,  # old size
        hash="old_hash",  # old hash
    )
    update_mock = AsyncMock(return_value=stale_sound)
    scanner_service.sound_repo.update = update_mock

    with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as handle:
        audio_path = Path(handle.name)

    try:
        counters = ("scanned", "added", "updated", "deleted", "skipped", "errors")
        results = {**dict.fromkeys(counters, 0), "files": []}

        # Matched by filename only; the hash on disk is new.
        context = SyncContext(
            file_path=audio_path,
            sound_type="SDB",
            existing_sound_by_hash=None,
            existing_sound_by_filename=stale_sound,
            file_hash="new_hash",
        )

        # Audio helpers are patched to report the new metadata.
        with (
            patch("app.services.sound_scanner.get_file_hash", return_value="new_hash"),
            patch("app.services.sound_scanner.get_audio_duration", return_value=120000),
            patch("app.services.sound_scanner.get_file_size", return_value=1024),
        ):
            await scanner_service._sync_audio_file(context, results)

        assert results["updated"] == 1
        assert results["added"] == 0
        assert results["skipped"] == 0
        assert len(results["files"]) == 1

        entry = results["files"][0]
        assert entry["status"] == "updated"
        assert entry["reason"] == "file was modified"

        # Second positional argument of ``update`` is the update data.
        update_data = update_mock.call_args[0][1]
        assert update_data["duration"] == 120000
        assert update_data["size"] == 1024
        assert update_data["hash"] == "new_hash"
        # The name derives from the temp filename and must not keep the
        # file extension.
        assert update_data["name"].endswith("mp3") is False
    finally:
        audio_path.unlink()
|
|
|
|
@pytest.mark.asyncio
async def test_sync_audio_file_custom_type(self, scanner_service) -> None:
    """A new file synced under the ``CUSTOM`` type is created with that type."""
    created_sound = Sound(
        id=1,
        type="CUSTOM",
        name="Test Sound",
        filename="test.mp3",
        duration=60000,  # 60 seconds expressed in ms
        size=2048,
        hash="custom_hash",
    )
    create_mock = AsyncMock(return_value=created_sound)
    scanner_service.sound_repo.create = create_mock

    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as handle:
        audio_path = Path(handle.name)

    try:
        counters = ("scanned", "added", "updated", "deleted", "skipped", "errors")
        results = {**dict.fromkeys(counters, 0), "files": []}

        # No existing match: the file is new for the custom type.
        context = SyncContext(
            file_path=audio_path,
            sound_type="CUSTOM",
            existing_sound_by_hash=None,
            existing_sound_by_filename=None,
            file_hash="custom_hash",
        )

        with (
            patch("app.services.sound_scanner.get_file_hash", return_value="custom_hash"),
            patch("app.services.sound_scanner.get_audio_duration", return_value=60000),
            patch("app.services.sound_scanner.get_file_size", return_value=2048),
        ):
            await scanner_service._sync_audio_file(context, results)

        assert results["added"] == 1
        assert results["skipped"] == 0
        assert len(results["files"]) == 1
        assert results["files"][0]["status"] == "added"

        # First positional argument of ``create`` is the new sound's data.
        payload = create_mock.call_args[0][0]
        expected_fields = {
            "type": "CUSTOM",
            "filename": audio_path.name,
            "duration": 60000,  # duration in ms
            "size": 2048,
            "hash": "custom_hash",
        }
        for field, expected in expected_fields.items():
            assert payload[field] == expected
        # All sounds are set to not deletable
        assert payload["is_deletable"] is False
    finally:
        audio_path.unlink()
|
|
|
|
@pytest.mark.asyncio
async def test_sync_audio_file_rename_with_normalized_file(
    self, test_session, scanner_service,
) -> None:
    """Renaming a sound file also renames its normalized counterpart.

    A stored, normalized sound (``old_sound.mp3``) is matched by hash to a
    file now named ``new_sound.mp3``.  The test expects the rename to be
    reported, ``normalized_filename`` to be part of the repository update,
    and the normalized file on disk to be moved to the new name with its
    content intact.
    """
    with tempfile.TemporaryDirectory() as temp_dir:
        temp_dir_path = Path(temp_dir)

        # Redirect the scanner's normalized directory into the temp tree
        # and create it with the old normalized file inside.
        normalized_dir = temp_dir_path / "normalized" / "soundboard"
        scanner_service.normalized_directories = {"SDB": str(normalized_dir)}
        normalized_dir.mkdir(parents=True)

        old_normalized_file = normalized_dir / "old_sound.mp3"
        old_normalized_file.write_text("normalized audio content")

        # Dummy audio file at the new path; it must exist for the scanner.
        new_path = temp_dir_path / "new_sound.mp3"
        new_path.write_bytes(b"fake audio data for testing")

        # The audio helpers are patched because the file is not real audio.
        with (
            patch("app.services.sound_scanner.get_audio_duration", return_value=60000),
            patch("app.services.sound_scanner.get_file_size", return_value=2048),
        ):
            # Stored sound that already has a normalized file on record.
            existing_sound = Sound(
                id=1,
                type="SDB",
                name="Old Sound",
                filename="old_sound.mp3",
                duration=60000,
                size=2048,
                hash="test_hash",
                is_normalized=True,
                normalized_filename="old_sound.mp3",
                normalized_duration=60000,
                normalized_size=1024,
                normalized_hash="normalized_hash",
                play_count=5,
                is_deletable=False,
                is_music=False,
            )

            results = {
                "scanned": 0,
                "added": 0,
                "updated": 0,
                "deleted": 0,
                "skipped": 0,
                "duplicates": 0,
                "errors": 0,
                "files": [],
            }

            scanner_service.sound_repo.update = AsyncMock()

            # Hash match + no filename match is how a rename is presented
            # to _sync_audio_file.
            sync_context = SyncContext(
                file_path=new_path,
                sound_type="SDB",
                existing_sound_by_hash=existing_sound,
                existing_sound_by_filename=None,
                file_hash="test_hash",
            )
            await scanner_service._sync_audio_file(sync_context, results)

            # The rename is reported, including the normalized filename.
            assert results["updated"] == 1
            assert len(results["files"]) == 1
            file_result = results["files"][0]
            assert file_result["status"] == "updated"
            assert file_result["reason"] == "file was renamed"
            assert "normalized_filename" in file_result["changes"]

            # Second positional argument of ``update`` is the update data.
            update_data = scanner_service.sound_repo.update.call_args[0][1]
            assert "filename" in update_data
            assert "name" in update_data
            assert "normalized_filename" in update_data
            assert update_data["normalized_filename"] == "new_sound.mp3"

            # The normalized file itself was moved, content untouched.
            new_normalized_file = normalized_dir / "new_sound.mp3"
            assert new_normalized_file.exists()
            assert not old_normalized_file.exists()
            assert new_normalized_file.read_text() == "normalized audio content"
|
|
|
|
@pytest.mark.asyncio
async def test_scan_directory_delete_with_normalized_file(
    self, test_session, scanner_service,
) -> None:
    """Deleting a sound during a scan also removes its normalized file.

    The repository reports one normalized sound while the scanned directory
    is empty.  The test expects the scan to report the sound as deleted,
    call ``sound_repo.delete`` with it, and remove the normalized file from
    disk.
    """
    with tempfile.TemporaryDirectory() as temp_dir:
        temp_dir_path = Path(temp_dir)

        # Empty directory to scan: the stored sound has no file here.
        scan_dir = temp_dir_path / "sounds"
        scan_dir.mkdir()

        # Redirect the scanner's normalized directory into the temp tree
        # and place the normalized file there.
        normalized_dir = temp_dir_path / "normalized" / "soundboard"
        scanner_service.normalized_directories = {"SDB": str(normalized_dir)}
        normalized_dir.mkdir(parents=True)
        normalized_file = normalized_dir / "test_sound.mp3"
        normalized_file.write_text("normalized audio content")

        # Stored sound that has a normalized file on record.
        existing_sound = Sound(
            id=1,
            type="SDB",
            name="Test Sound",
            filename="test_sound.mp3",
            duration=60000,
            size=2048,
            hash="test_hash",
            is_normalized=True,
            normalized_filename="test_sound.mp3",
            normalized_duration=60000,
            normalized_size=1024,
            normalized_hash="normalized_hash",
            play_count=5,
            is_deletable=False,
            is_music=False,
        )

        # Repository doubles: one known sound, with delete observable.
        scanner_service.sound_repo.get_by_type = AsyncMock(return_value=[existing_sound])
        scanner_service.sound_repo.delete = AsyncMock()

        with (
            patch("app.services.sound_scanner.get_audio_duration"),
            patch("app.services.sound_scanner.get_file_size"),
        ):
            # Scanning the empty directory should trigger the deletion.
            results = await scanner_service.scan_directory(str(scan_dir), "SDB")

            assert results["deleted"] == 1
            assert results["added"] == 0
            assert results["updated"] == 0
            assert len(results["files"]) == 1
            file_result = results["files"][0]
            assert file_result["status"] == "deleted"
            assert file_result["reason"] == "file no longer exists"

            # The repository was asked to delete exactly this sound.
            scanner_service.sound_repo.delete.assert_called_once_with(existing_sound)

            # The normalized file is gone from disk as well.
            assert not normalized_file.exists()
|