Add tests for extraction API endpoints and enhance existing tests
- Implement tests for admin extraction API endpoints including status retrieval, deletion of extractions, and permission checks. - Add tests for user extraction deletion, ensuring proper handling of permissions and non-existent extractions. - Enhance sound endpoint tests to include duplicate handling in responses. - Refactor favorite service tests to utilize mock dependencies for better maintainability and clarity. - Update sound scanner tests to improve file handling and ensure proper deletion of associated files.
This commit is contained in:
@@ -8,7 +8,7 @@ import pytest
|
||||
from sqlmodel.ext.asyncio.session import AsyncSession
|
||||
|
||||
from app.models.sound import Sound
|
||||
from app.services.sound_scanner import SoundScannerService
|
||||
from app.services.sound_scanner import SoundScannerService, SyncContext
|
||||
|
||||
|
||||
class TestSoundScannerService:
|
||||
@@ -154,15 +154,15 @@ class TestSoundScannerService:
|
||||
}
|
||||
# Set the existing sound filename to match temp file for "unchanged" test
|
||||
existing_sound.filename = temp_path.name
|
||||
|
||||
await scanner_service._sync_audio_file(
|
||||
temp_path,
|
||||
"SDB",
|
||||
existing_sound, # existing_sound_by_hash (same hash)
|
||||
None, # existing_sound_by_filename (no conflict)
|
||||
"same_hash",
|
||||
results,
|
||||
|
||||
sync_context = SyncContext(
|
||||
file_path=temp_path,
|
||||
sound_type="SDB",
|
||||
existing_sound_by_hash=existing_sound,
|
||||
existing_sound_by_filename=None,
|
||||
file_hash="same_hash",
|
||||
)
|
||||
await scanner_service._sync_audio_file(sync_context, results)
|
||||
|
||||
assert results["skipped"] == 1
|
||||
assert results["added"] == 0
|
||||
@@ -186,7 +186,7 @@ class TestSoundScannerService:
|
||||
size=1024,
|
||||
hash="same_hash",
|
||||
)
|
||||
|
||||
|
||||
scanner_service.sound_repo.update = AsyncMock(return_value=existing_sound)
|
||||
|
||||
# Mock file operations to return same hash
|
||||
@@ -209,15 +209,15 @@ class TestSoundScannerService:
|
||||
"errors": 0,
|
||||
"files": [],
|
||||
}
|
||||
|
||||
await scanner_service._sync_audio_file(
|
||||
temp_path,
|
||||
"SDB",
|
||||
existing_sound, # existing_sound_by_hash (same hash)
|
||||
None, # existing_sound_by_filename (different filename)
|
||||
"same_hash",
|
||||
results,
|
||||
|
||||
sync_context = SyncContext(
|
||||
file_path=temp_path,
|
||||
sound_type="SDB",
|
||||
existing_sound_by_hash=existing_sound,
|
||||
existing_sound_by_filename=None,
|
||||
file_hash="same_hash",
|
||||
)
|
||||
await scanner_service._sync_audio_file(sync_context, results)
|
||||
|
||||
# Should be marked as updated (renamed)
|
||||
assert results["updated"] == 1
|
||||
@@ -227,12 +227,12 @@ class TestSoundScannerService:
|
||||
assert results["files"][0]["status"] == "updated"
|
||||
assert results["files"][0]["reason"] == "file was renamed"
|
||||
assert results["files"][0]["changes"] == ["filename", "name"]
|
||||
|
||||
|
||||
# Verify update was called with new filename
|
||||
scanner_service.sound_repo.update.assert_called_once()
|
||||
call_args = scanner_service.sound_repo.update.call_args[0][1] # update_data
|
||||
assert call_args["filename"] == temp_path.name
|
||||
|
||||
|
||||
finally:
|
||||
temp_path.unlink()
|
||||
|
||||
@@ -249,22 +249,21 @@ class TestSoundScannerService:
|
||||
size=1024,
|
||||
hash="same_hash",
|
||||
)
|
||||
|
||||
|
||||
# Mock the repository to return the existing sound
|
||||
scanner_service.sound_repo.get_by_type = AsyncMock(return_value=[existing_sound])
|
||||
scanner_service.sound_repo.update = AsyncMock()
|
||||
scanner_service.sound_repo.delete = AsyncMock()
|
||||
|
||||
|
||||
# Create temporary directory with renamed file
|
||||
import tempfile
|
||||
import os
|
||||
|
||||
|
||||
with tempfile.TemporaryDirectory() as temp_dir:
|
||||
# Create the "renamed" file (same hash, different name)
|
||||
new_file_path = os.path.join(temp_dir, "new_name.mp3")
|
||||
with open(new_file_path, "wb") as f:
|
||||
new_file_path = Path(temp_dir) / "new_name.mp3"
|
||||
with new_file_path.open("wb") as f:
|
||||
f.write(b"test audio content") # This will produce consistent hash
|
||||
|
||||
|
||||
# Mock file operations to return same hash
|
||||
with (
|
||||
patch("app.services.sound_scanner.get_file_hash", return_value="same_hash"),
|
||||
@@ -272,18 +271,18 @@ class TestSoundScannerService:
|
||||
patch("app.services.sound_scanner.get_file_size", return_value=1024),
|
||||
):
|
||||
results = await scanner_service.scan_directory(temp_dir, "SDB")
|
||||
|
||||
|
||||
# Should have detected one renamed file
|
||||
assert results["updated"] == 1
|
||||
assert results["deleted"] == 0 # This is the key assertion - no deletion!
|
||||
assert results["added"] == 0
|
||||
assert len(results["files"]) == 1
|
||||
|
||||
|
||||
# Verify it was marked as renamed
|
||||
file_result = results["files"][0]
|
||||
assert file_result["status"] == "updated"
|
||||
assert file_result["reason"] == "file was renamed"
|
||||
|
||||
|
||||
# Verify update was called but delete was NOT called
|
||||
scanner_service.sound_repo.update.assert_called_once()
|
||||
scanner_service.sound_repo.delete.assert_not_called()
|
||||
@@ -301,25 +300,24 @@ class TestSoundScannerService:
|
||||
size=1024,
|
||||
hash="same_hash",
|
||||
)
|
||||
|
||||
|
||||
# Mock the repository
|
||||
scanner_service.sound_repo.get_by_type = AsyncMock(return_value=[existing_sound])
|
||||
scanner_service.sound_repo.update = AsyncMock()
|
||||
|
||||
|
||||
# Create temporary directory with both original and duplicate files
|
||||
import tempfile
|
||||
import os
|
||||
|
||||
|
||||
with tempfile.TemporaryDirectory() as temp_dir:
|
||||
# Create both files (simulating duplicate content)
|
||||
original_path = os.path.join(temp_dir, "original.mp3")
|
||||
duplicate_path = os.path.join(temp_dir, "duplicate.mp3")
|
||||
|
||||
with open(original_path, "wb") as f:
|
||||
original_path = Path(temp_dir) / "original.mp3"
|
||||
duplicate_path = Path(temp_dir) / "duplicate.mp3"
|
||||
|
||||
with original_path.open("wb") as f:
|
||||
f.write(b"test audio content")
|
||||
with open(duplicate_path, "wb") as f:
|
||||
with duplicate_path.open("wb") as f:
|
||||
f.write(b"test audio content") # Same content = same hash
|
||||
|
||||
|
||||
# Mock file operations
|
||||
with (
|
||||
patch("app.services.sound_scanner.get_file_hash", return_value="same_hash"),
|
||||
@@ -327,14 +325,14 @@ class TestSoundScannerService:
|
||||
patch("app.services.sound_scanner.get_file_size", return_value=1024),
|
||||
):
|
||||
results = await scanner_service.scan_directory(temp_dir, "SDB")
|
||||
|
||||
|
||||
# Should have 1 unchanged (original) and 1 skipped (duplicate)
|
||||
assert results["skipped"] == 2 # Both files have same hash, both skipped
|
||||
assert results["duplicates"] == 1 # One duplicate detected
|
||||
assert results["updated"] == 0
|
||||
assert results["added"] == 0
|
||||
assert results["deleted"] == 0
|
||||
|
||||
|
||||
# Check that duplicate was properly detected
|
||||
skipped_files = [f for f in results["files"] if f["status"] == "skipped"]
|
||||
duplicate_file = next((f for f in skipped_files if "duplicate" in f["reason"]), None)
|
||||
@@ -375,14 +373,14 @@ class TestSoundScannerService:
|
||||
"errors": 0,
|
||||
"files": [],
|
||||
}
|
||||
await scanner_service._sync_audio_file(
|
||||
temp_path,
|
||||
"SDB",
|
||||
None, # existing_sound_by_hash
|
||||
None, # existing_sound_by_filename
|
||||
"test_hash",
|
||||
results,
|
||||
sync_context = SyncContext(
|
||||
file_path=temp_path,
|
||||
sound_type="SDB",
|
||||
existing_sound_by_hash=None,
|
||||
existing_sound_by_filename=None,
|
||||
file_hash="test_hash",
|
||||
)
|
||||
await scanner_service._sync_audio_file(sync_context, results)
|
||||
|
||||
assert results["added"] == 1
|
||||
assert results["skipped"] == 0
|
||||
@@ -439,14 +437,14 @@ class TestSoundScannerService:
|
||||
"errors": 0,
|
||||
"files": [],
|
||||
}
|
||||
await scanner_service._sync_audio_file(
|
||||
temp_path,
|
||||
"SDB",
|
||||
None, # existing_sound_by_hash (different hash)
|
||||
existing_sound, # existing_sound_by_filename
|
||||
"new_hash",
|
||||
results,
|
||||
sync_context = SyncContext(
|
||||
file_path=temp_path,
|
||||
sound_type="SDB",
|
||||
existing_sound_by_hash=None,
|
||||
existing_sound_by_filename=existing_sound,
|
||||
file_hash="new_hash",
|
||||
)
|
||||
await scanner_service._sync_audio_file(sync_context, results)
|
||||
|
||||
assert results["updated"] == 1
|
||||
assert results["added"] == 0
|
||||
@@ -504,14 +502,14 @@ class TestSoundScannerService:
|
||||
"errors": 0,
|
||||
"files": [],
|
||||
}
|
||||
await scanner_service._sync_audio_file(
|
||||
temp_path,
|
||||
"CUSTOM",
|
||||
None, # existing_sound_by_hash
|
||||
None, # existing_sound_by_filename
|
||||
"custom_hash",
|
||||
results,
|
||||
sync_context = SyncContext(
|
||||
file_path=temp_path,
|
||||
sound_type="CUSTOM",
|
||||
existing_sound_by_hash=None,
|
||||
existing_sound_by_filename=None,
|
||||
file_hash="custom_hash",
|
||||
)
|
||||
await scanner_service._sync_audio_file(sync_context, results)
|
||||
|
||||
assert results["added"] == 1
|
||||
assert results["skipped"] == 0
|
||||
@@ -533,41 +531,40 @@ class TestSoundScannerService:
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_sync_audio_file_rename_with_normalized_file(
|
||||
self, test_session, scanner_service
|
||||
self, test_session, scanner_service,
|
||||
):
|
||||
"""Test that renaming a sound file also renames its normalized file."""
|
||||
# Create temporary directories for testing
|
||||
from pathlib import Path
|
||||
import tempfile
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
with tempfile.TemporaryDirectory() as temp_dir:
|
||||
temp_dir_path = Path(temp_dir)
|
||||
|
||||
|
||||
# Set up the scanner's normalized directories to use temp dir
|
||||
scanner_service.normalized_directories = {
|
||||
"SDB": str(temp_dir_path / "normalized" / "soundboard")
|
||||
"SDB": str(temp_dir_path / "normalized" / "soundboard"),
|
||||
}
|
||||
|
||||
|
||||
# Create the normalized directory
|
||||
normalized_dir = temp_dir_path / "normalized" / "soundboard"
|
||||
normalized_dir.mkdir(parents=True)
|
||||
|
||||
|
||||
# Create the old normalized file
|
||||
old_normalized_file = normalized_dir / "old_sound.mp3"
|
||||
old_normalized_file.write_text("normalized audio content")
|
||||
|
||||
|
||||
# Create the audio files (they need to exist for the scanner)
|
||||
old_path = temp_dir_path / "old_sound.mp3"
|
||||
new_path = temp_dir_path / "new_sound.mp3"
|
||||
|
||||
|
||||
# Create a dummy audio file for the new path
|
||||
new_path.write_bytes(b"fake audio data for testing")
|
||||
|
||||
|
||||
# Mock the audio utility functions since we're using fake files
|
||||
from unittest.mock import patch
|
||||
with patch('app.services.sound_scanner.get_audio_duration', return_value=60000), \
|
||||
patch('app.services.sound_scanner.get_file_size', return_value=2048):
|
||||
|
||||
with patch("app.services.sound_scanner.get_audio_duration", return_value=60000), \
|
||||
patch("app.services.sound_scanner.get_file_size", return_value=2048):
|
||||
|
||||
# Create existing sound with normalized file info
|
||||
existing_sound = Sound(
|
||||
id=1,
|
||||
@@ -584,9 +581,9 @@ class TestSoundScannerService:
|
||||
normalized_hash="normalized_hash",
|
||||
play_count=5,
|
||||
is_deletable=False,
|
||||
is_music=False
|
||||
is_music=False,
|
||||
)
|
||||
|
||||
|
||||
results = {
|
||||
"scanned": 0,
|
||||
"added": 0,
|
||||
@@ -597,36 +594,36 @@ class TestSoundScannerService:
|
||||
"errors": 0,
|
||||
"files": [],
|
||||
}
|
||||
|
||||
|
||||
# Mock the sound repository update
|
||||
scanner_service.sound_repo.update = AsyncMock()
|
||||
|
||||
|
||||
# Simulate rename detection by calling _sync_audio_file
|
||||
await scanner_service._sync_audio_file(
|
||||
new_path,
|
||||
"SDB",
|
||||
existing_sound, # existing_sound_by_hash (same hash, different filename)
|
||||
None, # existing_sound_by_filename (no file with new name exists)
|
||||
"test_hash",
|
||||
results,
|
||||
sync_context = SyncContext(
|
||||
file_path=new_path,
|
||||
sound_type="SDB",
|
||||
existing_sound_by_hash=existing_sound,
|
||||
existing_sound_by_filename=None,
|
||||
file_hash="test_hash",
|
||||
)
|
||||
|
||||
await scanner_service._sync_audio_file(sync_context, results)
|
||||
|
||||
# Verify the results
|
||||
assert results["updated"] == 1
|
||||
assert len(results["files"]) == 1
|
||||
assert results["files"][0]["status"] == "updated"
|
||||
assert results["files"][0]["reason"] == "file was renamed"
|
||||
assert "normalized_filename" in results["files"][0]["changes"]
|
||||
|
||||
|
||||
# Verify sound_repo.update was called with normalized filename update
|
||||
update_call = scanner_service.sound_repo.update.call_args
|
||||
update_data = update_call[0][1] # Second argument is the update data
|
||||
|
||||
|
||||
assert "filename" in update_data
|
||||
assert "name" in update_data
|
||||
assert "normalized_filename" in update_data
|
||||
assert update_data["normalized_filename"] == "new_sound.mp3"
|
||||
|
||||
|
||||
# Verify the normalized file was actually renamed
|
||||
new_normalized_file = normalized_dir / "new_sound.mp3"
|
||||
assert new_normalized_file.exists()
|
||||
@@ -635,29 +632,29 @@ class TestSoundScannerService:
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_scan_directory_delete_with_normalized_file(
|
||||
self, test_session, scanner_service
|
||||
self, test_session, scanner_service,
|
||||
):
|
||||
"""Test that deleting a sound also deletes its normalized file."""
|
||||
# Create temporary directories for testing
|
||||
from pathlib import Path
|
||||
import tempfile
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
with tempfile.TemporaryDirectory() as temp_dir:
|
||||
temp_dir_path = Path(temp_dir)
|
||||
scan_dir = temp_dir_path / "sounds"
|
||||
scan_dir.mkdir()
|
||||
|
||||
|
||||
# Set up the scanner's normalized directories to use temp dir
|
||||
scanner_service.normalized_directories = {
|
||||
"SDB": str(temp_dir_path / "normalized" / "soundboard")
|
||||
"SDB": str(temp_dir_path / "normalized" / "soundboard"),
|
||||
}
|
||||
|
||||
|
||||
# Create the normalized directory and file
|
||||
normalized_dir = temp_dir_path / "normalized" / "soundboard"
|
||||
normalized_dir.mkdir(parents=True)
|
||||
normalized_file = normalized_dir / "test_sound.mp3"
|
||||
normalized_file.write_text("normalized audio content")
|
||||
|
||||
|
||||
# Create existing sound with normalized file info
|
||||
existing_sound = Sound(
|
||||
id=1,
|
||||
@@ -674,21 +671,21 @@ class TestSoundScannerService:
|
||||
normalized_hash="normalized_hash",
|
||||
play_count=5,
|
||||
is_deletable=False,
|
||||
is_music=False
|
||||
is_music=False,
|
||||
)
|
||||
|
||||
|
||||
# Mock sound repository methods
|
||||
scanner_service.sound_repo.get_by_type = AsyncMock(return_value=[existing_sound])
|
||||
scanner_service.sound_repo.delete = AsyncMock()
|
||||
|
||||
|
||||
# Mock audio utility functions
|
||||
from unittest.mock import patch
|
||||
with patch('app.services.sound_scanner.get_audio_duration'), \
|
||||
patch('app.services.sound_scanner.get_file_size'):
|
||||
|
||||
with patch("app.services.sound_scanner.get_audio_duration"), \
|
||||
patch("app.services.sound_scanner.get_file_size"):
|
||||
|
||||
# Run scan with empty directory (should trigger deletion)
|
||||
results = await scanner_service.scan_directory(str(scan_dir), "SDB")
|
||||
|
||||
|
||||
# Verify the results
|
||||
assert results["deleted"] == 1
|
||||
assert results["added"] == 0
|
||||
@@ -696,9 +693,9 @@ class TestSoundScannerService:
|
||||
assert len(results["files"]) == 1
|
||||
assert results["files"][0]["status"] == "deleted"
|
||||
assert results["files"][0]["reason"] == "file no longer exists"
|
||||
|
||||
|
||||
# Verify sound_repo.delete was called
|
||||
scanner_service.sound_repo.delete.assert_called_once_with(existing_sound)
|
||||
|
||||
|
||||
# Verify the normalized file was actually deleted
|
||||
assert not normalized_file.exists()
|
||||
|
||||
Reference in New Issue
Block a user