Files
sdb2-backend/app/services/sound_scanner.py
JSC 5ed19c8f0f Add comprehensive tests for playlist service and refactor socket service tests
- Introduced a new test suite for the PlaylistService covering various functionalities including creation, retrieval, updating, and deletion of playlists.
- Added tests for handling sounds within playlists, ensuring correct behavior when adding/removing sounds and managing current playlists.
- Refactored socket service tests for improved readability by adjusting function signatures.
- Cleaned up unnecessary whitespace in sound normalizer and sound scanner tests for consistency.
- Enhanced audio utility tests to ensure accurate hash and size calculations, including edge cases for nonexistent files.
- Removed redundant blank lines in cookie utility tests for cleaner code.
2025-07-29 19:25:46 +02:00

274 lines
8.9 KiB
Python

"""Sound scanner service for scanning and importing audio files."""
from pathlib import Path
from typing import TypedDict
from sqlmodel.ext.asyncio.session import AsyncSession
from app.core.logging import get_logger
from app.models.sound import Sound
from app.repositories.sound import SoundRepository
from app.utils.audio import get_audio_duration, get_file_hash, get_file_size
# Module-level logger, named after this module via the project's get_logger helper.
logger = get_logger(__name__)
class FileInfo(TypedDict):
"""Type definition for file information in scan results."""
filename: str
status: str
reason: str | None
name: str | None
duration: int | None
size: int | None
id: int | None
error: str | None
changes: list[str] | None
class ScanResults(TypedDict):
    """Aggregate outcome of one directory sync.

    The integer fields are counters; ``files`` holds one detailed entry
    per file or database row that the sync touched.
    """

    # Number of supported audio files found in the directory.
    scanned: int
    # Files that produced a new sound row.
    added: int
    # Files whose existing row was rewritten because the hash changed.
    updated: int
    # Rows removed because their file is no longer in the directory.
    deleted: int
    # Files whose hash matched the stored one.
    skipped: int
    # Files or rows whose processing raised an exception.
    errors: int
    # Detailed per-file report entries.
    files: list[FileInfo]
