"""Sound scanner service for scanning and importing audio files.""" from pathlib import Path from typing import TypedDict from sqlmodel.ext.asyncio.session import AsyncSession from app.core.logging import get_logger from app.models.sound import Sound from app.repositories.sound import SoundRepository from app.utils.audio import get_audio_duration, get_file_hash, get_file_size logger = get_logger(__name__) class FileInfo(TypedDict): """Type definition for file information in scan results.""" filename: str status: str reason: str | None name: str | None duration: int | None size: int | None id: int | None error: str | None changes: list[str] | None class ScanResults(TypedDict): """Type definition for scan results.""" scanned: int added: int updated: int deleted: int skipped: int errors: int files: list[FileInfo] class SoundScannerService: """Service for scanning and importing audio files.""" def __init__(self, session: AsyncSession) -> None: """Initialize the sound scanner service.""" self.session = session self.sound_repo = SoundRepository(session) self.supported_extensions = { ".mp3", ".wav", ".opus", ".flac", ".ogg", ".m4a", ".aac", } def extract_name_from_filename(self, filename: str) -> str: """Extract a clean name from filename.""" # Remove extension name = Path(filename).stem # Replace underscores and hyphens with spaces name = name.replace("_", " ").replace("-", " ") # Capitalize words return " ".join(word.capitalize() for word in name.split()) async def scan_directory( self, directory_path: str, sound_type: str = "SDB", ) -> ScanResults: """Sync a directory with the database (add/update/delete sounds).""" scan_path = Path(directory_path) if not scan_path.exists(): msg = f"Directory does not exist: {directory_path}" raise ValueError(msg) if not scan_path.is_dir(): msg = f"Path is not a directory: {directory_path}" raise ValueError(msg) results: ScanResults = { "scanned": 0, "added": 0, "updated": 0, "deleted": 0, "skipped": 0, "errors": 0, "files": [], } logger.info("Starting sync of directory: %s", directory_path) # Get all existing sounds of this type from database existing_sounds = await self.sound_repo.get_by_type(sound_type) sounds_by_filename = {sound.filename: sound for sound in existing_sounds} # Get all audio files from directory audio_files = [ f for f in scan_path.iterdir() if f.is_file() and f.suffix.lower() in self.supported_extensions ] # Process each file in directory processed_filenames = set() for file_path in audio_files: results["scanned"] += 1 filename = file_path.name processed_filenames.add(filename) try: await self._sync_audio_file( file_path, sound_type, sounds_by_filename.get(filename), results, ) except Exception as e: logger.exception("Error processing file %s", file_path) results["errors"] += 1 results["files"].append( { "filename": filename, "status": "error", "reason": None, "name": None, "duration": None, "size": None, "id": None, "error": str(e), "changes": None, }, ) # Delete sounds that no longer exist in directory for filename, sound in sounds_by_filename.items(): if filename not in processed_filenames: try: await self.sound_repo.delete(sound) logger.info("Deleted sound no longer in directory: %s", filename) results["deleted"] += 1 results["files"].append( { "filename": filename, "status": "deleted", "reason": "file no longer exists", "name": sound.name, "duration": sound.duration, "size": sound.size, "id": sound.id, "error": None, "changes": None, }, ) except Exception as e: logger.exception("Error deleting sound %s", filename) results["errors"] += 1 results["files"].append( { "filename": filename, "status": "error", "reason": "failed to delete", "name": sound.name, "duration": sound.duration, "size": sound.size, "id": sound.id, "error": str(e), "changes": None, }, ) logger.info("Sync completed: %s", results) return results async def _sync_audio_file( self, file_path: Path, sound_type: str, existing_sound: Sound | None, results: ScanResults, ) -> None: """Sync a single audio file (add new or update existing).""" filename = file_path.name file_hash = get_file_hash(file_path) duration = get_audio_duration(file_path) size = get_file_size(file_path) name = self.extract_name_from_filename(filename) if existing_sound is None: # Add new sound sound_data = { "type": sound_type, "name": name, "filename": filename, "duration": duration, "size": size, "hash": file_hash, "is_deletable": False, "is_music": False, "is_normalized": False, "play_count": 0, } sound = await self.sound_repo.create(sound_data) logger.info("Added new sound: %s (ID: %s)", sound.name, sound.id) results["added"] += 1 results["files"].append( { "filename": filename, "status": "added", "reason": None, "name": name, "duration": duration, "size": size, "id": sound.id, "error": None, "changes": None, }, ) elif existing_sound.hash != file_hash: # Update existing sound (file was modified) update_data = { "name": name, "duration": duration, "size": size, "hash": file_hash, } await self.sound_repo.update(existing_sound, update_data) logger.info("Updated modified sound: %s (ID: %s)", name, existing_sound.id) results["updated"] += 1 results["files"].append( { "filename": filename, "status": "updated", "reason": "file was modified", "name": name, "duration": duration, "size": size, "id": existing_sound.id, "error": None, "changes": ["hash", "duration", "size", "name"], }, ) else: # File unchanged, skip logger.debug("Sound unchanged: %s", filename) results["skipped"] += 1 results["files"].append( { "filename": filename, "status": "skipped", "reason": "file unchanged", "name": existing_sound.name, "duration": existing_sound.duration, "size": existing_sound.size, "id": existing_sound.id, "error": None, "changes": None, }, ) async def scan_soundboard_directory(self) -> ScanResults: """Sync the default soundboard directory.""" soundboard_path = "sounds/originals/soundboard" return await self.scan_directory(soundboard_path, "SDB")