"""Sound file scanning service for discovering and importing audio files.""" import hashlib import logging from pathlib import Path import ffmpeg from app.database import db from app.models.sound import Sound logger = logging.getLogger(__name__) class SoundScannerService: """Service for scanning and importing sound files.""" # Supported audio file extensions SUPPORTED_EXTENSIONS = {".mp3", ".wav", ".ogg", ".flac", ".m4a", ".aac"} # Default soundboard directory DEFAULT_SOUNDBOARD_DIR = "sounds/soundboard" @staticmethod def scan_soundboard_directory( directory: str | None = None, ) -> dict: """Scan the soundboard directory and add new files to the database. Args: directory: Directory to scan (defaults to sounds/soundboard) Returns: dict: Summary of the scan operation """ scan_dir = directory or SoundScannerService.DEFAULT_SOUNDBOARD_DIR try: # Ensure directory exists scan_path = Path(scan_dir) if not scan_path.exists(): logger.warning( f"Soundboard directory does not exist: {scan_dir}", ) return { "success": False, "error": f"Directory not found: {scan_dir}", "files_found": 0, "files_added": 0, "files_skipped": 0, } logger.info(f"Starting soundboard scan in: {scan_dir}") files_found = 0 files_added = 0 files_skipped = 0 errors = [] # Walk through directory and subdirectories for file_path in scan_path.rglob("*"): if file_path.is_file(): filename = file_path.name # Check if file has supported extension if not SoundScannerService._is_supported_audio_file( filename, ): continue files_found += 1 try: # Process the audio file result = SoundScannerService._process_audio_file( str(file_path), scan_dir, ) if result["added"]: files_added += 1 logger.debug(f"Added sound: {filename}") elif result.get("updated"): files_added += ( 1 # Count updates as additions for reporting ) logger.debug(f"Updated sound: {filename}") else: files_skipped += 1 logger.debug( f"Skipped sound: {filename} ({result['reason']})", ) except Exception as e: error_msg = f"Error processing {filename}: {e!s}" logger.error(error_msg) errors.append(error_msg) files_skipped += 1 # Commit all changes db.session.commit() logger.info( f"Soundboard scan completed: {files_found} files found, " f"{files_added} added, {files_skipped} skipped", ) return { "success": True, "directory": scan_dir, "files_found": files_found, "files_added": files_added, "files_skipped": files_skipped, "errors": errors, "message": f"Scan completed: {files_added} new sounds added", } except Exception as e: db.session.rollback() logger.error(f"Error during soundboard scan: {e!s}") return { "success": False, "error": str(e), "files_found": 0, "files_added": 0, "files_skipped": 0, "message": "Soundboard scan failed", } @staticmethod def _is_supported_audio_file(filename: str) -> bool: """Check if file has a supported audio extension.""" return ( Path(filename).suffix.lower() in SoundScannerService.SUPPORTED_EXTENSIONS ) @staticmethod def _process_audio_file(file_path: str, base_dir: str) -> dict: """Process a single audio file and add it to database if new.""" file_hash = SoundScannerService._calculate_file_hash(file_path) metadata = SoundScannerService._extract_audio_metadata(file_path) relative_path = Path(file_path).relative_to(Path(base_dir)) # Check for existing file by hash (duplicate content) if existing_sound := Sound.find_by_hash(file_hash): return SoundScannerService._handle_duplicate_file(existing_sound) # Check for existing filename (file replacement) if existing_filename_sound := Sound.find_by_filename( str(relative_path) ): return SoundScannerService._handle_file_replacement( existing_filename_sound, str(relative_path), metadata, file_hash, ) # Create new sound record return SoundScannerService._create_new_sound( file_path, str(relative_path), metadata, file_hash, ) @staticmethod def _handle_duplicate_file(existing_sound: Sound) -> dict: """Handle case where file content already exists in database.""" return { "added": False, "reason": f"File already exists as '{existing_sound.name}'", } @staticmethod def _handle_file_replacement( existing_sound: Sound, relative_path: str, metadata: dict, file_hash: str, ) -> dict: """Handle case where filename exists but content may be different.""" # Remove normalized files and clear normalized info SoundScannerService._clear_normalized_files(existing_sound) existing_sound.clear_normalized_info() # Update existing sound with new file information existing_sound.update_file_info( filename=relative_path, duration=metadata["duration"], size=metadata["size"], hash_value=file_hash, ) return { "added": False, "updated": True, "sound_id": existing_sound.id, "reason": f"Updated existing sound '{existing_sound.name}' with new file data", } @staticmethod def _create_new_sound( file_path: str, relative_path: str, metadata: dict, file_hash: str, ) -> dict: """Create a new sound record in the database.""" sound_name = SoundScannerService._generate_unique_sound_name( Path(file_path).stem, ) sound = Sound.create_sound( sound_type="SDB", name=sound_name, filename=relative_path, duration=metadata["duration"], size=metadata["size"], hash_value=file_hash, is_music=False, is_deletable=False, commit=False, ) return { "added": True, "sound_id": sound.id, "reason": "New file added successfully", } @staticmethod def _generate_unique_sound_name(base_name: str) -> str: """Generate a unique sound name by appending numbers if needed.""" sound_name = base_name counter = 1 while Sound.find_by_name(sound_name): sound_name = f"{base_name}_{counter}" counter += 1 return sound_name @staticmethod def _calculate_file_hash(file_path: str) -> str: """Calculate SHA256 hash of file contents.""" sha256_hash = hashlib.sha256() with Path(file_path).open("rb") as f: # Read file in chunks to handle large files for chunk in iter(lambda: f.read(4096), b""): sha256_hash.update(chunk) return sha256_hash.hexdigest() @staticmethod def _clear_normalized_files(sound: Sound) -> None: """Remove normalized files for a sound if they exist.""" if sound.is_normalized and sound.normalized_filename: # Import here to avoid circular imports from app.services.sound_normalizer_service import ( SoundNormalizerService, ) normalized_path = ( Path(SoundNormalizerService.NORMALIZED_DIR) / sound.normalized_filename ) if normalized_path.exists(): try: normalized_path.unlink() logger.info(f"Removed normalized file: {normalized_path}") except Exception as e: logger.warning( f"Could not remove normalized file {normalized_path}: {e}", ) @staticmethod def _extract_audio_metadata(file_path: str) -> dict: """Extract metadata from audio file using ffmpeg-python.""" try: # Get file size file_size = Path(file_path).stat().st_size # Use ffmpeg to probe audio metadata probe = ffmpeg.probe(file_path) audio_stream = next( (s for s in probe['streams'] if s['codec_type'] == 'audio'), None ) if not audio_stream: raise ValueError("No audio stream found in file") # Extract metadata from ffmpeg probe duration = int(float(audio_stream.get('duration', 0)) * 1000) # Convert to milliseconds channels = int(audio_stream.get('channels', 0)) sample_rate = int(audio_stream.get('sample_rate', 0)) bitrate = int(audio_stream.get('bit_rate', 0)) if audio_stream.get('bit_rate') else None # Fallback bitrate calculation if not available if not bitrate and duration > 0: file_size_bits = file_size * 8 bitrate = int(file_size_bits / (duration / 1000)) return { "duration": duration, "size": file_size, "bitrate": bitrate, "channels": channels, "sample_rate": sample_rate, } except Exception as e: logger.warning(f"Could not extract metadata from {file_path}: {e}") return { "duration": 0, "size": Path(file_path).stat().st_size, "bitrate": None, "channels": None, "sample_rate": None, } @staticmethod def get_scan_statistics() -> dict: """Get statistics about sounds in the database.""" total_sounds = Sound.query.count() sdb_sounds = Sound.query.filter_by(type="SDB").count() music_sounds = Sound.query.filter_by(is_music=True).count() # Calculate total size and duration sounds = Sound.query.all() total_size = sum(sound.size for sound in sounds) total_duration = sum(sound.duration for sound in sounds) total_plays = sum(sound.play_count for sound in sounds) return { "total_sounds": total_sounds, "soundboard_sounds": sdb_sounds, "music_sounds": music_sounds, "total_size_bytes": total_size, "total_duration": total_duration, "total_plays": total_plays, "most_played": [ sound.to_dict() for sound in Sound.get_most_played(5) ], }