From 0fffce53b4815bffdbb5ed186856b2575a876e4a Mon Sep 17 00:00:00 2001 From: JSC Date: Mon, 28 Jul 2025 09:18:18 +0200 Subject: [PATCH] feat: Implement sound normalization service and API endpoints - Added SoundNormalizerService for normalizing audio files with support for one-pass and two-pass normalization methods. - Introduced API endpoints for normalizing all sounds and specific sounds by ID, including support for force normalization and handling of already normalized sounds. - Created comprehensive test suite for the sound normalizer service and its API endpoints, covering various scenarios including success, errors, and edge cases. - Refactored sound scanning service to utilize SHA-256 for file hashing instead of MD5 for improved security. - Enhanced logging and error handling throughout the sound normalization process. --- app/api/v1/__init__.py | 3 +- app/api/v1/sound_normalizer.py | 166 +++++ app/core/config.py | 5 + app/repositories/sound.py | 23 + app/services/sound_normalizer.py | 567 ++++++++++++++++ app/services/sound_scanner.py | 170 ++--- .../api/v1/test_sound_normalizer_endpoints.py | 613 ++++++++++++++++++ tests/services/test_sound_normalizer.py | 559 ++++++++++++++++ 8 files changed, 2031 insertions(+), 75 deletions(-) create mode 100644 app/api/v1/sound_normalizer.py create mode 100644 app/services/sound_normalizer.py create mode 100644 tests/api/v1/test_sound_normalizer_endpoints.py create mode 100644 tests/services/test_sound_normalizer.py diff --git a/app/api/v1/__init__.py b/app/api/v1/__init__.py index 3933b1d..8d2638e 100644 --- a/app/api/v1/__init__.py +++ b/app/api/v1/__init__.py @@ -2,7 +2,7 @@ from fastapi import APIRouter -from app.api.v1 import auth, main, socket, sounds +from app.api.v1 import auth, main, socket, sound_normalizer, sounds # V1 API router with v1 prefix api_router = APIRouter(prefix="/v1") @@ -12,3 +12,4 @@ api_router.include_router(main.router, tags=["main"]) api_router.include_router(auth.router, prefix="/auth", tags=["authentication"]) api_router.include_router(socket.router, tags=["socket"]) api_router.include_router(sounds.router, tags=["sounds"]) +api_router.include_router(sound_normalizer.router, tags=["sound-normalization"]) diff --git a/app/api/v1/sound_normalizer.py b/app/api/v1/sound_normalizer.py new file mode 100644 index 0000000..17f98bb --- /dev/null +++ b/app/api/v1/sound_normalizer.py @@ -0,0 +1,166 @@ +"""Sound normalization API endpoints.""" + +from typing import Annotated + +from fastapi import APIRouter, Depends, HTTPException, Query, status +from sqlmodel.ext.asyncio.session import AsyncSession + +from app.core.database import get_db +from app.core.dependencies import get_current_active_user_flexible +from app.models.user import User +from app.services.sound_normalizer import NormalizationResults, SoundNormalizerService + +router = APIRouter(prefix="/sounds/normalize", tags=["sound-normalization"]) + + +async def get_sound_normalizer_service( + session: Annotated[AsyncSession, Depends(get_db)], +) -> SoundNormalizerService: + """Get the sound normalizer service.""" + return SoundNormalizerService(session) + + +@router.post("/all") +async def normalize_all_sounds( + current_user: Annotated[User, Depends(get_current_active_user_flexible)], + normalizer_service: Annotated[ + SoundNormalizerService, Depends(get_sound_normalizer_service) + ], + force: bool = Query( + False, description="Force normalization of already normalized sounds" + ), + one_pass: bool | None = Query( + None, description="Use one-pass normalization (overrides config)" + ), +) -> dict[str, NormalizationResults | str]: + """Normalize all unnormalized sounds.""" + # Only allow admins to normalize sounds + if current_user.role not in ["admin", "superadmin"]: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Only administrators can normalize sounds", + ) + + try: + results = await normalizer_service.normalize_all_sounds( + force=force, + one_pass=one_pass, + ) + return { + "message": "Sound normalization completed", + "results": results, + } + except Exception as e: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Failed to normalize sounds: {e!s}", + ) from e + + +@router.post("/type/{sound_type}") +async def normalize_sounds_by_type( + sound_type: str, + current_user: Annotated[User, Depends(get_current_active_user_flexible)], + normalizer_service: Annotated[ + SoundNormalizerService, Depends(get_sound_normalizer_service) + ], + force: bool = Query( + False, description="Force normalization of already normalized sounds" + ), + one_pass: bool | None = Query( + None, description="Use one-pass normalization (overrides config)" + ), +) -> dict[str, NormalizationResults | str]: + """Normalize all sounds of a specific type (SDB, TTS, EXT).""" + # Only allow admins to normalize sounds + if current_user.role not in ["admin", "superadmin"]: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Only administrators can normalize sounds", + ) + + # Validate sound type + valid_types = ["SDB", "TTS", "EXT"] + if sound_type not in valid_types: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=f"Invalid sound type. Must be one of: {', '.join(valid_types)}", + ) + + try: + results = await normalizer_service.normalize_sounds_by_type( + sound_type=sound_type, + force=force, + one_pass=one_pass, + ) + return { + "message": f"Normalization of {sound_type} sounds completed", + "results": results, + } + except Exception as e: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Failed to normalize {sound_type} sounds: {e!s}", + ) from e + + +@router.post("/{sound_id}") +async def normalize_sound_by_id( + sound_id: int, + current_user: Annotated[User, Depends(get_current_active_user_flexible)], + normalizer_service: Annotated[ + SoundNormalizerService, Depends(get_sound_normalizer_service) + ], + force: bool = Query( + False, description="Force normalization of already normalized sound" + ), + one_pass: bool | None = Query( + None, description="Use one-pass normalization (overrides config)" + ), +) -> dict[str, str]: + """Normalize a specific sound by ID.""" + # Only allow admins to normalize sounds + if current_user.role not in ["admin", "superadmin"]: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Only administrators can normalize sounds", + ) + + try: + # Get the sound + sound = await normalizer_service.sound_repo.get_by_id(sound_id) + if not sound: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Sound with ID {sound_id} not found", + ) + + # Normalize the sound + result = await normalizer_service.normalize_sound( + sound=sound, + force=force, + one_pass=one_pass, + ) + + # Check result status + if result["status"] == "error": + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Failed to normalize sound: {result['error']}", + ) + + return { + "message": f"Sound normalization {result['status']}: {sound.filename}", + "status": result["status"], + "reason": result["reason"] or "", + "normalized_filename": result["normalized_filename"] or "", + } + + except HTTPException: + # Re-raise HTTPExceptions without wrapping them + raise + except Exception as e: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Failed to normalize sound: {e!s}", + ) from e diff --git a/app/core/config.py b/app/core/config.py index eebe918..648019a 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -47,5 +47,10 @@ class Settings(BaseSettings): GITHUB_CLIENT_ID: str = "" GITHUB_CLIENT_SECRET: str = "" + # Audio Normalization Configuration + NORMALIZED_AUDIO_FORMAT: str = "mp3" + NORMALIZED_AUDIO_BITRATE: str = "256k" + NORMALIZED_AUDIO_PASSES: int = 2 # 1 for one-pass, 2 for two-pass + settings = Settings() diff --git a/app/repositories/sound.py b/app/repositories/sound.py index 8cff109..bcf1aba 100644 --- a/app/repositories/sound.py +++ b/app/repositories/sound.py @@ -126,3 +126,26 @@ class SoundRepository: except Exception: logger.exception("Failed to get popular sounds") raise + + async def get_unnormalized_sounds(self) -> list[Sound]: + """Get all sounds that haven't been normalized yet.""" + try: + statement = select(Sound).where(Sound.is_normalized == False) # noqa: E712 + result = await self.session.exec(statement) + return list(result.all()) + except Exception: + logger.exception("Failed to get unnormalized sounds") + raise + + async def get_unnormalized_sounds_by_type(self, sound_type: str) -> list[Sound]: + """Get unnormalized sounds by type.""" + try: + statement = select(Sound).where( + Sound.type == sound_type, + Sound.is_normalized == False, # noqa: E712 + ) + result = await self.session.exec(statement) + return list(result.all()) + except Exception: + logger.exception("Failed to get unnormalized sounds by type: %s", sound_type) + raise diff --git a/app/services/sound_normalizer.py b/app/services/sound_normalizer.py new file mode 100644 index 0000000..45de40c --- /dev/null +++ b/app/services/sound_normalizer.py @@ -0,0 +1,567 @@ +"""Sound normalizer service for normalizing audio files using ffmpeg loudnorm.""" + +import hashlib +import json +import os +import re +from pathlib import Path +from typing import TypedDict + +import ffmpeg # type: ignore[import-untyped] +from sqlmodel.ext.asyncio.session import AsyncSession + +from app.core.config import settings +from app.core.logging import get_logger +from app.models.sound import Sound +from app.repositories.sound import SoundRepository + +logger = get_logger(__name__) + + +class NormalizationInfo(TypedDict): + """Type definition for normalization information in results.""" + + filename: str + status: str + reason: str | None + original_path: str | None + normalized_path: str | None + normalized_filename: str | None + normalized_duration: int | None + normalized_size: int | None + normalized_hash: str | None + id: int | None + error: str | None + + +class NormalizationResults(TypedDict): + """Type definition for normalization results.""" + + processed: int + normalized: int + skipped: int + errors: int + files: list[NormalizationInfo] + + +class SoundNormalizerService: + """Service for normalizing audio files using ffmpeg loudnorm.""" + + def __init__(self, session: AsyncSession) -> None: + """Initialize the sound normalizer service.""" + self.session = session + self.sound_repo = SoundRepository(session) + + # Normalization settings from config + self.output_format = settings.NORMALIZED_AUDIO_FORMAT + self.output_bitrate = settings.NORMALIZED_AUDIO_BITRATE + self.passes = settings.NORMALIZED_AUDIO_PASSES + + # Directory mappings for different sound types + self.type_directories = { + "SDB": "sounds/normalized/soundboard", + "TTS": "sounds/normalized/text_to_speech", + "EXT": "sounds/normalized/extracted", + } + + # Ensure normalized directories exist + self._ensure_directories() + + def _ensure_directories(self) -> None: + """Ensure all normalized sound directories exist.""" + for directory in self.type_directories.values(): + Path(directory).mkdir(parents=True, exist_ok=True) + logger.debug("Ensured directory exists: %s", directory) + + def _get_normalized_path(self, sound: Sound) -> Path: + """Get the normalized file path for a sound.""" + return self._get_normalized_path_from_data(sound.type, sound.filename) + + def _get_normalized_path_from_data(self, sound_type: str, filename: str) -> Path: + """Get the normalized file path from sound data.""" + # Get the appropriate directory for the sound type + directory = self.type_directories.get(sound_type, "sounds/normalized/other") + + # Create the directory if it doesn't exist + Path(directory).mkdir(parents=True, exist_ok=True) + + # Generate filename: original_name.{format} + original_stem = Path(filename).stem + normalized_filename = f"{original_stem}.{self.output_format}" + + return Path(directory) / normalized_filename + + def _get_original_path(self, sound: Sound) -> Path: + """Get the original file path for a sound.""" + return self._get_original_path_from_data(sound.type, sound.filename) + + def _get_original_path_from_data(self, sound_type: str, filename: str) -> Path: + """Get the original file path from sound data.""" + # Map sound types to their original directories + type_to_original_dir = { + "SDB": "sounds/originals/soundboard", + "TTS": "sounds/originals/text_to_speech", + "EXT": "sounds/originals/extracted", + } + + original_dir = type_to_original_dir.get(sound_type, "sounds/originals/other") + return Path(original_dir) / filename + + def _get_file_hash(self, file_path: Path) -> str: + """Calculate SHA-256 hash of a file.""" + hash_sha256 = hashlib.sha256() + with open(file_path, "rb") as f: + for chunk in iter(lambda: f.read(4096), b""): + hash_sha256.update(chunk) + return hash_sha256.hexdigest() + + def _get_file_size(self, file_path: Path) -> int: + """Get file size in bytes.""" + return file_path.stat().st_size + + def _get_audio_duration(self, file_path: Path) -> int: + """Get audio duration in milliseconds using ffmpeg.""" + try: + probe = ffmpeg.probe(str(file_path)) + duration = float(probe["format"]["duration"]) + return int(duration * 1000) # Convert to milliseconds + except Exception as e: + logger.warning("Failed to get duration for %s: %s", file_path, e) + return 0 + + async def _normalize_audio_one_pass( + self, + input_path: Path, + output_path: Path, + ) -> None: + """Normalize audio using one-pass loudnorm.""" + try: + logger.info( + "Starting one-pass normalization: %s -> %s", + input_path, + output_path, + ) + + stream = ffmpeg.input(str(input_path)) + stream = ffmpeg.filter(stream, "loudnorm", I=-23, TP=-2, LRA=7) + + # Apply output format and bitrate + output_args = {} + if self.output_format == "mp3": + output_args["acodec"] = "libmp3lame" + output_args["audio_bitrate"] = self.output_bitrate + elif self.output_format == "aac": + output_args["acodec"] = "aac" + output_args["audio_bitrate"] = self.output_bitrate + elif self.output_format == "opus": + output_args["acodec"] = "libopus" + output_args["audio_bitrate"] = self.output_bitrate + + stream = ffmpeg.output(stream, str(output_path), **output_args) + stream = ffmpeg.overwrite_output(stream) + + ffmpeg.run(stream, quiet=True, overwrite_output=True) + logger.info("One-pass normalization completed: %s", output_path) + + except Exception as e: + logger.exception("One-pass normalization failed for %s", input_path) + raise + + async def _normalize_audio_two_pass( + self, + input_path: Path, + output_path: Path, + ) -> None: + """Normalize audio using two-pass loudnorm for better quality.""" + try: + logger.info( + "Starting two-pass normalization: %s -> %s", input_path, output_path + ) + + # First pass: analyze + logger.debug("First pass: analyzing %s", input_path) + + stream = ffmpeg.input(str(input_path)) + stream = ffmpeg.filter( + stream, + "loudnorm", + I=-23, + TP=-2, + LRA=7, + print_format="json", + ) + # Output to null device with explicit format + null_output = "/dev/null" if os.name != "nt" else "NUL" + stream = ffmpeg.output(stream, null_output, format="null") + + # Run first pass and capture output + try: + result = ffmpeg.run(stream, capture_stderr=True, quiet=True) + analysis_output = result[1].decode("utf-8") + except ffmpeg.Error as e: + logger.error("FFmpeg first pass failed for %s. Stdout: %s, Stderr: %s", + input_path, e.stdout.decode() if e.stdout else "None", + e.stderr.decode() if e.stderr else "None") + raise + + # Extract loudnorm measurements from the output + # The JSON output is at the end of stderr + logger.debug("Loudnorm analysis output: %s", analysis_output) + + # Find JSON in the output + json_match = re.search(r'\{[^{}]*"input_i"[^{}]*\}', analysis_output) + if not json_match: + logger.error("Could not find JSON in loudnorm output: %s", analysis_output) + raise ValueError("Could not extract loudnorm analysis data") + + logger.debug("Found JSON match: %s", json_match.group()) + analysis_data = json.loads(json_match.group()) + + # Second pass: normalize with measured values + logger.debug("Second pass: normalizing %s with measured values", input_path) + + stream = ffmpeg.input(str(input_path)) + stream = ffmpeg.filter( + stream, + "loudnorm", + measured_I=analysis_data["input_i"], + measured_LRA=analysis_data["input_lra"], + measured_TP=analysis_data["input_tp"], + measured_thresh=analysis_data["input_thresh"], + offset=analysis_data["target_offset"], + ) + + # Apply output format and bitrate + output_args = {} + if self.output_format == "mp3": + output_args["acodec"] = "libmp3lame" + output_args["audio_bitrate"] = self.output_bitrate + elif self.output_format == "aac": + output_args["acodec"] = "aac" + output_args["audio_bitrate"] = self.output_bitrate + elif self.output_format == "opus": + output_args["acodec"] = "libopus" + output_args["audio_bitrate"] = self.output_bitrate + + stream = ffmpeg.output(stream, str(output_path), **output_args) + stream = ffmpeg.overwrite_output(stream) + + try: + ffmpeg.run(stream, quiet=True, overwrite_output=True) + logger.info("Two-pass normalization completed: %s", output_path) + except ffmpeg.Error as e: + logger.error("FFmpeg second pass failed for %s. Stdout: %s, Stderr: %s", + input_path, e.stdout.decode() if e.stdout else "None", + e.stderr.decode() if e.stderr else "None") + raise + + except Exception as e: + logger.exception("Two-pass normalization failed for %s", input_path) + raise + + async def normalize_sound( + self, + sound: Sound, + force: bool = False, + one_pass: bool | None = None, + sound_data: dict | None = None, + ) -> NormalizationInfo: + """Normalize a single sound.""" + # Use provided sound_data to avoid detached instance issues, or capture from sound + if sound_data: + filename = sound_data["filename"] + sound_id = sound_data["id"] + is_normalized = sound_data["is_normalized"] + sound_type = sound_data["type"] + else: + # Fallback to accessing sound properties (for single sound normalization) + filename = sound.filename + sound_id = sound.id + is_normalized = sound.is_normalized + sound_type = sound.type + + # Check if already normalized and not forcing + if is_normalized and not force: + return { + "filename": filename, + "status": "skipped", + "reason": "already normalized", + "original_path": None, + "normalized_path": None, + "normalized_filename": None, + "normalized_duration": None, + "normalized_size": None, + "normalized_hash": None, + "id": sound_id, + "error": None, + } + + try: + # Get paths using captured data to avoid accessing sound properties + original_path = self._get_original_path_from_data(sound_type, filename) + normalized_path = self._get_normalized_path_from_data(sound_type, filename) + + # Check if original file exists + if not original_path.exists(): + error_msg = f"Original file not found: {original_path}" + logger.error(error_msg) + return { + "filename": filename, + "status": "error", + "reason": None, + "original_path": str(original_path), + "normalized_path": None, + "normalized_filename": None, + "normalized_duration": None, + "normalized_size": None, + "normalized_hash": None, + "id": sound_id, + "error": error_msg, + } + + # Determine which normalization method to use + use_one_pass = one_pass if one_pass is not None else (self.passes == 1) + + # Perform normalization + if use_one_pass: + await self._normalize_audio_one_pass(original_path, normalized_path) + else: + await self._normalize_audio_two_pass(original_path, normalized_path) + + # Get normalized file info + normalized_duration = self._get_audio_duration(normalized_path) + normalized_size = self._get_file_size(normalized_path) + normalized_hash = self._get_file_hash(normalized_path) + normalized_filename = normalized_path.name + + # Update sound in database + update_data = { + "normalized_filename": normalized_filename, + "normalized_duration": normalized_duration, + "normalized_size": normalized_size, + "normalized_hash": normalized_hash, + "is_normalized": True, + } + + await self.sound_repo.update(sound, update_data) + logger.info("Normalized sound: %s -> %s", filename, normalized_filename) + + return { + "filename": filename, + "status": "normalized", + "reason": None, + "original_path": str(original_path), + "normalized_path": str(normalized_path), + "normalized_filename": normalized_filename, + "normalized_duration": normalized_duration, + "normalized_size": normalized_size, + "normalized_hash": normalized_hash, + "id": sound_id, + "error": None, + } + + except Exception as e: + error_msg = str(e) + logger.exception( + "Failed to normalize sound %s", + filename, + ) + return { + "filename": filename, + "status": "error", + "reason": None, + "original_path": ( + str(original_path) if "original_path" in locals() else None + ), + "normalized_path": ( + str(normalized_path) if "normalized_path" in locals() else None + ), + "normalized_filename": None, + "normalized_duration": None, + "normalized_size": None, + "normalized_hash": None, + "id": sound_id, + "error": error_msg, + } + + async def normalize_all_sounds( + self, + force: bool = False, + one_pass: bool | None = None, + ) -> NormalizationResults: + """Normalize all unnormalized sounds.""" + logger.info("Starting normalization of all sounds") + + results: NormalizationResults = { + "processed": 0, + "normalized": 0, + "skipped": 0, + "errors": 0, + "files": [], + } + + # Get sounds to normalize + if force: + # Get all sounds if forcing + sounds = [] + for sound_type in self.type_directories.keys(): + type_sounds = await self.sound_repo.get_by_type(sound_type) + sounds.extend(type_sounds) + else: + # Get only unnormalized sounds + sounds = await self.sound_repo.get_unnormalized_sounds() + + logger.info("Found %d sounds to process", len(sounds)) + + # Capture all sound data upfront to avoid session detachment issues + sound_data_list = [] + for sound in sounds: + sound_data_list.append( + { + "id": sound.id, + "filename": sound.filename, + "type": sound.type, + "is_normalized": sound.is_normalized, + "name": sound.name, + } + ) + + # Process each sound using captured data + for i, sound in enumerate(sounds): + results["processed"] += 1 + + # Use captured data to avoid detached instance issues + sound_data = sound_data_list[i] + sound_id = sound_data["id"] + sound_filename = sound_data["filename"] + + try: + normalization_info = await self.normalize_sound( + sound, + force=force, + one_pass=one_pass, + sound_data=sound_data, + ) + + results["files"].append(normalization_info) + + if normalization_info["status"] == "normalized": + results["normalized"] += 1 + elif normalization_info["status"] == "skipped": + results["skipped"] += 1 + elif normalization_info["status"] == "error": + results["errors"] += 1 + + except Exception as e: + logger.exception( + "Unexpected error processing sound %s", + sound_filename, + ) + results["errors"] += 1 + results["files"].append( + { + "filename": sound_filename, + "status": "error", + "reason": None, + "original_path": None, + "normalized_path": None, + "normalized_filename": None, + "normalized_duration": None, + "normalized_size": None, + "normalized_hash": None, + "id": sound_id, + "error": str(e), + } + ) + + logger.info("Normalization completed: %s", results) + return results + + async def normalize_sounds_by_type( + self, + sound_type: str, + force: bool = False, + one_pass: bool | None = None, + ) -> NormalizationResults: + """Normalize all sounds of a specific type.""" + logger.info("Starting normalization of %s sounds", sound_type) + + results: NormalizationResults = { + "processed": 0, + "normalized": 0, + "skipped": 0, + "errors": 0, + "files": [], + } + + # Get sounds to normalize + if force: + sounds = await self.sound_repo.get_by_type(sound_type) + else: + sounds = await self.sound_repo.get_unnormalized_sounds_by_type(sound_type) + + logger.info("Found %d %s sounds to process", len(sounds), sound_type) + + # Capture all sound data upfront to avoid session detachment issues + sound_data_list = [] + for sound in sounds: + sound_data_list.append( + { + "id": sound.id, + "filename": sound.filename, + "type": sound.type, + "is_normalized": sound.is_normalized, + "name": sound.name, + } + ) + + # Process each sound using captured data + for i, sound in enumerate(sounds): + results["processed"] += 1 + + # Use captured data to avoid detached instance issues + sound_data = sound_data_list[i] + sound_id = sound_data["id"] + sound_filename = sound_data["filename"] + + try: + normalization_info = await self.normalize_sound( + sound, + force=force, + one_pass=one_pass, + sound_data=sound_data, + ) + + results["files"].append(normalization_info) + + if normalization_info["status"] == "normalized": + results["normalized"] += 1 + elif normalization_info["status"] == "skipped": + results["skipped"] += 1 + elif normalization_info["status"] == "error": + results["errors"] += 1 + + except Exception as e: + logger.exception( + "Unexpected error processing sound %s", + sound_filename, + ) + results["errors"] += 1 + results["files"].append( + { + "filename": sound_filename, + "status": "error", + "reason": None, + "original_path": None, + "normalized_path": None, + "normalized_filename": None, + "normalized_duration": None, + "normalized_size": None, + "normalized_hash": None, + "id": sound_id, + "error": str(e), + } + ) + + logger.info("Type normalization completed: %s", results) + return results diff --git a/app/services/sound_scanner.py b/app/services/sound_scanner.py index 5098c0d..fe12ca7 100644 --- a/app/services/sound_scanner.py +++ b/app/services/sound_scanner.py @@ -16,6 +16,7 @@ logger = get_logger(__name__) class FileInfo(TypedDict): """Type definition for file information in scan results.""" + filename: str status: str reason: str | None @@ -29,6 +30,7 @@ class FileInfo(TypedDict): class ScanResults(TypedDict): """Type definition for scan results.""" + scanned: int added: int updated: int @@ -45,15 +47,23 @@ class SoundScannerService: """Initialize the sound scanner service.""" self.session = session self.sound_repo = SoundRepository(session) - self.supported_extensions = {".mp3", ".wav", ".opus", ".flac", ".ogg", ".m4a", ".aac"} + self.supported_extensions = { + ".mp3", + ".wav", + ".opus", + ".flac", + ".ogg", + ".m4a", + ".aac", + } def get_file_hash(self, file_path: Path) -> str: - """Calculate MD5 hash of a file.""" - hash_md5 = hashlib.md5() + """Calculate SHA-256 hash of a file.""" + hash_sha256 = hashlib.sha256() with open(file_path, "rb") as f: for chunk in iter(lambda: f.read(4096), b""): - hash_md5.update(chunk) - return hash_md5.hexdigest() + hash_sha256.update(chunk) + return hash_sha256.hexdigest() def get_audio_duration(self, file_path: Path) -> int: """Get audio duration in milliseconds using ffmpeg.""" @@ -76,8 +86,7 @@ class SoundScannerService: # Replace underscores and hyphens with spaces name = name.replace("_", " ").replace("-", " ") # Capitalize words - name = " ".join(word.capitalize() for word in name.split()) - return name + return " ".join(word.capitalize() for word in name.split()) async def scan_directory( self, @@ -113,7 +122,8 @@ class SoundScannerService: # Get all audio files from directory audio_files = [ - f for f in scan_path.iterdir() + f + for f in scan_path.iterdir() if f.is_file() and f.suffix.lower() in self.supported_extensions ] @@ -134,17 +144,19 @@ class SoundScannerService: except Exception as e: logger.exception("Error processing file %s", file_path) results["errors"] += 1 - results["files"].append({ - "filename": filename, - "status": "error", - "reason": None, - "name": None, - "duration": None, - "size": None, - "id": None, - "error": str(e), - "changes": None, - }) + results["files"].append( + { + "filename": filename, + "status": "error", + "reason": None, + "name": None, + "duration": None, + "size": None, + "id": None, + "error": str(e), + "changes": None, + } + ) # Delete sounds that no longer exist in directory for filename, sound in sounds_by_filename.items(): @@ -153,31 +165,35 @@ class SoundScannerService: await self.sound_repo.delete(sound) logger.info("Deleted sound no longer in directory: %s", filename) results["deleted"] += 1 - results["files"].append({ - "filename": filename, - "status": "deleted", - "reason": "file no longer exists", - "name": sound.name, - "duration": sound.duration, - "size": sound.size, - "id": sound.id, - "error": None, - "changes": None, - }) + results["files"].append( + { + "filename": filename, + "status": "deleted", + "reason": "file no longer exists", + "name": sound.name, + "duration": sound.duration, + "size": sound.size, + "id": sound.id, + "error": None, + "changes": None, + } + ) except Exception as e: logger.exception("Error deleting sound %s", filename) results["errors"] += 1 - results["files"].append({ - "filename": filename, - "status": "error", - "reason": "failed to delete", - "name": sound.name, - "duration": sound.duration, - "size": sound.size, - "id": sound.id, - "error": str(e), - "changes": None, - }) + results["files"].append( + { + "filename": filename, + "status": "error", + "reason": "failed to delete", + "name": sound.name, + "duration": sound.duration, + "size": sound.size, + "id": sound.id, + "error": str(e), + "changes": None, + } + ) logger.info("Sync completed: %s", results) return results @@ -215,17 +231,19 @@ class SoundScannerService: logger.info("Added new sound: %s (ID: %s)", sound.name, sound.id) results["added"] += 1 - results["files"].append({ - "filename": filename, - "status": "added", - "reason": None, - "name": name, - "duration": duration, - "size": size, - "id": sound.id, - "error": None, - "changes": None, - }) + results["files"].append( + { + "filename": filename, + "status": "added", + "reason": None, + "name": name, + "duration": duration, + "size": size, + "id": sound.id, + "error": None, + "changes": None, + } + ) elif existing_sound.hash != file_hash: # Update existing sound (file was modified) @@ -240,33 +258,37 @@ class SoundScannerService: logger.info("Updated modified sound: %s (ID: %s)", name, existing_sound.id) results["updated"] += 1 - results["files"].append({ - "filename": filename, - "status": "updated", - "reason": "file was modified", - "name": name, - "duration": duration, - "size": size, - "id": existing_sound.id, - "error": None, - "changes": ["hash", "duration", "size", "name"], - }) + results["files"].append( + { + "filename": filename, + "status": "updated", + "reason": "file was modified", + "name": name, + "duration": duration, + "size": size, + "id": existing_sound.id, + "error": None, + "changes": ["hash", "duration", "size", "name"], + } + ) else: # File unchanged, skip logger.debug("Sound unchanged: %s", filename) results["skipped"] += 1 - results["files"].append({ - "filename": filename, - "status": "skipped", - "reason": "file unchanged", - "name": existing_sound.name, - "duration": existing_sound.duration, - "size": existing_sound.size, - "id": existing_sound.id, - "error": None, - "changes": None, - }) + results["files"].append( + { + "filename": filename, + "status": "skipped", + "reason": "file unchanged", + "name": existing_sound.name, + "duration": existing_sound.duration, + "size": existing_sound.size, + "id": existing_sound.id, + "error": None, + "changes": None, + } + ) async def scan_soundboard_directory(self) -> ScanResults: """Sync the default soundboard directory.""" diff --git a/tests/api/v1/test_sound_normalizer_endpoints.py b/tests/api/v1/test_sound_normalizer_endpoints.py new file mode 100644 index 0000000..7f240ca --- /dev/null +++ b/tests/api/v1/test_sound_normalizer_endpoints.py @@ -0,0 +1,613 @@ +"""Tests for sound normalizer API endpoints.""" + +from unittest.mock import patch + +import pytest +from httpx import ASGITransport, AsyncClient + +from app.models.user import User +from app.services.sound_normalizer import NormalizationResults + + +class TestSoundNormalizerEndpoints: + """Test sound normalizer API endpoints.""" + + @pytest.mark.asyncio + async def test_normalize_all_sounds_success( + self, + authenticated_admin_client: AsyncClient, + admin_user: User, + ): + """Test successful normalization of all sounds.""" + mock_results: NormalizationResults = { + "processed": 3, + "normalized": 2, + "skipped": 1, + "errors": 0, + "files": [ + { + "filename": "test1.mp3", + "status": "normalized", + "reason": None, + "original_path": "/fake/test1.mp3", + "normalized_path": "/fake/test1_normalized.mp3", + "normalized_filename": "test1_normalized.mp3", + "normalized_duration": 5000, + "normalized_size": 1024, + "normalized_hash": "norm_hash1", + "id": 1, + "error": None, + }, + { + "filename": "test2.wav", + "status": "normalized", + "reason": None, + "original_path": "/fake/test2.wav", + "normalized_path": "/fake/test2_normalized.mp3", + "normalized_filename": "test2_normalized.mp3", + "normalized_duration": 7000, + "normalized_size": 2048, + "normalized_hash": "norm_hash2", + "id": 2, + "error": None, + }, + { + "filename": "test3.mp3", + "status": "skipped", + "reason": "already normalized", + "original_path": None, + "normalized_path": None, + "normalized_filename": None, + "normalized_duration": None, + "normalized_size": None, + "normalized_hash": None, + "id": 3, + "error": None, + }, + ], + } + + with patch( + "app.services.sound_normalizer.SoundNormalizerService.normalize_all_sounds" + ) as mock_normalize: + mock_normalize.return_value = mock_results + + response = await authenticated_admin_client.post( + "/api/v1/sounds/normalize/all" + ) + + assert response.status_code == 200 + data = response.json() + + assert "message" in data + assert "Sound normalization completed" in data["message"] + assert "results" in data + + results = data["results"] + assert results["processed"] == 3 + assert results["normalized"] == 2 + assert results["skipped"] == 1 + assert results["errors"] == 0 + assert len(results["files"]) == 3 + + @pytest.mark.asyncio + async def test_normalize_all_sounds_with_force( + self, + authenticated_admin_client: AsyncClient, + admin_user: User, + ): + """Test normalization with force parameter.""" + mock_results: NormalizationResults = { + "processed": 1, + "normalized": 1, + "skipped": 0, + "errors": 0, + "files": [], + } + + with patch( + "app.services.sound_normalizer.SoundNormalizerService.normalize_all_sounds" + ) as mock_normalize: + mock_normalize.return_value = mock_results + + response = await authenticated_admin_client.post( + "/api/v1/sounds/normalize/all", params={"force": True} + ) + + assert response.status_code == 200 + + # Verify force parameter was passed + mock_normalize.assert_called_once_with(force=True, one_pass=None) + + @pytest.mark.asyncio + async def test_normalize_all_sounds_with_one_pass( + self, + authenticated_admin_client: AsyncClient, + admin_user: User, + ): + """Test normalization with one_pass parameter.""" + mock_results: NormalizationResults = { + "processed": 1, + "normalized": 1, + "skipped": 0, + "errors": 0, + "files": [], + } + + with patch( + "app.services.sound_normalizer.SoundNormalizerService.normalize_all_sounds" + ) as mock_normalize: + mock_normalize.return_value = mock_results + + response = await authenticated_admin_client.post( + "/api/v1/sounds/normalize/all", params={"one_pass": True} + ) + + assert response.status_code == 200 + + # Verify one_pass parameter was passed + mock_normalize.assert_called_once_with(force=False, one_pass=True) + + @pytest.mark.asyncio + async def test_normalize_all_sounds_unauthenticated(self, client: AsyncClient): + """Test normalizing sounds without authentication.""" + response = await client.post("/api/v1/sounds/normalize/all") + + assert response.status_code == 401 + data = response.json() + assert "Could not validate credentials" in data["detail"] + + @pytest.mark.asyncio + async def test_normalize_all_sounds_non_admin( + self, + test_app, + test_user: User, + ): + """Test normalizing sounds with non-admin user.""" + from app.core.dependencies import get_current_active_user_flexible + + # Override the dependency to return regular user + async def override_get_current_user(): + test_user.role = "user" + return test_user + + test_app.dependency_overrides[get_current_active_user_flexible] = ( + override_get_current_user + ) + + headers = {"API-TOKEN": "test_api_token"} + + async with AsyncClient( + transport=ASGITransport(app=test_app), + base_url="http://test", + ) as client: + response = await client.post( + "/api/v1/sounds/normalize/all", headers=headers + ) + + assert response.status_code == 403 + data = response.json() + assert "Only administrators can normalize sounds" in data["detail"] + + # Clean up override + test_app.dependency_overrides.pop(get_current_active_user_flexible, None) + + @pytest.mark.asyncio + async def test_normalize_all_sounds_service_error( + self, + authenticated_admin_client: AsyncClient, + admin_user: User, + ): + """Test normalization when service raises an error.""" + with patch( + "app.services.sound_normalizer.SoundNormalizerService.normalize_all_sounds" + ) as mock_normalize: + mock_normalize.side_effect = Exception("Normalization service failed") + + response = await authenticated_admin_client.post( + "/api/v1/sounds/normalize/all" + ) + + assert response.status_code == 500 + data = response.json() + assert "Failed to normalize sounds" in data["detail"] + assert "Normalization service failed" in data["detail"] + + @pytest.mark.asyncio + async def test_normalize_sounds_by_type_success( + self, + authenticated_admin_client: AsyncClient, + admin_user: User, + ): + """Test successful normalization by sound type.""" + mock_results: NormalizationResults = { + "processed": 2, + "normalized": 2, + "skipped": 0, + "errors": 0, + "files": [ + { + "filename": "sdb1.mp3", + "status": "normalized", + "reason": None, + "original_path": "/fake/sdb1.mp3", + "normalized_path": "/fake/sdb1_normalized.mp3", + "normalized_filename": "sdb1_normalized.mp3", + "normalized_duration": 4000, + "normalized_size": 800, + "normalized_hash": "sdb_hash1", + "id": 10, + "error": None, + }, + { + "filename": "sdb2.wav", + "status": "normalized", + "reason": None, + "original_path": "/fake/sdb2.wav", + "normalized_path": "/fake/sdb2_normalized.mp3", + "normalized_filename": "sdb2_normalized.mp3", + "normalized_duration": 6000, + "normalized_size": 1200, + "normalized_hash": "sdb_hash2", + "id": 11, + "error": None, + }, + ], + } + + with patch( + "app.services.sound_normalizer.SoundNormalizerService.normalize_sounds_by_type" + ) as mock_normalize: + mock_normalize.return_value = mock_results + + response = await authenticated_admin_client.post( + "/api/v1/sounds/normalize/type/SDB" + ) + + assert response.status_code == 200 + data = response.json() + + assert "Normalization of SDB sounds completed" in data["message"] + assert "results" in data + + results = data["results"] + assert results["processed"] == 2 + assert results["normalized"] == 2 + assert len(results["files"]) == 2 + + # Verify the service was called with correct type + mock_normalize.assert_called_once_with( + sound_type="SDB", force=False, one_pass=None + ) + + @pytest.mark.asyncio + async def test_normalize_sounds_by_type_invalid_type( + self, + authenticated_admin_client: AsyncClient, + admin_user: User, + ): + """Test normalization with invalid sound type.""" + response = await authenticated_admin_client.post( + "/api/v1/sounds/normalize/type/INVALID" + ) + + assert response.status_code == 400 + data = response.json() + assert "Invalid sound type" in data["detail"] + assert "Must be one of: SDB, TTS, EXT" in data["detail"] + + @pytest.mark.asyncio + async def test_normalize_sounds_by_type_with_params( + self, + authenticated_admin_client: AsyncClient, + admin_user: User, + ): + """Test normalization by type with force and one_pass parameters.""" + mock_results: NormalizationResults = { + "processed": 1, + "normalized": 1, + "skipped": 0, + "errors": 0, + "files": [], + } + + with patch( + "app.services.sound_normalizer.SoundNormalizerService.normalize_sounds_by_type" + ) as mock_normalize: + mock_normalize.return_value = mock_results + + response = await authenticated_admin_client.post( + "/api/v1/sounds/normalize/type/TTS", + params={"force": True, "one_pass": False}, + ) + + assert response.status_code == 200 + + # Verify parameters were passed correctly + mock_normalize.assert_called_once_with( + sound_type="TTS", force=True, one_pass=False + ) + + @pytest.mark.asyncio + async def test_normalize_sound_by_id_success( + self, + authenticated_admin_client: AsyncClient, + admin_user: User, + ): + """Test successful normalization of a specific sound.""" + # Mock the sound + mock_sound = type( + "Sound", + (), + { + "id": 42, + "filename": "specific_sound.mp3", + "type": "SDB", + "name": "Specific Sound", + }, + )() + + # Mock normalization result + mock_result = { + "filename": "specific_sound.mp3", + "status": "normalized", + "reason": None, + "original_path": "/fake/specific_sound.mp3", + "normalized_path": "/fake/specific_sound_normalized.mp3", + "normalized_filename": "specific_sound_normalized.mp3", + "normalized_duration": 8000, + "normalized_size": 1600, + "normalized_hash": "specific_hash", + "id": 42, + "error": None, + } + + with patch( + "app.services.sound_normalizer.SoundNormalizerService.normalize_sound" + ) as mock_normalize_sound, patch( + "app.repositories.sound.SoundRepository.get_by_id" + ) as mock_get_sound: + + mock_get_sound.return_value = mock_sound + mock_normalize_sound.return_value = mock_result + + response = await authenticated_admin_client.post( + "/api/v1/sounds/normalize/42" + ) + + assert response.status_code == 200 + data = response.json() + + assert "Sound normalization normalized" in data["message"] + assert "specific_sound.mp3" in data["message"] + assert data["status"] == "normalized" + assert data["normalized_filename"] == "specific_sound_normalized.mp3" + + # Verify sound was retrieved and normalized + mock_get_sound.assert_called_once_with(42) + mock_normalize_sound.assert_called_once() + + @pytest.mark.asyncio + async def test_normalize_sound_by_id_not_found( + self, + authenticated_admin_client: AsyncClient, + admin_user: User, + ): + """Test normalization of non-existent sound.""" + with patch( + "app.repositories.sound.SoundRepository.get_by_id" + ) as mock_get_sound: + mock_get_sound.return_value = None + + response = await authenticated_admin_client.post( + "/api/v1/sounds/normalize/999" + ) + + assert response.status_code == 404 + data = response.json() + assert "Sound with ID 999 not found" in data["detail"] + + @pytest.mark.asyncio + async def test_normalize_sound_by_id_normalization_error( + self, + authenticated_admin_client: AsyncClient, + admin_user: User, + ): + """Test normalization when the sound normalization fails.""" + # Mock the sound + mock_sound = type( + "Sound", + (), + { + "id": 42, + "filename": "error_sound.mp3", + "type": "SDB", + "name": "Error Sound", + }, + )() + + # Mock normalization error result + mock_result = { + "filename": "error_sound.mp3", + "status": "error", + "reason": None, + "original_path": "/fake/error_sound.mp3", + "normalized_path": None, + "normalized_filename": None, + "normalized_duration": None, + "normalized_size": None, + "normalized_hash": None, + "id": 42, + "error": "File format not supported", + } + + with patch( + "app.services.sound_normalizer.SoundNormalizerService.normalize_sound" + ) as mock_normalize_sound, patch( + "app.repositories.sound.SoundRepository.get_by_id" + ) as mock_get_sound: + + mock_get_sound.return_value = mock_sound + mock_normalize_sound.return_value = mock_result + + response = await authenticated_admin_client.post( + "/api/v1/sounds/normalize/42" + ) + + assert response.status_code == 500 + data = response.json() + assert "Failed to normalize sound" in data["detail"] + assert "File format not supported" in data["detail"] + + @pytest.mark.asyncio + async def test_normalize_sound_by_id_with_params( + self, + authenticated_admin_client: AsyncClient, + admin_user: User, + ): + """Test sound normalization with force and one_pass parameters.""" + # Mock the sound + mock_sound = type( + "Sound", + (), + { + "id": 42, + "filename": "param_sound.mp3", + "type": "SDB", + "name": "Param Sound", + }, + )() + + # Mock normalization result + mock_result = { + "filename": "param_sound.mp3", + "status": "normalized", + "reason": None, + "original_path": "/fake/param_sound.mp3", + "normalized_path": "/fake/param_sound_normalized.mp3", + "normalized_filename": "param_sound_normalized.mp3", + "normalized_duration": 5000, + "normalized_size": 1000, + "normalized_hash": "param_hash", + "id": 42, + "error": None, + } + + with patch( + "app.services.sound_normalizer.SoundNormalizerService.normalize_sound" + ) as mock_normalize_sound, patch( + "app.repositories.sound.SoundRepository.get_by_id" + ) as mock_get_sound: + + mock_get_sound.return_value = mock_sound + mock_normalize_sound.return_value = mock_result + + response = await authenticated_admin_client.post( + "/api/v1/sounds/normalize/42", + params={"force": True, "one_pass": True}, + ) + + assert response.status_code == 200 + + # Verify parameters were passed to normalize_sound + call_args = mock_normalize_sound.call_args + assert call_args[1]["force"] == True + assert call_args[1]["one_pass"] == True + + @pytest.mark.asyncio + async def test_normalize_sound_by_id_skipped( + self, + authenticated_admin_client: AsyncClient, + admin_user: User, + ): + """Test normalization when sound is already normalized and not forced.""" + # Mock the sound + mock_sound = type( + "Sound", + (), + { + "id": 42, + "filename": "already_normalized.mp3", + "type": "SDB", + "name": "Already Normalized", + }, + )() + + # Mock skipped result + mock_result = { + "filename": "already_normalized.mp3", + "status": "skipped", + "reason": "already normalized", + "original_path": None, + "normalized_path": None, + "normalized_filename": None, + "normalized_duration": None, + "normalized_size": None, + "normalized_hash": None, + "id": 42, + "error": None, + } + + with patch( + "app.services.sound_normalizer.SoundNormalizerService.normalize_sound" + ) as mock_normalize_sound, patch( + "app.repositories.sound.SoundRepository.get_by_id" + ) as mock_get_sound: + + mock_get_sound.return_value = mock_sound + mock_normalize_sound.return_value = mock_result + + response = await authenticated_admin_client.post( + "/api/v1/sounds/normalize/42" + ) + + assert response.status_code == 200 + data = response.json() + + assert "Sound normalization skipped" in data["message"] + assert data["status"] == "skipped" + assert data["reason"] == "already normalized" + + @pytest.mark.asyncio + async def test_normalize_sound_by_id_unauthenticated(self, client: AsyncClient): + """Test normalizing a specific sound without authentication.""" + response = await client.post("/api/v1/sounds/normalize/42") + + assert response.status_code == 401 + data = response.json() + assert "Could not validate credentials" in data["detail"] + + @pytest.mark.asyncio + async def test_normalize_sound_by_id_non_admin( + self, + test_app, + test_user: User, + ): + """Test normalizing a specific sound with non-admin user.""" + from app.core.dependencies import get_current_active_user_flexible + + # Override the dependency to return regular user + async def override_get_current_user(): + test_user.role = "user" + return test_user + + test_app.dependency_overrides[get_current_active_user_flexible] = ( + override_get_current_user + ) + + headers = {"API-TOKEN": "test_api_token"} + + async with AsyncClient( + transport=ASGITransport(app=test_app), + base_url="http://test", + ) as client: + response = await client.post( + "/api/v1/sounds/normalize/42", headers=headers + ) + + assert response.status_code == 403 + data = response.json() + assert "Only administrators can normalize sounds" in data["detail"] + + # Clean up override + test_app.dependency_overrides.pop(get_current_active_user_flexible, None) \ No newline at end of file diff --git a/tests/services/test_sound_normalizer.py b/tests/services/test_sound_normalizer.py new file mode 100644 index 0000000..f9afff1 --- /dev/null +++ b/tests/services/test_sound_normalizer.py @@ -0,0 +1,559 @@ +"""Tests for sound normalizer service.""" + +import tempfile +from pathlib import Path +from unittest.mock import AsyncMock, Mock, patch + +import pytest +from sqlmodel.ext.asyncio.session import AsyncSession + +from app.models.sound import Sound +from app.services.sound_normalizer import SoundNormalizerService + + +class TestSoundNormalizerService: + """Test sound normalizer service.""" + + @pytest.fixture + def mock_session(self): + """Create a mock session.""" + return Mock(spec=AsyncSession) + + @pytest.fixture + def normalizer_service(self, mock_session): + """Create a normalizer service with mock session.""" + with patch("app.services.sound_normalizer.settings") as mock_settings: + mock_settings.NORMALIZED_AUDIO_FORMAT = "mp3" + mock_settings.NORMALIZED_AUDIO_BITRATE = "256k" + mock_settings.NORMALIZED_AUDIO_PASSES = 2 + return SoundNormalizerService(mock_session) + + def test_init(self, normalizer_service): + """Test normalizer service initialization.""" + assert normalizer_service.session is not None + assert normalizer_service.sound_repo is not None + assert normalizer_service.output_format == "mp3" + assert normalizer_service.output_bitrate == "256k" + assert normalizer_service.passes == 2 + assert len(normalizer_service.type_directories) == 3 + assert "SDB" in normalizer_service.type_directories + assert "TTS" in normalizer_service.type_directories + assert "EXT" in normalizer_service.type_directories + + def test_get_normalized_path(self, normalizer_service): + """Test normalized path generation.""" + sound = Sound( + id=1, + type="SDB", + name="Test Sound", + filename="test_audio.mp3", + duration=5000, + size=1024, + hash="test_hash", + ) + + normalized_path = normalizer_service._get_normalized_path(sound) + + assert "sounds/normalized/soundboard" in str(normalized_path) + assert "test_audio.mp3" == normalized_path.name + + def test_get_original_path(self, normalizer_service): + """Test original path generation.""" + sound = Sound( + id=1, + type="SDB", + name="Test Sound", + filename="test_audio.wav", + duration=5000, + size=1024, + hash="test_hash", + ) + + original_path = normalizer_service._get_original_path(sound) + + assert "sounds/originals/soundboard" in str(original_path) + assert "test_audio.wav" == original_path.name + + def test_get_file_hash(self, normalizer_service): + """Test file hash calculation.""" + # Create a temporary file + with tempfile.NamedTemporaryFile(mode="w", delete=False) as f: + f.write("test content for hash") + temp_path = Path(f.name) + + try: + hash_value = normalizer_service._get_file_hash(temp_path) + assert len(hash_value) == 64 # SHA-256 hash length + assert isinstance(hash_value, str) + finally: + temp_path.unlink() + + def test_get_file_size(self, normalizer_service): + """Test file size calculation.""" + # Create a temporary file + with tempfile.NamedTemporaryFile(mode="w", delete=False) as f: + f.write("test content for size") + temp_path = Path(f.name) + + try: + size = normalizer_service._get_file_size(temp_path) + assert size > 0 + assert isinstance(size, int) + finally: + temp_path.unlink() + + @patch("app.services.sound_normalizer.ffmpeg.probe") + def test_get_audio_duration_success(self, mock_probe, normalizer_service): + """Test successful audio duration extraction.""" + mock_probe.return_value = {"format": {"duration": "123.456"}} + + temp_path = Path("/fake/path/test.mp3") + duration = normalizer_service._get_audio_duration(temp_path) + + assert duration == 123456 # 123.456 seconds * 1000 = 123456 ms + mock_probe.assert_called_once_with(str(temp_path)) + + @patch("app.services.sound_normalizer.ffmpeg.probe") + def test_get_audio_duration_failure(self, mock_probe, normalizer_service): + """Test audio duration extraction failure.""" + mock_probe.side_effect = Exception("FFmpeg error") + + temp_path = Path("/fake/path/test.mp3") + duration = normalizer_service._get_audio_duration(temp_path) + + assert duration == 0 + mock_probe.assert_called_once_with(str(temp_path)) + + @pytest.mark.asyncio + async def test_normalize_sound_already_normalized(self, normalizer_service): + """Test normalizing a sound that's already normalized.""" + sound = Sound( + id=1, + type="SDB", + name="Test Sound", + filename="test.mp3", + duration=5000, + size=1024, + hash="test_hash", + is_normalized=True, + ) + + result = await normalizer_service.normalize_sound(sound) + + assert result["status"] == "skipped" + assert result["reason"] == "already normalized" + assert result["filename"] == "test.mp3" + assert result["id"] == 1 + + @pytest.mark.asyncio + async def test_normalize_sound_force_already_normalized(self, normalizer_service): + """Test force normalizing a sound that's already normalized.""" + sound = Sound( + id=1, + type="SDB", + name="Test Sound", + filename="test.mp3", + duration=5000, + size=1024, + hash="test_hash", + is_normalized=True, + ) + + # Mock file operations and ffmpeg + with patch.object(normalizer_service, "_get_original_path") as mock_orig_path, \ + patch.object(normalizer_service, "_get_normalized_path") as mock_norm_path, \ + patch.object(normalizer_service, "_normalize_audio_two_pass") as mock_normalize, \ + patch.object(normalizer_service, "_get_audio_duration", return_value=6000), \ + patch.object(normalizer_service, "_get_file_size", return_value=2048), \ + patch.object(normalizer_service, "_get_file_hash", return_value="new_hash"): + + # Setup path mocks + mock_orig_path.return_value = Path("/fake/original.mp3") + mock_norm_path.return_value = Path("/fake/normalized.mp3") + + # Mock file existence + with patch("pathlib.Path.exists", return_value=True): + # Mock repository update + normalizer_service.sound_repo.update = AsyncMock() + + result = await normalizer_service.normalize_sound(sound, force=True) + + assert result["status"] == "normalized" + assert result["filename"] == "test.mp3" + assert result["normalized_duration"] == 6000 + assert result["normalized_size"] == 2048 + assert result["normalized_hash"] == "new_hash" + + # Verify update was called + normalizer_service.sound_repo.update.assert_called_once() + + @pytest.mark.asyncio + async def test_normalize_sound_file_not_found(self, normalizer_service): + """Test normalizing a sound where original file doesn't exist.""" + sound = Sound( + id=1, + type="SDB", + name="Test Sound", + filename="missing.mp3", + duration=5000, + size=1024, + hash="test_hash", + is_normalized=False, + ) + + with patch.object(normalizer_service, "_get_original_path") as mock_path: + mock_path.return_value = Path("/fake/missing.mp3") + + # Mock file doesn't exist + with patch("pathlib.Path.exists", return_value=False): + result = await normalizer_service.normalize_sound(sound) + + assert result["status"] == "error" + assert "Original file not found" in result["error"] + assert result["filename"] == "missing.mp3" + + @pytest.mark.asyncio + async def test_normalize_sound_one_pass(self, normalizer_service): + """Test normalizing a sound using one-pass method.""" + sound = Sound( + id=1, + type="SDB", + name="Test Sound", + filename="test.mp3", + duration=5000, + size=1024, + hash="test_hash", + is_normalized=False, + ) + + with patch.object(normalizer_service, "_get_original_path") as mock_orig_path, \ + patch.object(normalizer_service, "_get_normalized_path") as mock_norm_path, \ + patch.object(normalizer_service, "_normalize_audio_one_pass") as mock_normalize, \ + patch.object(normalizer_service, "_get_audio_duration", return_value=5500), \ + patch.object(normalizer_service, "_get_file_size", return_value=1500), \ + patch.object(normalizer_service, "_get_file_hash", return_value="norm_hash"): + + # Setup path mocks + mock_orig_path.return_value = Path("/fake/original.mp3") + mock_norm_path.return_value = Path("/fake/normalized.mp3") + + # Mock file existence + with patch("pathlib.Path.exists", return_value=True): + # Mock repository update + normalizer_service.sound_repo.update = AsyncMock() + + result = await normalizer_service.normalize_sound(sound, one_pass=True) + + assert result["status"] == "normalized" + assert result["normalized_duration"] == 5500 + assert result["normalized_size"] == 1500 + assert result["normalized_hash"] == "norm_hash" + + # Verify one-pass was used + mock_normalize.assert_called_once() + + @pytest.mark.asyncio + async def test_normalize_sound_normalization_error(self, normalizer_service): + """Test handling normalization errors.""" + sound = Sound( + id=1, + type="SDB", + name="Test Sound", + filename="test.mp3", + duration=5000, + size=1024, + hash="test_hash", + is_normalized=False, + ) + + with patch.object(normalizer_service, "_get_original_path") as mock_orig_path, \ + patch.object(normalizer_service, "_get_normalized_path") as mock_norm_path: + + # Setup path mocks + mock_orig_path.return_value = Path("/fake/original.mp3") + mock_norm_path.return_value = Path("/fake/normalized.mp3") + + # Mock file existence but normalization fails + with patch("pathlib.Path.exists", return_value=True), \ + patch.object(normalizer_service, "_normalize_audio_two_pass") as mock_normalize: + + mock_normalize.side_effect = Exception("Normalization failed") + + result = await normalizer_service.normalize_sound(sound) + + assert result["status"] == "error" + assert "Normalization failed" in result["error"] + assert result["filename"] == "test.mp3" + + @pytest.mark.asyncio + async def test_normalize_all_sounds(self, normalizer_service): + """Test normalizing all unnormalized sounds.""" + sounds = [ + Sound( + id=1, + type="SDB", + name="Sound 1", + filename="sound1.mp3", + duration=5000, + size=1024, + hash="hash1", + is_normalized=False, + ), + Sound( + id=2, + type="TTS", + name="Sound 2", + filename="sound2.wav", + duration=3000, + size=512, + hash="hash2", + is_normalized=False, + ), + ] + + # Mock repository calls + normalizer_service.sound_repo.get_unnormalized_sounds = AsyncMock(return_value=sounds) + + # Mock individual normalization + with patch.object(normalizer_service, "normalize_sound") as mock_normalize: + mock_normalize.side_effect = [ + { + "filename": "sound1.mp3", + "status": "normalized", + "reason": None, + "original_path": "/fake/sound1.mp3", + "normalized_path": "/fake/sound1_normalized.mp3", + "normalized_filename": "sound1_normalized.mp3", + "normalized_duration": 5000, + "normalized_size": 1024, + "normalized_hash": "norm_hash1", + "id": 1, + "error": None, + }, + { + "filename": "sound2.wav", + "status": "normalized", + "reason": None, + "original_path": "/fake/sound2.wav", + "normalized_path": "/fake/sound2_normalized.mp3", + "normalized_filename": "sound2_normalized.mp3", + "normalized_duration": 3000, + "normalized_size": 512, + "normalized_hash": "norm_hash2", + "id": 2, + "error": None, + }, + ] + + results = await normalizer_service.normalize_all_sounds() + + assert results["processed"] == 2 + assert results["normalized"] == 2 + assert results["skipped"] == 0 + assert results["errors"] == 0 + assert len(results["files"]) == 2 + + @pytest.mark.asyncio + async def test_normalize_sounds_by_type(self, normalizer_service): + """Test normalizing sounds by type.""" + sdb_sounds = [ + Sound( + id=1, + type="SDB", + name="SDB Sound", + filename="sdb.mp3", + duration=5000, + size=1024, + hash="sdb_hash", + is_normalized=False, + ), + ] + + # Mock repository calls + normalizer_service.sound_repo.get_unnormalized_sounds_by_type = AsyncMock( + return_value=sdb_sounds + ) + + # Mock individual normalization + with patch.object(normalizer_service, "normalize_sound") as mock_normalize: + mock_normalize.return_value = { + "filename": "sdb.mp3", + "status": "normalized", + "reason": None, + "original_path": "/fake/sdb.mp3", + "normalized_path": "/fake/sdb_normalized.mp3", + "normalized_filename": "sdb_normalized.mp3", + "normalized_duration": 5000, + "normalized_size": 1024, + "normalized_hash": "sdb_norm_hash", + "id": 1, + "error": None, + } + + results = await normalizer_service.normalize_sounds_by_type("SDB") + + assert results["processed"] == 1 + assert results["normalized"] == 1 + assert results["skipped"] == 0 + assert results["errors"] == 0 + assert len(results["files"]) == 1 + + # Verify correct repository method was called + normalizer_service.sound_repo.get_unnormalized_sounds_by_type.assert_called_once_with("SDB") + + @pytest.mark.asyncio + async def test_normalize_sounds_with_errors(self, normalizer_service): + """Test normalizing sounds with some errors.""" + sounds = [ + Sound( + id=1, + type="SDB", + name="Good Sound", + filename="good.mp3", + duration=5000, + size=1024, + hash="good_hash", + is_normalized=False, + ), + Sound( + id=2, + type="SDB", + name="Bad Sound", + filename="bad.mp3", + duration=3000, + size=512, + hash="bad_hash", + is_normalized=False, + ), + ] + + # Mock repository calls + normalizer_service.sound_repo.get_unnormalized_sounds = AsyncMock(return_value=sounds) + + # Mock individual normalization with one success and one error + with patch.object(normalizer_service, "normalize_sound") as mock_normalize: + mock_normalize.side_effect = [ + { + "filename": "good.mp3", + "status": "normalized", + "reason": None, + "original_path": "/fake/good.mp3", + "normalized_path": "/fake/good_normalized.mp3", + "normalized_filename": "good_normalized.mp3", + "normalized_duration": 5000, + "normalized_size": 1024, + "normalized_hash": "good_norm_hash", + "id": 1, + "error": None, + }, + { + "filename": "bad.mp3", + "status": "error", + "reason": None, + "original_path": "/fake/bad.mp3", + "normalized_path": None, + "normalized_filename": None, + "normalized_duration": None, + "normalized_size": None, + "normalized_hash": None, + "id": 2, + "error": "File processing failed", + }, + ] + + results = await normalizer_service.normalize_all_sounds() + + assert results["processed"] == 2 + assert results["normalized"] == 1 + assert results["skipped"] == 0 + assert results["errors"] == 1 + assert len(results["files"]) == 2 + + # Check error file details + error_file = next(f for f in results["files"] if f["status"] == "error") + assert error_file["filename"] == "bad.mp3" + assert error_file["error"] == "File processing failed" + + @pytest.mark.asyncio + @patch("app.services.sound_normalizer.ffmpeg") + async def test_normalize_audio_one_pass_mp3( + self, + mock_ffmpeg, + normalizer_service, + ): + """Test one-pass audio normalization for MP3.""" + input_path = Path("/fake/input.wav") + output_path = Path("/fake/output.mp3") + + # Mock ffmpeg chain + mock_stream = Mock() + mock_ffmpeg.input.return_value = mock_stream + mock_ffmpeg.filter.return_value = mock_stream + mock_ffmpeg.output.return_value = mock_stream + mock_ffmpeg.overwrite_output.return_value = mock_stream + + await normalizer_service._normalize_audio_one_pass(input_path, output_path) + + # Verify ffmpeg chain was called correctly + mock_ffmpeg.input.assert_called_once_with(str(input_path)) + mock_ffmpeg.filter.assert_called_once_with(mock_stream, "loudnorm", I=-23, TP=-2, LRA=7) + mock_ffmpeg.output.assert_called_once() + mock_ffmpeg.run.assert_called_once() + + # Check output arguments include MP3 codec and bitrate + output_call_args = mock_ffmpeg.output.call_args + assert output_call_args[0][1] == str(output_path) # output path + output_kwargs = output_call_args[1] + assert output_kwargs["acodec"] == "libmp3lame" + assert output_kwargs["audio_bitrate"] == "256k" + + @pytest.mark.asyncio + @patch("app.services.sound_normalizer.ffmpeg") + async def test_normalize_audio_two_pass_analysis( + self, + mock_ffmpeg, + normalizer_service, + ): + """Test two-pass audio normalization analysis phase.""" + input_path = Path("/fake/input.wav") + output_path = Path("/fake/output.mp3") + + # Mock ffmpeg chain + mock_stream = Mock() + mock_ffmpeg.input.return_value = mock_stream + mock_ffmpeg.filter.return_value = mock_stream + mock_ffmpeg.output.return_value = mock_stream + mock_ffmpeg.overwrite_output.return_value = mock_stream + + # Mock analysis output with valid JSON + analysis_json = '''{ + "input_i": "-23.0", + "input_lra": "11.0", + "input_tp": "-2.0", + "input_thresh": "-33.0", + "target_offset": "0.0" + }''' + + mock_ffmpeg.run.side_effect = [ + (None, analysis_json.encode("utf-8")), # First pass analysis + None, # Second pass normalization + ] + + await normalizer_service._normalize_audio_two_pass(input_path, output_path) + + # Verify two ffmpeg runs occurred + assert mock_ffmpeg.run.call_count == 2 + + # Verify analysis pass used print_format=json + first_filter_call = mock_ffmpeg.filter.call_args_list[0] + assert "print_format" in first_filter_call[1] + assert first_filter_call[1]["print_format"] == "json" + + # Verify second pass used measured values + second_filter_call = mock_ffmpeg.filter.call_args_list[1] + measured_args = second_filter_call[1] + assert "measured_I" in measured_args + assert "measured_LRA" in measured_args + assert "measured_TP" in measured_args + assert "measured_thresh" in measured_args + assert "offset" in measured_args \ No newline at end of file