"""Sound scanner service for scanning and importing audio files."""

from pathlib import Path
from typing import Any, TypedDict

from sqlmodel.ext.asyncio.session import AsyncSession

from app.core.logging import get_logger
from app.models.sound import Sound
from app.repositories.sound import SoundRepository
from app.utils.audio import get_audio_duration, get_file_hash, get_file_size

logger = get_logger(__name__)


class _FileInfoTracking(TypedDict, total=False):
    """Optional bookkeeping keys that may appear on a file entry."""

    # Previous filename of a renamed sound. Written by
    # ``SoundScannerService._sync_audio_file`` and removed again by
    # ``SoundScannerService.scan_directory`` before results are returned.
    old_filename: str | None


class FileInfo(_FileInfoTracking):
    """Type definition for file information in scan results."""

    filename: str
    status: str
    reason: str | None
    name: str | None
    duration: int | None
    size: int | None
    id: int | None
    error: str | None
    changes: list[str] | None


class ScanResults(TypedDict):
    """Type definition for scan results."""

    scanned: int
    added: int
    updated: int
    deleted: int
    skipped: int
    errors: int
    files: list[FileInfo]


def _file_info(
    filename: str,
    status: str,
    *,
    reason: str | None = None,
    name: str | None = None,
    duration: int | None = None,
    size: int | None = None,
    sound_id: int | None = None,
    error: str | None = None,
    changes: list[str] | None = None,
) -> FileInfo:
    """Build one ``FileInfo`` entry; every field not given defaults to None."""
    return {
        "filename": filename,
        "status": status,
        "reason": reason,
        "name": name,
        "duration": duration,
        "size": size,
        "id": sound_id,
        "error": error,
        "changes": changes,
    }


def _capture_sound(
    record: dict[str, Any] | Sound,
    keys: tuple[str, ...],
) -> dict[str, Any]:
    """Copy ``keys`` out of a lookup record into a plain dict.

    ``record`` is either the attribute dict built by ``scan_directory`` or a
    ``Sound`` object. The returned dict always carries the underlying object
    under ``"sound_object"`` so it can be handed to the repository.
    """
    if isinstance(record, dict):
        captured = {key: record[key] for key in keys}
        captured["sound_object"] = record["sound_object"]
    else:
        captured = {key: getattr(record, key) for key in keys}
        captured["sound_object"] = record
    return captured


