"""Sound normalization service using ffmpeg loudnorm filter.""" import hashlib import logging from pathlib import Path import ffmpeg from pydub import AudioSegment from app.database import db from app.models.sound import Sound logger = logging.getLogger(__name__) class SoundNormalizerService: """Service for normalizing sound files using ffmpeg loudnorm.""" SUPPORTED_EXTENSIONS = { ".mp3", ".wav", ".ogg", ".flac", ".m4a", ".aac", ".opus", } SOUNDS_DIR = "sounds/soundboard" NORMALIZED_DIR = "sounds/normalized/soundboard" LOUDNORM_PARAMS = { "integrated": -16, "true_peak": -1.5, "lra": 11.0, "print_format": "summary", } @staticmethod def normalize_sound(sound_id: int, overwrite: bool = False) -> dict: """Normalize a specific sound file using ffmpeg loudnorm. Args: sound_id: ID of the sound to normalize overwrite: Whether to overwrite existing normalized file Returns: dict: Result of the normalization operation """ try: sound = Sound.query.get(sound_id) if not sound: return { "success": False, "error": f"Sound with ID {sound_id} not found", } source_path = Path(SoundNormalizerService.SOUNDS_DIR) / sound.filename if not source_path.exists(): return { "success": False, "error": f"Source file not found: {source_path}", } # Always output as WAV regardless of input format filename_without_ext = Path(sound.filename).stem normalized_filename = f"{filename_without_ext}.wav" normalized_path = Path(SoundNormalizerService.NORMALIZED_DIR) / normalized_filename normalized_path.parent.mkdir(parents=True, exist_ok=True) if normalized_path.exists() and not overwrite: return { "success": False, "error": f"Normalized file already exists: {normalized_path}. Use overwrite=True to replace it.", } logger.info( f"Starting normalization of {sound.name} ({sound.filename})", ) result = SoundNormalizerService._normalize_with_ffmpeg( str(source_path), str(normalized_path), ) if result["success"]: # Calculate normalized file metadata normalized_metadata = ( SoundNormalizerService._get_normalized_metadata( str(normalized_path), ) ) # Update sound record with normalized information sound.set_normalized_info( normalized_filename=normalized_filename, normalized_duration=normalized_metadata["duration"], normalized_size=normalized_metadata["size"], normalized_hash=normalized_metadata["hash"], ) # Commit the database changes db.session.commit() logger.info(f"Successfully normalized {sound.name}") return { "success": True, "sound_id": sound_id, "sound_name": sound.name, "source_path": str(source_path), "normalized_path": str(normalized_path), "normalized_filename": normalized_filename, "normalized_duration": normalized_metadata["duration"], "normalized_size": normalized_metadata["size"], "normalized_hash": normalized_metadata["hash"], "loudnorm_stats": result.get("stats", {}), } return result except Exception as e: logger.error(f"Error normalizing sound {sound_id}: {e}") return {"success": False, "error": str(e)} @staticmethod def normalize_all_sounds( overwrite: bool = False, limit: int = None, ) -> dict: """Normalize all soundboard files. Args: overwrite: Whether to overwrite existing normalized files limit: Maximum number of files to process (None for all) Returns: dict: Summary of the normalization operation """ try: query = Sound.query.filter_by(type="SDB") if limit: query = query.limit(limit) sounds = query.all() if not sounds: return { "success": True, "message": "No soundboard files found to normalize", "processed": 0, "successful": 0, "failed": 0, "skipped": 0, } logger.info(f"Starting bulk normalization of {len(sounds)} sounds") processed = 0 successful = 0 failed = 0 skipped = 0 errors = [] for sound in sounds: result = SoundNormalizerService.normalize_sound( sound.id, overwrite, ) processed += 1 if result["success"]: successful += 1 elif "already exists" in result.get("error", ""): skipped += 1 else: failed += 1 errors.append(f"{sound.name}: {result['error']}") logger.info( f"Bulk normalization completed: {successful} successful, {failed} failed, {skipped} skipped", ) return { "success": True, "message": f"Processed {processed} sounds: {successful} successful, {failed} failed, {skipped} skipped", "processed": processed, "successful": successful, "failed": failed, "skipped": skipped, "errors": errors, } except Exception as e: logger.error(f"Error during bulk normalization: {e}") return { "success": False, "error": str(e), "processed": 0, "successful": 0, "failed": 0, "skipped": 0, } @staticmethod def _normalize_with_ffmpeg(source_path: str, output_path: str) -> dict: """Run ffmpeg loudnorm on a single file using python-ffmpeg. Args: source_path: Path to source audio file output_path: Path for normalized output file (will be WAV format) Returns: dict: Result with success status and loudnorm statistics """ try: params = SoundNormalizerService.LOUDNORM_PARAMS logger.debug( f"Running ffmpeg normalization: {source_path} -> {output_path}", ) # Create ffmpeg input stream input_stream = ffmpeg.input(source_path) # Apply loudnorm filter loudnorm_filter = f"loudnorm=I={params['integrated']}:TP={params['true_peak']}:LRA={params['lra']}:print_format={params['print_format']}" # Create output stream with WAV format output_stream = ffmpeg.output( input_stream, output_path, acodec="pcm_s16le", # 16-bit PCM for WAV ar=44100, # 44.1kHz sample rate af=loudnorm_filter, y=None, # Overwrite output file ) # Run the ffmpeg process out, err = ffmpeg.run( output_stream, capture_stdout=True, capture_stderr=True, ) # Parse loudnorm statistics from stderr stats = SoundNormalizerService._parse_loudnorm_stats( err.decode() if err else "", ) if not Path(output_path).exists(): return { "success": False, "error": "Output file was not created", } return {"success": True, "stats": stats} except ffmpeg.Error as e: error_msg = ( f"FFmpeg error: {e.stderr.decode() if e.stderr else str(e)}" ) logger.error(error_msg) return {"success": False, "error": error_msg} except Exception as e: logger.error(f"Error running ffmpeg: {e}") return {"success": False, "error": str(e)} @staticmethod def _parse_loudnorm_stats(stderr_output: str) -> dict: """Parse loudnorm statistics from ffmpeg stderr output. Args: stderr_output: ffmpeg stderr output containing loudnorm stats Returns: dict: Parsed loudnorm statistics """ stats = {} if not stderr_output: return stats lines = stderr_output.split("\n") for line in lines: line = line.strip() if "Input Integrated:" in line: try: stats["input_integrated"] = float(line.split()[-2]) except (ValueError, IndexError): pass elif "Input True Peak:" in line: try: stats["input_true_peak"] = float(line.split()[-2]) except (ValueError, IndexError): pass elif "Input LRA:" in line: try: stats["input_lra"] = float(line.split()[-1]) except (ValueError, IndexError): pass elif "Output Integrated:" in line: try: stats["output_integrated"] = float(line.split()[-2]) except (ValueError, IndexError): pass elif "Output True Peak:" in line: try: stats["output_true_peak"] = float(line.split()[-2]) except (ValueError, IndexError): pass elif "Output LRA:" in line: try: stats["output_lra"] = float(line.split()[-1]) except (ValueError, IndexError): pass return stats @staticmethod def _get_normalized_metadata(file_path: str) -> dict: """Calculate metadata for normalized file. Args: file_path: Path to the normalized audio file Returns: dict: Metadata including duration and hash """ try: # Get file size file_size = Path(file_path).stat().st_size # Calculate file hash file_hash = SoundNormalizerService._calculate_file_hash(file_path) # Get duration using pydub audio = AudioSegment.from_wav(file_path) duration = len(audio) # Duration in milliseconds return { "duration": duration, "size": file_size, "hash": file_hash, } except Exception as e: logger.error(f"Error calculating metadata for {file_path}: {e}") return { "duration": 0, "size": Path(file_path).stat().st_size, "hash": "", } @staticmethod def _calculate_file_hash(file_path: str) -> str: """Calculate SHA256 hash of file contents.""" sha256_hash = hashlib.sha256() with Path(file_path).open("rb") as f: # Read file in chunks to handle large files for chunk in iter(lambda: f.read(4096), b""): sha256_hash.update(chunk) return sha256_hash.hexdigest() @staticmethod def get_normalization_status() -> dict: """Get statistics about normalized vs original files. Returns: dict: Statistics about normalization status """ try: total_sounds = Sound.query.filter_by(type="SDB").count() normalized_count = 0 total_original_size = 0 total_normalized_size = 0 sounds = Sound.query.filter_by(type="SDB").all() for sound in sounds: original_path = Path(SoundNormalizerService.SOUNDS_DIR) / sound.filename if original_path.exists(): total_original_size += original_path.stat().st_size # Use database field to check if normalized, not file existence if sound.is_normalized and sound.normalized_filename: normalized_count += 1 normalized_path = Path(SoundNormalizerService.NORMALIZED_DIR) / sound.normalized_filename if normalized_path.exists(): total_normalized_size += normalized_path.stat().st_size return { "total_sounds": total_sounds, "normalized_count": normalized_count, "normalization_percentage": ( (normalized_count / total_sounds * 100) if total_sounds > 0 else 0 ), "total_original_size": total_original_size, "total_normalized_size": total_normalized_size, "size_difference": ( total_normalized_size - total_original_size if normalized_count > 0 else 0 ), } except Exception as e: logger.error(f"Error getting normalization status: {e}") return { "error": str(e), "total_sounds": 0, "normalized_count": 0, "normalization_percentage": 0, } @staticmethod def check_ffmpeg_availability() -> dict: """Check if ffmpeg is available and supports loudnorm filter. Returns: dict: Information about ffmpeg availability and capabilities """ try: # Create a minimal test audio file to check ffmpeg import tempfile with tempfile.NamedTemporaryFile( suffix=".wav", delete=False, ) as temp_file: temp_path = temp_file.name try: # Try a simple ffmpeg operation to check availability test_input = ffmpeg.input( "anullsrc=channel_layout=stereo:sample_rate=44100", f="lavfi", t=0.1, ) test_output = ffmpeg.output(test_input, temp_path) ffmpeg.run( test_output, capture_stdout=True, capture_stderr=True, quiet=True, ) # If we get here, basic ffmpeg is working # Now test loudnorm filter try: norm_input = ffmpeg.input(temp_path) norm_output = ffmpeg.output( norm_input, "/dev/null", af="loudnorm=I=-16:TP=-1.5:LRA=11.0", f="null", ) ffmpeg.run( norm_output, capture_stdout=True, capture_stderr=True, quiet=True, ) has_loudnorm = True except ffmpeg.Error: has_loudnorm = False return { "available": True, "version": "ffmpeg-python wrapper available", "has_loudnorm": has_loudnorm, "ready": has_loudnorm, } finally: # Clean up temp file temp_file_path = Path(temp_path) if temp_file_path.exists(): temp_file_path.unlink() except Exception as e: return { "available": False, "error": f"ffmpeg not available via python-ffmpeg: {e!s}", }