"""Sound scanner service for scanning and importing audio files.""" from pathlib import Path from typing import TypedDict from sqlmodel.ext.asyncio.session import AsyncSession from app.core.logging import get_logger from app.models.sound import Sound from app.repositories.sound import SoundRepository from app.utils.audio import get_audio_duration, get_file_hash, get_file_size logger = get_logger(__name__) class FileInfo(TypedDict): """Type definition for file information in scan results.""" filename: str status: str reason: str | None name: str | None duration: int | None size: int | None id: int | None error: str | None changes: list[str] | None class ScanResults(TypedDict): """Type definition for scan results.""" scanned: int added: int updated: int deleted: int skipped: int duplicates: int errors: int files: list[FileInfo] class SoundScannerService: """Service for scanning and importing audio files.""" def __init__(self, session: AsyncSession) -> None: """Initialize the sound scanner service.""" self.session = session self.sound_repo = SoundRepository(session) self.supported_extensions = { ".mp3", ".wav", ".opus", ".flac", ".ogg", ".m4a", ".aac", } # Directory mappings for normalized files (matching sound_normalizer) self.normalized_directories = { "SDB": "sounds/normalized/soundboard", "TTS": "sounds/normalized/text_to_speech", "EXT": "sounds/normalized/extracted", } def extract_name_from_filename(self, filename: str) -> str: """Extract a clean name from filename.""" # Remove extension name = Path(filename).stem # Replace underscores and hyphens with spaces name = name.replace("_", " ").replace("-", " ") # Capitalize words return " ".join(word.capitalize() for word in name.split()) def _get_normalized_path(self, sound_type: str, filename: str) -> Path: """Get the normalized file path for a sound.""" directory = self.normalized_directories.get(sound_type, "sounds/normalized/other") return Path(directory) / filename def _rename_normalized_file(self, sound_type: str, old_filename: str, new_filename: str) -> bool: """Rename a normalized file if it exists. Returns True if renamed, False if not found.""" old_path = self._get_normalized_path(sound_type, old_filename) new_path = self._get_normalized_path(sound_type, new_filename) if old_path.exists(): try: # Ensure the directory exists new_path.parent.mkdir(parents=True, exist_ok=True) old_path.rename(new_path) logger.info("Renamed normalized file: %s -> %s", old_path, new_path) return True except Exception as e: logger.error("Failed to rename normalized file %s -> %s: %s", old_path, new_path, e) return False return False def _delete_normalized_file(self, sound_type: str, filename: str) -> bool: """Delete a normalized file if it exists. Returns True if deleted, False if not found.""" normalized_path = self._get_normalized_path(sound_type, filename) if normalized_path.exists(): try: normalized_path.unlink() logger.info("Deleted normalized file: %s", normalized_path) return True except Exception as e: logger.error("Failed to delete normalized file %s: %s", normalized_path, e) return False return False async def scan_directory( self, directory_path: str, sound_type: str = "SDB", ) -> ScanResults: """Sync a directory with the database (add/update/delete sounds).""" scan_path = Path(directory_path) if not scan_path.exists(): msg = f"Directory does not exist: {directory_path}" raise ValueError(msg) if not scan_path.is_dir(): msg = f"Path is not a directory: {directory_path}" raise ValueError(msg) results: ScanResults = { "scanned": 0, "added": 0, "updated": 0, "deleted": 0, "skipped": 0, "duplicates": 0, "errors": 0, "files": [], } logger.info("Starting sync of directory: %s", directory_path) # Get all existing sounds of this type from database existing_sounds = await self.sound_repo.get_by_type(sound_type) # Create lookup dictionaries with immediate attribute access # to avoid session detachment sounds_by_hash = {} sounds_by_filename = {} for sound in existing_sounds: # Capture all attributes immediately while session is valid sound_data = { "id": sound.id, "hash": sound.hash, "filename": sound.filename, "name": sound.name, "duration": sound.duration, "size": sound.size, "type": sound.type, "is_normalized": sound.is_normalized, "normalized_filename": sound.normalized_filename, "sound_object": sound, # Keep reference for database operations } sounds_by_hash[sound.hash] = sound_data sounds_by_filename[sound.filename] = sound_data # Get all audio files from directory audio_files = [ f for f in scan_path.iterdir() if f.is_file() and f.suffix.lower() in self.supported_extensions ] # Process each file in directory processed_filenames = set() for file_path in audio_files: results["scanned"] += 1 filename = file_path.name processed_filenames.add(filename) try: # Calculate hash first to enable hash-based lookup file_hash = get_file_hash(file_path) existing_sound_by_hash = sounds_by_hash.get(file_hash) existing_sound_by_filename = sounds_by_filename.get(filename) await self._sync_audio_file( file_path, sound_type, existing_sound_by_hash, existing_sound_by_filename, file_hash, results, ) # Check if this was a rename operation and mark old filename as processed if results["files"] and results["files"][-1].get("old_filename"): old_filename = results["files"][-1]["old_filename"] processed_filenames.add(old_filename) logger.debug("Marked old filename as processed: %s", old_filename) # Remove temporary tracking field from results del results["files"][-1]["old_filename"] except Exception as e: logger.exception("Error processing file %s", file_path) results["errors"] += 1 results["files"].append( { "filename": filename, "status": "error", "reason": None, "name": None, "duration": None, "size": None, "id": None, "error": str(e), "changes": None, }, ) # Delete sounds that no longer exist in directory for filename, sound_data in sounds_by_filename.items(): if filename not in processed_filenames: # Attributes already captured in sound_data dictionary sound_name = sound_data["name"] sound_duration = sound_data["duration"] sound_size = sound_data["size"] sound_id = sound_data["id"] sound_object = sound_data["sound_object"] sound_type = sound_data["type"] sound_is_normalized = sound_data["is_normalized"] sound_normalized_filename = sound_data["normalized_filename"] try: # Delete the sound from database first await self.sound_repo.delete(sound_object) logger.info("Deleted sound no longer in directory: %s", filename) # If the sound had a normalized file, delete it too if sound_is_normalized and sound_normalized_filename: normalized_base = Path(sound_normalized_filename).name self._delete_normalized_file(sound_type, normalized_base) results["deleted"] += 1 results["files"].append( { "filename": filename, "status": "deleted", "reason": "file no longer exists", "name": sound_name, "duration": sound_duration, "size": sound_size, "id": sound_id, "error": None, "changes": None, }, ) except Exception as e: logger.exception("Error deleting sound %s", filename) results["errors"] += 1 results["files"].append( { "filename": filename, "status": "error", "reason": "failed to delete", "name": sound_name, "duration": sound_duration, "size": sound_size, "id": sound_id, "error": str(e), "changes": None, }, ) logger.info("Sync completed: %s", results) return results async def _sync_audio_file( self, file_path: Path, sound_type: str, existing_sound_by_hash: dict | Sound | None, existing_sound_by_filename: dict | Sound | None, file_hash: str, results: ScanResults, ) -> None: """Sync a single audio file using hash-first identification strategy.""" filename = file_path.name duration = get_audio_duration(file_path) size = get_file_size(file_path) name = self.extract_name_from_filename(filename) # Extract attributes - handle both dict (normal) and Sound object (tests) existing_hash_filename = None existing_hash_name = None existing_hash_duration = None existing_hash_size = None existing_hash_id = None existing_hash_object = None existing_hash_type = None existing_hash_is_normalized = None existing_hash_normalized_filename = None if existing_sound_by_hash is not None: if isinstance(existing_sound_by_hash, dict): existing_hash_filename = existing_sound_by_hash["filename"] existing_hash_name = existing_sound_by_hash["name"] existing_hash_duration = existing_sound_by_hash["duration"] existing_hash_size = existing_sound_by_hash["size"] existing_hash_id = existing_sound_by_hash["id"] existing_hash_object = existing_sound_by_hash["sound_object"] existing_hash_type = existing_sound_by_hash["type"] existing_hash_is_normalized = existing_sound_by_hash["is_normalized"] existing_hash_normalized_filename = existing_sound_by_hash["normalized_filename"] else: # Sound object (for tests) existing_hash_filename = existing_sound_by_hash.filename existing_hash_name = existing_sound_by_hash.name existing_hash_duration = existing_sound_by_hash.duration existing_hash_size = existing_sound_by_hash.size existing_hash_id = existing_sound_by_hash.id existing_hash_object = existing_sound_by_hash existing_hash_type = existing_sound_by_hash.type existing_hash_is_normalized = existing_sound_by_hash.is_normalized existing_hash_normalized_filename = existing_sound_by_hash.normalized_filename existing_filename_id = None existing_filename_object = None if existing_sound_by_filename is not None: if isinstance(existing_sound_by_filename, dict): existing_filename_id = existing_sound_by_filename["id"] existing_filename_object = existing_sound_by_filename["sound_object"] else: # Sound object (for tests) existing_filename_id = existing_sound_by_filename.id existing_filename_object = existing_sound_by_filename # Hash-first identification strategy if existing_sound_by_hash is not None: # Content exists in database (same hash) if existing_hash_filename == filename: # Same hash, same filename - file unchanged logger.debug("Sound unchanged: %s", filename) results["skipped"] += 1 results["files"].append( { "filename": filename, "status": "skipped", "reason": "file unchanged", "name": existing_hash_name, "duration": existing_hash_duration, "size": existing_hash_size, "id": existing_hash_id, "error": None, "changes": None, }, ) else: # Same hash, different filename - could be rename or duplicate # Check if both files exist to determine if it's a duplicate old_file_path = file_path.parent / existing_hash_filename if old_file_path.exists(): # Both files exist with same hash - this is a duplicate logger.warning( "Duplicate file detected: '%s' has same content as existing '%s' (hash: %s). " "Skipping duplicate file.", filename, existing_hash_filename, file_hash[:8] + "...", ) results["skipped"] += 1 results["duplicates"] += 1 results["files"].append( { "filename": filename, "status": "skipped", "reason": "duplicate content", "name": existing_hash_name, "duration": existing_hash_duration, "size": existing_hash_size, "id": existing_hash_id, "error": None, "changes": None, }, ) else: # Old file doesn't exist - this is a genuine rename update_data = { "filename": filename, "name": name, } # If the sound has a normalized file, rename it too if existing_hash_is_normalized and existing_hash_normalized_filename: # Extract base filename without path for normalized file old_normalized_base = Path(existing_hash_normalized_filename).name new_normalized_base = Path(filename).stem + Path(existing_hash_normalized_filename).suffix renamed = self._rename_normalized_file( existing_hash_type, old_normalized_base, new_normalized_base ) if renamed: update_data["normalized_filename"] = new_normalized_base logger.info( "Renamed normalized file: %s -> %s", old_normalized_base, new_normalized_base ) await self.sound_repo.update(existing_hash_object, update_data) logger.info( "Detected rename: %s -> %s (ID: %s)", existing_hash_filename, filename, existing_hash_id, ) # Build changes list changes = ["filename", "name"] if "normalized_filename" in update_data: changes.append("normalized_filename") results["updated"] += 1 results["files"].append( { "filename": filename, "status": "updated", "reason": "file was renamed", "name": name, "duration": existing_hash_duration, "size": existing_hash_size, "id": existing_hash_id, "error": None, "changes": changes, # Store old filename to prevent deletion "old_filename": existing_hash_filename, }, ) elif existing_sound_by_filename is not None: # Same filename but different hash - file was modified update_data = { "name": name, "duration": duration, "size": size, "hash": file_hash, } await self.sound_repo.update(existing_filename_object, update_data) logger.info( "Updated modified sound: %s (ID: %s)", name, existing_filename_id, ) results["updated"] += 1 results["files"].append( { "filename": filename, "status": "updated", "reason": "file was modified", "name": name, "duration": duration, "size": size, "id": existing_filename_id, "error": None, "changes": ["hash", "duration", "size", "name"], }, ) else: # New file - neither hash nor filename exists sound_data = { "type": sound_type, "name": name, "filename": filename, "duration": duration, "size": size, "hash": file_hash, "is_deletable": False, "is_music": False, "is_normalized": False, "play_count": 0, } sound = await self.sound_repo.create(sound_data) logger.info("Added new sound: %s (ID: %s)", sound.name, sound.id) results["added"] += 1 results["files"].append( { "filename": filename, "status": "added", "reason": None, "name": name, "duration": duration, "size": size, "id": sound.id, "error": None, "changes": None, }, ) async def scan_soundboard_directory(self) -> ScanResults: """Sync the default soundboard directory.""" soundboard_path = "sounds/originals/soundboard" return await self.scan_directory(soundboard_path, "SDB")