"""Sound file scanning service for discovering and importing audio files."""

import hashlib
import logging
from pathlib import Path

from pydub import AudioSegment
from pydub.utils import mediainfo

from app.database import db
from app.models.sound import Sound

logger = logging.getLogger(__name__)


class SoundScannerService:
    """Service for scanning and importing sound files.

    All methods are static. ``scan_soundboard_directory`` is the entry point:
    it walks a directory tree, hands every supported audio file to
    ``_process_audio_file`` and commits the database session once at the end.
    """

    # Supported audio file extensions (compared against the lowercased suffix)
    SUPPORTED_EXTENSIONS = {".mp3", ".wav", ".ogg", ".flac", ".m4a", ".aac"}

    # Default soundboard directory
    DEFAULT_SOUNDBOARD_DIR = "sounds/soundboard"

    # Number of bytes read per iteration while hashing a file
    _HASH_CHUNK_SIZE = 64 * 1024

    @staticmethod
    def scan_soundboard_directory(
        directory: str | None = None,
    ) -> dict:
        """Scan the soundboard directory and add new files to the database.

        Files that fail to process are recorded in the ``errors`` list and
        counted as skipped; they do not abort the scan. Updated sounds are
        counted in ``files_added`` for reporting purposes.

        Args:
            directory: Directory to scan (defaults to sounds/soundboard)

        Returns:
            dict: Summary of the scan operation. ``success`` is False when the
                directory does not exist or an unexpected error aborted the
                scan (in which case the session is rolled back).

        """
        scan_dir = directory or SoundScannerService.DEFAULT_SOUNDBOARD_DIR

        try:
            # Ensure directory exists
            scan_path = Path(scan_dir)
            if not scan_path.exists():
                logger.warning(
                    "Soundboard directory does not exist: %s",
                    scan_dir,
                )
                return {
                    "success": False,
                    "error": f"Directory not found: {scan_dir}",
                    "files_found": 0,
                    "files_added": 0,
                    "files_skipped": 0,
                }

            logger.info("Starting soundboard scan in: %s", scan_dir)

            files_found = 0
            files_added = 0
            files_skipped = 0
            errors = []

            # Walk through directory and subdirectories
            for file_path in scan_path.rglob("*"):
                if not file_path.is_file():
                    continue

                filename = file_path.name
                if not SoundScannerService._is_supported_audio_file(filename):
                    continue

                files_found += 1

                try:
                    result = SoundScannerService._process_audio_file(
                        str(file_path),
                        scan_dir,
                    )
                except Exception as e:
                    # Best effort: one broken file must not stop the scan.
                    error_msg = f"Error processing {filename}: {e!s}"
                    logger.error(error_msg)
                    errors.append(error_msg)
                    files_skipped += 1
                    continue

                if result["added"]:
                    files_added += 1
                    logger.debug("Added sound: %s", filename)
                elif result.get("updated"):
                    # Count updates as additions for reporting
                    files_added += 1
                    logger.debug("Updated sound: %s", filename)
                else:
                    files_skipped += 1
                    logger.debug(
                        "Skipped sound: %s (%s)",
                        filename,
                        result["reason"],
                    )

            # Commit all changes in a single transaction
            db.session.commit()

            logger.info(
                "Soundboard scan completed: %d files found, "
                "%d added, %d skipped",
                files_found,
                files_added,
                files_skipped,
            )

            return {
                "success": True,
                "directory": scan_dir,
                "files_found": files_found,
                "files_added": files_added,
                "files_skipped": files_skipped,
                "errors": errors,
                "message": f"Scan completed: {files_added} new sounds added",
            }

        except Exception as e:
            db.session.rollback()
            # logger.exception keeps the traceback for diagnosing the failure.
            logger.exception("Error during soundboard scan: %s", e)
            return {
                "success": False,
                "error": str(e),
                "files_found": 0,
                "files_added": 0,
                "files_skipped": 0,
                "message": "Soundboard scan failed",
            }

    @staticmethod
    def _is_supported_audio_file(filename: str) -> bool:
        """Check if file has a supported audio extension (case-insensitive)."""
        suffix = Path(filename).suffix.lower()
        return suffix in SoundScannerService.SUPPORTED_EXTENSIONS

    @staticmethod
    def _process_audio_file(file_path: str, base_dir: str) -> dict:
        """Process a single audio file and add it to database if new.

        The file is matched first by content hash (skip if already known),
        then by relative filename (update the existing record and drop its
        normalized file), and otherwise inserted as a new "SDB" sound.

        Args:
            file_path: Full path to the audio file
            base_dir: Base directory for relative path calculation

        Returns:
            dict: Processing result with ``added`` flag and ``reason``; also
                ``updated`` and/or ``sound_id`` when a record was touched.

        Raises:
            ValueError: If ``file_path`` is not located under ``base_dir``.

        """
        # Calculate file hash for deduplication
        file_hash = SoundScannerService._calculate_file_hash(file_path)

        # Calculate relative filename from base directory
        relative_path = Path(file_path).relative_to(Path(base_dir))
        relative_name = str(relative_path)

        # Check if file already exists in database by hash. This runs before
        # metadata extraction so known files are not decoded on every rescan.
        existing_sound = Sound.find_by_hash(file_hash)
        if existing_sound:
            return {
                "added": False,
                "reason": f"File already exists as '{existing_sound.name}'",
            }

        # Get file metadata (decodes the audio, so only done for new content)
        metadata = SoundScannerService._extract_audio_metadata(file_path)

        # Check if filename already exists in database
        existing_filename_sound = Sound.find_by_filename(relative_name)
        if existing_filename_sound:
            # Remove normalized files and clear normalized info
            SoundScannerService._clear_normalized_files(existing_filename_sound)
            existing_filename_sound.clear_normalized_info()

            # Update existing sound with new file information
            existing_filename_sound.update_file_info(
                filename=relative_name,
                duration=metadata["duration"],
                size=metadata["size"],
                hash_value=file_hash,
            )

            return {
                "added": False,
                "updated": True,
                "sound_id": existing_filename_sound.id,
                "reason": f"Updated existing sound '{existing_filename_sound.name}' with new file data",
            }

        # Generate sound name from filename (without extension)
        original_name = Path(file_path).stem
        sound_name = original_name

        # Check if name already exists and make it unique if needed
        counter = 1
        while Sound.find_by_name(sound_name):
            sound_name = f"{original_name}_{counter}"
            counter += 1

        # Create new sound record
        sound = Sound.create_sound(
            sound_type="SDB",  # Soundboard type
            name=sound_name,
            filename=relative_name,
            duration=metadata["duration"],
            size=metadata["size"],
            hash_value=file_hash,
            is_music=False,
            is_deletable=False,
            commit=False,  # Don't commit individually, let scanner handle transaction
        )

        return {
            "added": True,
            "sound_id": sound.id,
            "reason": "New file added successfully",
        }

    @staticmethod
    def _calculate_file_hash(file_path: str) -> str:
        """Calculate SHA256 hash of file contents.

        Args:
            file_path: Path of the file to hash

        Returns:
            str: Hexadecimal SHA256 digest

        """
        sha256_hash = hashlib.sha256()
        with Path(file_path).open("rb") as f:
            # Read file in chunks to handle large files
            while chunk := f.read(SoundScannerService._HASH_CHUNK_SIZE):
                sha256_hash.update(chunk)
        return sha256_hash.hexdigest()

    @staticmethod
    def _clear_normalized_files(sound: Sound) -> None:
        """Remove normalized files for a sound if they exist.

        Failure to delete the file is logged as a warning and otherwise
        ignored.
        """
        if not (sound.is_normalized and sound.normalized_filename):
            return

        # Import here to avoid circular imports
        from app.services.sound_normalizer_service import (
            SoundNormalizerService,
        )

        normalized_path = (
            Path(SoundNormalizerService.NORMALIZED_DIR)
            / sound.normalized_filename
        )
        if not normalized_path.exists():
            return

        try:
            normalized_path.unlink()
            logger.info("Removed normalized file: %s", normalized_path)
        except OSError as e:
            logger.warning(
                "Could not remove normalized file %s: %s",
                normalized_path,
                e,
            )

    @staticmethod
    def _extract_audio_metadata(file_path: str) -> dict:
        """Extract metadata from audio file using pydub and mediainfo.

        Args:
            file_path: Path of the audio file

        Returns:
            dict: ``duration`` (``len()`` of the pydub segment, milliseconds),
                ``size`` (bytes), ``bitrate`` (bits per second, or None),
                ``channels`` and ``sample_rate``. When the file cannot be
                decoded, duration is 0 and the audio fields are None.

        """
        try:
            # Get file size
            file_size = Path(file_path).stat().st_size

            # Load audio file with pydub for basic info
            audio = AudioSegment.from_file(file_path)

            # Extract basic metadata from AudioSegment
            duration = len(audio)
            channels = audio.channels
            sample_rate = audio.frame_rate

            # Use mediainfo for more accurate bitrate information
            bitrate = None
            try:
                info = mediainfo(file_path)
                if info and "bit_rate" in info:
                    bitrate = int(info["bit_rate"])
                elif info and "bitrate" in info:
                    bitrate = int(info["bitrate"])
            except (ValueError, KeyError, TypeError):
                # Unparseable value (e.g. "N/A"): use the estimate below.
                bitrate = None

            # Fallback whenever mediainfo gave no usable value, not only when
            # it raised. Duration is in milliseconds, so scale by 1000 to get
            # bits per second, the same unit mediainfo reports.
            if bitrate is None and duration > 0:
                bitrate = file_size * 8 * 1000 // duration

            return {
                "duration": duration,
                "size": file_size,
                "bitrate": bitrate,
                "channels": channels,
                "sample_rate": sample_rate,
            }
        except Exception as e:
            logger.warning("Could not extract metadata from %s: %s", file_path, e)
            return {
                "duration": 0,
                "size": Path(file_path).stat().st_size,
                "bitrate": None,
                "channels": None,
                "sample_rate": None,
            }

    @staticmethod
    def get_scan_statistics() -> dict:
        """Get statistics about sounds in the database.

        Returns:
            dict: Counts of all/soundboard/music sounds, summed size, duration
                and play count, and the five most played sounds as dicts.

        """
        total_sounds = Sound.query.count()
        sdb_sounds = Sound.query.filter_by(type="SDB").count()
        music_sounds = Sound.query.filter_by(is_music=True).count()

        # Calculate total size and duration. Missing (None) values count as 0
        # so one incomplete row cannot break the whole report.
        sounds = Sound.query.all()
        total_size = sum(sound.size or 0 for sound in sounds)
        total_duration = sum(sound.duration or 0 for sound in sounds)
        total_plays = sum(sound.play_count or 0 for sound in sounds)

        return {
            "total_sounds": total_sounds,
            "soundboard_sounds": sdb_sounds,
            "music_sounds": music_sounds,
            "total_size_bytes": total_size,
            "total_duration": total_duration,
            "total_plays": total_plays,
            "most_played": [
                sound.to_dict() for sound in Sound.get_most_played(5)
            ],
        }