class SoundScannerService:
    """Service for scanning and importing audio files."""

    def __init__(self, session: AsyncSession) -> None:
        """Initialize the sound scanner service."""
        self.session = session
        self.sound_repo = SoundRepository(session)
        self.supported_extensions = {
            ".mp3",
            ".wav",
            ".opus",
            ".flac",
            ".ogg",
            ".m4a",
            ".aac",
        }

    def extract_name_from_filename(self, filename: str) -> str:
        """Extract a clean name from filename.

        The extension is dropped, underscores and hyphens become spaces, and
        every word is capitalized (``"my_cool-sound.mp3"`` -> ``"My Cool Sound"``).
        """
        # Remove extension
        name = Path(filename).stem
        # Replace underscores and hyphens with spaces
        name = name.replace("_", " ").replace("-", " ")
        # Capitalize words
        return " ".join(word.capitalize() for word in name.split())

    async def scan_directory(
        self,
        directory_path: str,
        sound_type: str = "SDB",
    ) -> ScanResults:
        """Sync a directory with the database (add/update/delete sounds).

        Only the directory's direct children with a supported extension are
        considered. Each file is identified by content hash first and by
        filename second; stored sounds of ``sound_type`` whose filename was
        neither seen nor recognised as the source of a rename are deleted.

        A file whose content matches a stored sound whose own file is still
        present in the directory is a copy, not a rename. It is reported as
        skipped and the stored sound is left untouched.

        Args:
            directory_path: Directory to scan.
            sound_type: Sound type used to load existing sounds and to tag
                newly created ones.

        Returns:
            Counters plus one ``FileInfo`` entry per processed file or
            deleted sound.

        Raises:
            ValueError: If ``directory_path`` does not exist or is not a
                directory.
        """
        scan_path = Path(directory_path)

        if not scan_path.exists():
            msg = f"Directory does not exist: {directory_path}"
            raise ValueError(msg)

        if not scan_path.is_dir():
            msg = f"Path is not a directory: {directory_path}"
            raise ValueError(msg)

        results: ScanResults = {
            "scanned": 0,
            "added": 0,
            "updated": 0,
            "deleted": 0,
            "skipped": 0,
            "errors": 0,
            "files": [],
        }

        logger.info("Starting sync of directory: %s", directory_path)

        # Get all existing sounds of this type from database
        existing_sounds = await self.sound_repo.get_by_type(sound_type)

        # Create lookup dictionaries with immediate attribute access
        # to avoid session detachment
        sounds_by_hash: dict[str, dict[str, Any]] = {}
        sounds_by_filename: dict[str, dict[str, Any]] = {}
        for sound in existing_sounds:
            # Capture all attributes immediately while session is valid
            sound_data = {
                "id": sound.id,
                "hash": sound.hash,
                "filename": sound.filename,
                "name": sound.name,
                "duration": sound.duration,
                "size": sound.size,
                "sound_object": sound,  # Keep reference for database operations
            }
            sounds_by_hash[sound.hash] = sound_data
            sounds_by_filename[sound.filename] = sound_data

        # Get all audio files from directory
        audio_files = [
            f
            for f in scan_path.iterdir()
            if f.is_file() and f.suffix.lower() in self.supported_extensions
        ]
        # Names present on disk; used to tell a rename (old file gone) from a
        # copy (old file still there).
        directory_filenames = {f.name for f in audio_files}

        # Process each file in directory
        processed_filenames: set[str] = set()
        for file_path in audio_files:
            results["scanned"] += 1
            filename = file_path.name
            processed_filenames.add(filename)

            try:
                # Calculate hash first to enable hash-based lookup
                file_hash = get_file_hash(file_path)
                existing_sound_by_hash = sounds_by_hash.get(file_hash)
                existing_sound_by_filename = sounds_by_filename.get(filename)

                if (
                    existing_sound_by_hash is not None
                    and existing_sound_by_hash["filename"] != filename
                    and existing_sound_by_hash["filename"] in directory_filenames
                ):
                    # Same content as a sound whose file still exists: treating
                    # this as a rename would repoint the record at the copy and
                    # make it flip between the two files on successive scans.
                    logger.info(
                        "Skipping duplicate of %s: %s",
                        existing_sound_by_hash["filename"],
                        filename,
                    )
                    results["skipped"] += 1
                    results["files"].append(
                        _file_info(
                            filename,
                            "skipped",
                            reason=(
                                "duplicate content of "
                                f"{existing_sound_by_hash['filename']}"
                            ),
                            name=existing_sound_by_hash["name"],
                            duration=existing_sound_by_hash["duration"],
                            size=existing_sound_by_hash["size"],
                            sound_id=existing_sound_by_hash["id"],
                        ),
                    )
                    continue

                await self._sync_audio_file(
                    file_path,
                    sound_type,
                    existing_sound_by_hash,
                    existing_sound_by_filename,
                    file_hash,
                    results,
                )

                # A rename leaves a temporary "old_filename" marker on the
                # newest entry. Always strip it (even when its value is falsy)
                # so it never leaks into the returned results.
                last_entry = results["files"][-1] if results["files"] else None
                old_filename = (
                    last_entry.pop("old_filename", None)
                    if last_entry is not None
                    else None
                )
                if old_filename is not None:
                    # The old name belongs to the renamed sound; it must not be
                    # deleted as "missing" below.
                    processed_filenames.add(old_filename)
                    logger.debug("Marked old filename as processed: %s", old_filename)
                    if existing_sound_by_hash is not None:
                        # Keep the shared lookup record current so a later file
                        # with the same content is compared against the new
                        # filename rather than the pre-rename one.
                        existing_sound_by_hash["filename"] = filename
                        existing_sound_by_hash["name"] = last_entry["name"]

            except Exception as e:
                # Per-file boundary: one bad file must not abort the scan.
                logger.exception("Error processing file %s", file_path)
                results["errors"] += 1
                results["files"].append(
                    _file_info(filename, "error", error=str(e)),
                )

        # Delete sounds that no longer exist in directory
        for filename, sound_data in sounds_by_filename.items():
            if filename in processed_filenames:
                continue

            # Attributes already captured in sound_data dictionary
            details = {
                "name": sound_data["name"],
                "duration": sound_data["duration"],
                "size": sound_data["size"],
                "sound_id": sound_data["id"],
            }
            try:
                await self.sound_repo.delete(sound_data["sound_object"])
            except Exception as e:
                logger.exception("Error deleting sound %s", filename)
                results["errors"] += 1
                results["files"].append(
                    _file_info(
                        filename,
                        "error",
                        reason="failed to delete",
                        error=str(e),
                        **details,
                    ),
                )
            else:
                logger.info("Deleted sound no longer in directory: %s", filename)
                results["deleted"] += 1
                results["files"].append(
                    _file_info(
                        filename,
                        "deleted",
                        reason="file no longer exists",
                        **details,
                    ),
                )

        logger.info("Sync completed: %s", results)
        return results

    async def _sync_audio_file(
        self,
        file_path: Path,
        sound_type: str,
        existing_sound_by_hash: dict | Sound | None,
        existing_sound_by_filename: dict | Sound | None,
        file_hash: str,
        results: ScanResults,
    ) -> None:
        """Sync a single audio file using hash-first identification strategy.

        Outcomes, in order of precedence:

        * hash known, same filename -> skipped ("file unchanged")
        * hash known, other filename -> stored sound renamed; the entry also
          carries a temporary ``"old_filename"`` key for the caller
        * filename known, other hash -> stored sound updated as modified
        * neither known -> new sound created

        ``results`` is mutated in place: the matching counter is incremented
        and one ``FileInfo`` entry is appended. Exceptions from the audio
        helpers or the repository propagate to the caller.

        Args:
            file_path: Path of the audio file on disk.
            sound_type: Type assigned to a newly created sound.
            existing_sound_by_hash: Stored sound with the same content hash,
                as an attribute dict (normal) or ``Sound`` object (tests).
            existing_sound_by_filename: Stored sound with the same filename,
                in the same two accepted forms.
            file_hash: Content hash of ``file_path``.
            results: Scan results to update.
        """
        filename = file_path.name
        name = self.extract_name_from_filename(filename)

        # Hash-first identification strategy
        if existing_sound_by_hash is not None:
            # Capture every needed attribute BEFORE any repository write, so
            # nothing is read from the Sound object afterwards.
            known = _capture_sound(
                existing_sound_by_hash,
                ("filename", "name", "duration", "size", "id"),
            )

            if known["filename"] == filename:
                # Same hash, same filename - file unchanged
                logger.debug("Sound unchanged: %s", filename)
                results["skipped"] += 1
                results["files"].append(
                    _file_info(
                        filename,
                        "skipped",
                        reason="file unchanged",
                        name=known["name"],
                        duration=known["duration"],
                        size=known["size"],
                        sound_id=known["id"],
                    ),
                )
                return

            # Same hash, different filename - file was renamed
            await self.sound_repo.update(
                known["sound_object"],
                {"filename": filename, "name": name},
            )
            logger.info(
                "Detected rename: %s -> %s (ID: %s)",
                known["filename"],
                filename,
                known["id"],
            )
            results["updated"] += 1
            entry = _file_info(
                filename,
                "updated",
                reason="file was renamed",
                name=name,
                duration=known["duration"],
                size=known["size"],
                sound_id=known["id"],
                changes=["filename", "name"],
            )
            # Store old filename to prevent deletion
            entry["old_filename"] = known["filename"]
            results["files"].append(entry)
            return

        # Duration and size are only needed when the content is new to the
        # database, so they are not probed for unchanged or renamed files.
        duration = get_audio_duration(file_path)
        size = get_file_size(file_path)

        if existing_sound_by_filename is not None:
            # Same filename but different hash - file was modified
            known = _capture_sound(existing_sound_by_filename, ("id",))
            await self.sound_repo.update(
                known["sound_object"],
                {
                    "name": name,
                    "duration": duration,
                    "size": size,
                    "hash": file_hash,
                },
            )
            logger.info(
                "Updated modified sound: %s (ID: %s)",
                name,
                known["id"],
            )
            results["updated"] += 1
            results["files"].append(
                _file_info(
                    filename,
                    "updated",
                    reason="file was modified",
                    name=name,
                    duration=duration,
                    size=size,
                    sound_id=known["id"],
                    changes=["hash", "duration", "size", "name"],
                ),
            )
            return

        # New file - neither hash nor filename exists
        sound = await self.sound_repo.create(
            {
                "type": sound_type,
                "name": name,
                "filename": filename,
                "duration": duration,
                "size": size,
                "hash": file_hash,
                "is_deletable": False,
                "is_music": False,
                "is_normalized": False,
                "play_count": 0,
            },
        )
        logger.info("Added new sound: %s (ID: %s)", sound.name, sound.id)
        results["added"] += 1
        results["files"].append(
            _file_info(
                filename,
                "added",
                name=name,
                duration=duration,
                size=size,
                sound_id=sound.id,
            ),
        )

    async def scan_soundboard_directory(self) -> ScanResults:
        """Sync the default soundboard directory."""
        soundboard_path = "sounds/originals/soundboard"
        return await self.scan_directory(soundboard_path, "SDB")