class SoundScannerService:
    """Service for scanning and importing audio files.

    Brings the sound rows of one type in line with the audio files found
    in a directory: unknown files are added, files whose hash differs
    from the stored one are updated, and rows whose file is no longer in
    the directory are deleted.
    """

    def __init__(self, session: AsyncSession) -> None:
        """Initialize the sound scanner service.

        Args:
            session: Async database session; it is also handed to the
                ``SoundRepository`` used for all persistence calls.
        """
        self.session = session
        self.sound_repo = SoundRepository(session)
        # Lower-case suffixes (leading dot included) accepted as audio files.
        self.supported_extensions = {
            ".mp3",
            ".wav",
            ".opus",
            ".flac",
            ".ogg",
            ".m4a",
            ".aac",
        }

    @staticmethod
    def _file_info(
        filename: str,
        status: str,
        *,
        reason: str | None = None,
        name: str | None = None,
        duration: int | None = None,
        size: int | None = None,
        sound_id: int | None = None,
        error: str | None = None,
        changes: list[str] | None = None,
    ) -> FileInfo:
        """Build one per-file report entry; unspecified fields are ``None``."""
        return {
            "filename": filename,
            "status": status,
            "reason": reason,
            "name": name,
            "duration": duration,
            "size": size,
            "id": sound_id,
            "error": error,
            "changes": changes,
        }

    def extract_name_from_filename(self, filename: str) -> str:
        """Extract a clean name from filename.

        The last extension is dropped, underscores and hyphens become
        spaces, runs of whitespace collapse, and every word is
        capitalized (first letter upper-case, the rest lower-case).

        Args:
            filename: File name, with or without a directory part.

        Returns:
            The cleaned-up display name, e.g. ``"my_sound-file.mp3"``
            becomes ``"My Sound File"``.
        """
        stem = Path(filename).stem
        spaced = stem.replace("_", " ").replace("-", " ")
        # split() without arguments also swallows repeated separators.
        return " ".join(word.capitalize() for word in spaced.split())

    async def scan_directory(
        self,
        directory_path: str,
        sound_type: str = "SDB",
    ) -> ScanResults:
        """Sync a directory with the database (add/update/delete sounds).

        Only regular files directly inside ``directory_path`` whose
        suffix is in ``supported_extensions`` are considered;
        sub-directories are not descended into.

        Args:
            directory_path: Directory to scan.
            sound_type: Type of the sounds to sync; only existing rows of
                this type are compared against the directory.

        Returns:
            Counters plus one report entry per processed file or row.

        Raises:
            ValueError: If ``directory_path`` does not exist or is not a
                directory. Failures on individual files are caught,
                logged and reported as ``"error"`` entries instead.
        """
        scan_path = Path(directory_path)
        if not scan_path.exists():
            msg = f"Directory does not exist: {directory_path}"
            raise ValueError(msg)
        if not scan_path.is_dir():
            msg = f"Path is not a directory: {directory_path}"
            raise ValueError(msg)

        results: ScanResults = {
            "scanned": 0,
            "added": 0,
            "updated": 0,
            "deleted": 0,
            "skipped": 0,
            "errors": 0,
            "files": [],
        }
        logger.info("Starting sync of directory: %s", directory_path)

        # Index the known sounds of this type by filename for O(1) lookup.
        existing_sounds = await self.sound_repo.get_by_type(sound_type)
        sounds_by_filename = {sound.filename: sound for sound in existing_sounds}

        audio_files = [
            entry
            for entry in scan_path.iterdir()
            if entry.is_file() and entry.suffix.lower() in self.supported_extensions
        ]

        # Filenames seen on disk; anything known but not seen is deleted below.
        processed_filenames: set[str] = set()
        for file_path in audio_files:
            results["scanned"] += 1
            filename = file_path.name
            processed_filenames.add(filename)
            try:
                await self._sync_audio_file(
                    file_path,
                    sound_type,
                    sounds_by_filename.get(filename),
                    results,
                )
            except Exception as e:
                # One broken file must not abort the whole sync.
                logger.exception("Error processing file %s", file_path)
                results["errors"] += 1
                results["files"].append(
                    self._file_info(filename, "error", error=str(e)),
                )

        await self._delete_missing_sounds(
            sounds_by_filename,
            processed_filenames,
            results,
        )

        logger.info("Sync completed: %s", results)
        return results

    async def _delete_missing_sounds(
        self,
        sounds_by_filename: dict[str, Sound],
        processed_filenames: set[str],
        results: ScanResults,
    ) -> None:
        """Delete sounds whose file was not seen in the directory.

        Args:
            sounds_by_filename: Known sounds keyed by filename.
            processed_filenames: Filenames found in the directory.
            results: Scan results, updated in place.
        """
        for filename, sound in sounds_by_filename.items():
            if filename in processed_filenames:
                continue

            # Snapshot the reported fields before deleting, so the report
            # does not depend on the state of the instance afterwards.
            snapshot = {
                "name": sound.name,
                "duration": sound.duration,
                "size": sound.size,
                "sound_id": sound.id,
            }
            try:
                await self.sound_repo.delete(sound)
                logger.info("Deleted sound no longer in directory: %s", filename)
                results["deleted"] += 1
                results["files"].append(
                    self._file_info(
                        filename,
                        "deleted",
                        reason="file no longer exists",
                        **snapshot,
                    ),
                )
            except Exception as e:
                logger.exception("Error deleting sound %s", filename)
                results["errors"] += 1
                results["files"].append(
                    self._file_info(
                        filename,
                        "error",
                        reason="failed to delete",
                        error=str(e),
                        **snapshot,
                    ),
                )

    async def _sync_audio_file(
        self,
        file_path: Path,
        sound_type: str,
        existing_sound: Sound | None,
        results: ScanResults,
    ) -> None:
        """Sync a single audio file (add new or update existing).

        Args:
            file_path: Path of the audio file on disk.
            sound_type: Type stored on a newly created sound.
            existing_sound: Row already known for this filename, or
                ``None`` when the file is new.
            results: Scan results, updated in place (one counter is
                incremented and one entry appended to ``files``).
        """
        filename = file_path.name
        # NOTE(review): these helpers are called synchronously; if they do
        # blocking I/O they stall the event loop for the duration of the call.
        file_hash = get_file_hash(file_path)
        duration = get_audio_duration(file_path)
        size = get_file_size(file_path)
        name = self.extract_name_from_filename(filename)

        if existing_sound is None:
            # Unknown file: create a row with default flags.
            sound = await self.sound_repo.create(
                {
                    "type": sound_type,
                    "name": name,
                    "filename": filename,
                    "duration": duration,
                    "size": size,
                    "hash": file_hash,
                    "is_deletable": False,
                    "is_music": False,
                    "is_normalized": False,
                    "play_count": 0,
                },
            )
            logger.info("Added new sound: %s (ID: %s)", sound.name, sound.id)
            results["added"] += 1
            results["files"].append(
                self._file_info(
                    filename,
                    "added",
                    name=name,
                    duration=duration,
                    size=size,
                    sound_id=sound.id,
                ),
            )
            return

        if existing_sound.hash != file_hash:
            # Content changed: rewrite every field derived from the file.
            await self.sound_repo.update(
                existing_sound,
                {
                    "name": name,
                    "duration": duration,
                    "size": size,
                    "hash": file_hash,
                },
            )
            logger.info("Updated modified sound: %s (ID: %s)", name, existing_sound.id)
            results["updated"] += 1
            results["files"].append(
                self._file_info(
                    filename,
                    "updated",
                    reason="file was modified",
                    name=name,
                    duration=duration,
                    size=size,
                    sound_id=existing_sound.id,
                    changes=["hash", "duration", "size", "name"],
                ),
            )
            return

        # Same hash: nothing to write, report the stored values.
        logger.debug("Sound unchanged: %s", filename)
        results["skipped"] += 1
        results["files"].append(
            self._file_info(
                filename,
                "skipped",
                reason="file unchanged",
                name=existing_sound.name,
                duration=existing_sound.duration,
                size=existing_sound.size,
                sound_id=existing_sound.id,
            ),
        )

    async def scan_soundboard_directory(self) -> ScanResults:
        """Sync the default soundboard directory.

        Returns:
            The result of ``scan_directory`` for
            ``sounds/originals/soundboard`` with sound type ``"SDB"``.
        """
        soundboard_path = "sounds/originals/soundboard"
        return await self.scan_directory(soundboard_path, "SDB")