feat: Implement sound normalization service and API endpoints

- Added SoundNormalizerService for normalizing audio files with support for one-pass and two-pass normalization methods.
- Introduced API endpoints for normalizing all sounds and specific sounds by ID, including support for force normalization and handling of already normalized sounds.
- Created comprehensive test suite for the sound normalizer service and its API endpoints, covering various scenarios including success, errors, and edge cases.
- Refactored sound scanning service to utilize SHA-256 for file hashing instead of MD5 for improved security.
- Enhanced logging and error handling throughout the sound normalization process.
This commit is contained in:
JSC
2025-07-28 09:18:18 +02:00
parent 36949a1f1c
commit 0fffce53b4
8 changed files with 2031 additions and 75 deletions

View File

@@ -2,7 +2,7 @@
from fastapi import APIRouter from fastapi import APIRouter
from app.api.v1 import auth, main, socket, sounds from app.api.v1 import auth, main, socket, sound_normalizer, sounds
# V1 API router with v1 prefix # V1 API router with v1 prefix
api_router = APIRouter(prefix="/v1") api_router = APIRouter(prefix="/v1")
@@ -12,3 +12,4 @@ api_router.include_router(main.router, tags=["main"])
api_router.include_router(auth.router, prefix="/auth", tags=["authentication"]) api_router.include_router(auth.router, prefix="/auth", tags=["authentication"])
api_router.include_router(socket.router, tags=["socket"]) api_router.include_router(socket.router, tags=["socket"])
api_router.include_router(sounds.router, tags=["sounds"]) api_router.include_router(sounds.router, tags=["sounds"])
api_router.include_router(sound_normalizer.router, tags=["sound-normalization"])

View File

@@ -0,0 +1,166 @@
"""Sound normalization API endpoints."""
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlmodel.ext.asyncio.session import AsyncSession
from app.core.database import get_db
from app.core.dependencies import get_current_active_user_flexible
from app.models.user import User
from app.services.sound_normalizer import NormalizationResults, SoundNormalizerService
router = APIRouter(prefix="/sounds/normalize", tags=["sound-normalization"])
async def get_sound_normalizer_service(
session: Annotated[AsyncSession, Depends(get_db)],
) -> SoundNormalizerService:
"""Get the sound normalizer service."""
return SoundNormalizerService(session)
@router.post("/all")
async def normalize_all_sounds(
current_user: Annotated[User, Depends(get_current_active_user_flexible)],
normalizer_service: Annotated[
SoundNormalizerService, Depends(get_sound_normalizer_service)
],
force: bool = Query(
False, description="Force normalization of already normalized sounds"
),
one_pass: bool | None = Query(
None, description="Use one-pass normalization (overrides config)"
),
) -> dict[str, NormalizationResults | str]:
"""Normalize all unnormalized sounds."""
# Only allow admins to normalize sounds
if current_user.role not in ["admin", "superadmin"]:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Only administrators can normalize sounds",
)
try:
results = await normalizer_service.normalize_all_sounds(
force=force,
one_pass=one_pass,
)
return {
"message": "Sound normalization completed",
"results": results,
}
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to normalize sounds: {e!s}",
) from e
@router.post("/type/{sound_type}")
async def normalize_sounds_by_type(
sound_type: str,
current_user: Annotated[User, Depends(get_current_active_user_flexible)],
normalizer_service: Annotated[
SoundNormalizerService, Depends(get_sound_normalizer_service)
],
force: bool = Query(
False, description="Force normalization of already normalized sounds"
),
one_pass: bool | None = Query(
None, description="Use one-pass normalization (overrides config)"
),
) -> dict[str, NormalizationResults | str]:
"""Normalize all sounds of a specific type (SDB, TTS, EXT)."""
# Only allow admins to normalize sounds
if current_user.role not in ["admin", "superadmin"]:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Only administrators can normalize sounds",
)
# Validate sound type
valid_types = ["SDB", "TTS", "EXT"]
if sound_type not in valid_types:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Invalid sound type. Must be one of: {', '.join(valid_types)}",
)
try:
results = await normalizer_service.normalize_sounds_by_type(
sound_type=sound_type,
force=force,
one_pass=one_pass,
)
return {
"message": f"Normalization of {sound_type} sounds completed",
"results": results,
}
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to normalize {sound_type} sounds: {e!s}",
) from e
@router.post("/{sound_id}")
async def normalize_sound_by_id(
sound_id: int,
current_user: Annotated[User, Depends(get_current_active_user_flexible)],
normalizer_service: Annotated[
SoundNormalizerService, Depends(get_sound_normalizer_service)
],
force: bool = Query(
False, description="Force normalization of already normalized sound"
),
one_pass: bool | None = Query(
None, description="Use one-pass normalization (overrides config)"
),
) -> dict[str, str]:
"""Normalize a specific sound by ID."""
# Only allow admins to normalize sounds
if current_user.role not in ["admin", "superadmin"]:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Only administrators can normalize sounds",
)
try:
# Get the sound
sound = await normalizer_service.sound_repo.get_by_id(sound_id)
if not sound:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Sound with ID {sound_id} not found",
)
# Normalize the sound
result = await normalizer_service.normalize_sound(
sound=sound,
force=force,
one_pass=one_pass,
)
# Check result status
if result["status"] == "error":
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to normalize sound: {result['error']}",
)
return {
"message": f"Sound normalization {result['status']}: {sound.filename}",
"status": result["status"],
"reason": result["reason"] or "",
"normalized_filename": result["normalized_filename"] or "",
}
except HTTPException:
# Re-raise HTTPExceptions without wrapping them
raise
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to normalize sound: {e!s}",
) from e

View File

@@ -47,5 +47,10 @@ class Settings(BaseSettings):
GITHUB_CLIENT_ID: str = "" GITHUB_CLIENT_ID: str = ""
GITHUB_CLIENT_SECRET: str = "" GITHUB_CLIENT_SECRET: str = ""
# Audio Normalization Configuration
NORMALIZED_AUDIO_FORMAT: str = "mp3"
NORMALIZED_AUDIO_BITRATE: str = "256k"
NORMALIZED_AUDIO_PASSES: int = 2 # 1 for one-pass, 2 for two-pass
settings = Settings() settings = Settings()

View File

@@ -126,3 +126,26 @@ class SoundRepository:
except Exception: except Exception:
logger.exception("Failed to get popular sounds") logger.exception("Failed to get popular sounds")
raise raise
async def get_unnormalized_sounds(self) -> list[Sound]:
"""Get all sounds that haven't been normalized yet."""
try:
statement = select(Sound).where(Sound.is_normalized == False) # noqa: E712
result = await self.session.exec(statement)
return list(result.all())
except Exception:
logger.exception("Failed to get unnormalized sounds")
raise
async def get_unnormalized_sounds_by_type(self, sound_type: str) -> list[Sound]:
"""Get unnormalized sounds by type."""
try:
statement = select(Sound).where(
Sound.type == sound_type,
Sound.is_normalized == False, # noqa: E712
)
result = await self.session.exec(statement)
return list(result.all())
except Exception:
logger.exception("Failed to get unnormalized sounds by type: %s", sound_type)
raise

View File

@@ -0,0 +1,567 @@
"""Sound normalizer service for normalizing audio files using ffmpeg loudnorm."""
import hashlib
import json
import os
import re
from pathlib import Path
from typing import TypedDict
import ffmpeg # type: ignore[import-untyped]
from sqlmodel.ext.asyncio.session import AsyncSession
from app.core.config import settings
from app.core.logging import get_logger
from app.models.sound import Sound
from app.repositories.sound import SoundRepository
logger = get_logger(__name__)
class NormalizationInfo(TypedDict):
"""Type definition for normalization information in results."""
filename: str
status: str
reason: str | None
original_path: str | None
normalized_path: str | None
normalized_filename: str | None
normalized_duration: int | None
normalized_size: int | None
normalized_hash: str | None
id: int | None
error: str | None
class NormalizationResults(TypedDict):
"""Type definition for normalization results."""
processed: int
normalized: int
skipped: int
errors: int
files: list[NormalizationInfo]
class SoundNormalizerService:
"""Service for normalizing audio files using ffmpeg loudnorm."""
def __init__(self, session: AsyncSession) -> None:
"""Initialize the sound normalizer service."""
self.session = session
self.sound_repo = SoundRepository(session)
# Normalization settings from config
self.output_format = settings.NORMALIZED_AUDIO_FORMAT
self.output_bitrate = settings.NORMALIZED_AUDIO_BITRATE
self.passes = settings.NORMALIZED_AUDIO_PASSES
# Directory mappings for different sound types
self.type_directories = {
"SDB": "sounds/normalized/soundboard",
"TTS": "sounds/normalized/text_to_speech",
"EXT": "sounds/normalized/extracted",
}
# Ensure normalized directories exist
self._ensure_directories()
def _ensure_directories(self) -> None:
"""Ensure all normalized sound directories exist."""
for directory in self.type_directories.values():
Path(directory).mkdir(parents=True, exist_ok=True)
logger.debug("Ensured directory exists: %s", directory)
def _get_normalized_path(self, sound: Sound) -> Path:
"""Get the normalized file path for a sound."""
return self._get_normalized_path_from_data(sound.type, sound.filename)
def _get_normalized_path_from_data(self, sound_type: str, filename: str) -> Path:
"""Get the normalized file path from sound data."""
# Get the appropriate directory for the sound type
directory = self.type_directories.get(sound_type, "sounds/normalized/other")
# Create the directory if it doesn't exist
Path(directory).mkdir(parents=True, exist_ok=True)
# Generate filename: original_name.{format}
original_stem = Path(filename).stem
normalized_filename = f"{original_stem}.{self.output_format}"
return Path(directory) / normalized_filename
def _get_original_path(self, sound: Sound) -> Path:
"""Get the original file path for a sound."""
return self._get_original_path_from_data(sound.type, sound.filename)
def _get_original_path_from_data(self, sound_type: str, filename: str) -> Path:
"""Get the original file path from sound data."""
# Map sound types to their original directories
type_to_original_dir = {
"SDB": "sounds/originals/soundboard",
"TTS": "sounds/originals/text_to_speech",
"EXT": "sounds/originals/extracted",
}
original_dir = type_to_original_dir.get(sound_type, "sounds/originals/other")
return Path(original_dir) / filename
def _get_file_hash(self, file_path: Path) -> str:
"""Calculate SHA-256 hash of a file."""
hash_sha256 = hashlib.sha256()
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_sha256.update(chunk)
return hash_sha256.hexdigest()
def _get_file_size(self, file_path: Path) -> int:
"""Get file size in bytes."""
return file_path.stat().st_size
def _get_audio_duration(self, file_path: Path) -> int:
"""Get audio duration in milliseconds using ffmpeg."""
try:
probe = ffmpeg.probe(str(file_path))
duration = float(probe["format"]["duration"])
return int(duration * 1000) # Convert to milliseconds
except Exception as e:
logger.warning("Failed to get duration for %s: %s", file_path, e)
return 0
async def _normalize_audio_one_pass(
self,
input_path: Path,
output_path: Path,
) -> None:
"""Normalize audio using one-pass loudnorm."""
try:
logger.info(
"Starting one-pass normalization: %s -> %s",
input_path,
output_path,
)
stream = ffmpeg.input(str(input_path))
stream = ffmpeg.filter(stream, "loudnorm", I=-23, TP=-2, LRA=7)
# Apply output format and bitrate
output_args = {}
if self.output_format == "mp3":
output_args["acodec"] = "libmp3lame"
output_args["audio_bitrate"] = self.output_bitrate
elif self.output_format == "aac":
output_args["acodec"] = "aac"
output_args["audio_bitrate"] = self.output_bitrate
elif self.output_format == "opus":
output_args["acodec"] = "libopus"
output_args["audio_bitrate"] = self.output_bitrate
stream = ffmpeg.output(stream, str(output_path), **output_args)
stream = ffmpeg.overwrite_output(stream)
ffmpeg.run(stream, quiet=True, overwrite_output=True)
logger.info("One-pass normalization completed: %s", output_path)
except Exception as e:
logger.exception("One-pass normalization failed for %s", input_path)
raise
async def _normalize_audio_two_pass(
self,
input_path: Path,
output_path: Path,
) -> None:
"""Normalize audio using two-pass loudnorm for better quality."""
try:
logger.info(
"Starting two-pass normalization: %s -> %s", input_path, output_path
)
# First pass: analyze
logger.debug("First pass: analyzing %s", input_path)
stream = ffmpeg.input(str(input_path))
stream = ffmpeg.filter(
stream,
"loudnorm",
I=-23,
TP=-2,
LRA=7,
print_format="json",
)
# Output to null device with explicit format
null_output = "/dev/null" if os.name != "nt" else "NUL"
stream = ffmpeg.output(stream, null_output, format="null")
# Run first pass and capture output
try:
result = ffmpeg.run(stream, capture_stderr=True, quiet=True)
analysis_output = result[1].decode("utf-8")
except ffmpeg.Error as e:
logger.error("FFmpeg first pass failed for %s. Stdout: %s, Stderr: %s",
input_path, e.stdout.decode() if e.stdout else "None",
e.stderr.decode() if e.stderr else "None")
raise
# Extract loudnorm measurements from the output
# The JSON output is at the end of stderr
logger.debug("Loudnorm analysis output: %s", analysis_output)
# Find JSON in the output
json_match = re.search(r'\{[^{}]*"input_i"[^{}]*\}', analysis_output)
if not json_match:
logger.error("Could not find JSON in loudnorm output: %s", analysis_output)
raise ValueError("Could not extract loudnorm analysis data")
logger.debug("Found JSON match: %s", json_match.group())
analysis_data = json.loads(json_match.group())
# Second pass: normalize with measured values
logger.debug("Second pass: normalizing %s with measured values", input_path)
stream = ffmpeg.input(str(input_path))
stream = ffmpeg.filter(
stream,
"loudnorm",
measured_I=analysis_data["input_i"],
measured_LRA=analysis_data["input_lra"],
measured_TP=analysis_data["input_tp"],
measured_thresh=analysis_data["input_thresh"],
offset=analysis_data["target_offset"],
)
# Apply output format and bitrate
output_args = {}
if self.output_format == "mp3":
output_args["acodec"] = "libmp3lame"
output_args["audio_bitrate"] = self.output_bitrate
elif self.output_format == "aac":
output_args["acodec"] = "aac"
output_args["audio_bitrate"] = self.output_bitrate
elif self.output_format == "opus":
output_args["acodec"] = "libopus"
output_args["audio_bitrate"] = self.output_bitrate
stream = ffmpeg.output(stream, str(output_path), **output_args)
stream = ffmpeg.overwrite_output(stream)
try:
ffmpeg.run(stream, quiet=True, overwrite_output=True)
logger.info("Two-pass normalization completed: %s", output_path)
except ffmpeg.Error as e:
logger.error("FFmpeg second pass failed for %s. Stdout: %s, Stderr: %s",
input_path, e.stdout.decode() if e.stdout else "None",
e.stderr.decode() if e.stderr else "None")
raise
except Exception as e:
logger.exception("Two-pass normalization failed for %s", input_path)
raise
async def normalize_sound(
self,
sound: Sound,
force: bool = False,
one_pass: bool | None = None,
sound_data: dict | None = None,
) -> NormalizationInfo:
"""Normalize a single sound."""
# Use provided sound_data to avoid detached instance issues, or capture from sound
if sound_data:
filename = sound_data["filename"]
sound_id = sound_data["id"]
is_normalized = sound_data["is_normalized"]
sound_type = sound_data["type"]
else:
# Fallback to accessing sound properties (for single sound normalization)
filename = sound.filename
sound_id = sound.id
is_normalized = sound.is_normalized
sound_type = sound.type
# Check if already normalized and not forcing
if is_normalized and not force:
return {
"filename": filename,
"status": "skipped",
"reason": "already normalized",
"original_path": None,
"normalized_path": None,
"normalized_filename": None,
"normalized_duration": None,
"normalized_size": None,
"normalized_hash": None,
"id": sound_id,
"error": None,
}
try:
# Get paths using captured data to avoid accessing sound properties
original_path = self._get_original_path_from_data(sound_type, filename)
normalized_path = self._get_normalized_path_from_data(sound_type, filename)
# Check if original file exists
if not original_path.exists():
error_msg = f"Original file not found: {original_path}"
logger.error(error_msg)
return {
"filename": filename,
"status": "error",
"reason": None,
"original_path": str(original_path),
"normalized_path": None,
"normalized_filename": None,
"normalized_duration": None,
"normalized_size": None,
"normalized_hash": None,
"id": sound_id,
"error": error_msg,
}
# Determine which normalization method to use
use_one_pass = one_pass if one_pass is not None else (self.passes == 1)
# Perform normalization
if use_one_pass:
await self._normalize_audio_one_pass(original_path, normalized_path)
else:
await self._normalize_audio_two_pass(original_path, normalized_path)
# Get normalized file info
normalized_duration = self._get_audio_duration(normalized_path)
normalized_size = self._get_file_size(normalized_path)
normalized_hash = self._get_file_hash(normalized_path)
normalized_filename = normalized_path.name
# Update sound in database
update_data = {
"normalized_filename": normalized_filename,
"normalized_duration": normalized_duration,
"normalized_size": normalized_size,
"normalized_hash": normalized_hash,
"is_normalized": True,
}
await self.sound_repo.update(sound, update_data)
logger.info("Normalized sound: %s -> %s", filename, normalized_filename)
return {
"filename": filename,
"status": "normalized",
"reason": None,
"original_path": str(original_path),
"normalized_path": str(normalized_path),
"normalized_filename": normalized_filename,
"normalized_duration": normalized_duration,
"normalized_size": normalized_size,
"normalized_hash": normalized_hash,
"id": sound_id,
"error": None,
}
except Exception as e:
error_msg = str(e)
logger.exception(
"Failed to normalize sound %s",
filename,
)
return {
"filename": filename,
"status": "error",
"reason": None,
"original_path": (
str(original_path) if "original_path" in locals() else None
),
"normalized_path": (
str(normalized_path) if "normalized_path" in locals() else None
),
"normalized_filename": None,
"normalized_duration": None,
"normalized_size": None,
"normalized_hash": None,
"id": sound_id,
"error": error_msg,
}
async def normalize_all_sounds(
self,
force: bool = False,
one_pass: bool | None = None,
) -> NormalizationResults:
"""Normalize all unnormalized sounds."""
logger.info("Starting normalization of all sounds")
results: NormalizationResults = {
"processed": 0,
"normalized": 0,
"skipped": 0,
"errors": 0,
"files": [],
}
# Get sounds to normalize
if force:
# Get all sounds if forcing
sounds = []
for sound_type in self.type_directories.keys():
type_sounds = await self.sound_repo.get_by_type(sound_type)
sounds.extend(type_sounds)
else:
# Get only unnormalized sounds
sounds = await self.sound_repo.get_unnormalized_sounds()
logger.info("Found %d sounds to process", len(sounds))
# Capture all sound data upfront to avoid session detachment issues
sound_data_list = []
for sound in sounds:
sound_data_list.append(
{
"id": sound.id,
"filename": sound.filename,
"type": sound.type,
"is_normalized": sound.is_normalized,
"name": sound.name,
}
)
# Process each sound using captured data
for i, sound in enumerate(sounds):
results["processed"] += 1
# Use captured data to avoid detached instance issues
sound_data = sound_data_list[i]
sound_id = sound_data["id"]
sound_filename = sound_data["filename"]
try:
normalization_info = await self.normalize_sound(
sound,
force=force,
one_pass=one_pass,
sound_data=sound_data,
)
results["files"].append(normalization_info)
if normalization_info["status"] == "normalized":
results["normalized"] += 1
elif normalization_info["status"] == "skipped":
results["skipped"] += 1
elif normalization_info["status"] == "error":
results["errors"] += 1
except Exception as e:
logger.exception(
"Unexpected error processing sound %s",
sound_filename,
)
results["errors"] += 1
results["files"].append(
{
"filename": sound_filename,
"status": "error",
"reason": None,
"original_path": None,
"normalized_path": None,
"normalized_filename": None,
"normalized_duration": None,
"normalized_size": None,
"normalized_hash": None,
"id": sound_id,
"error": str(e),
}
)
logger.info("Normalization completed: %s", results)
return results
async def normalize_sounds_by_type(
self,
sound_type: str,
force: bool = False,
one_pass: bool | None = None,
) -> NormalizationResults:
"""Normalize all sounds of a specific type."""
logger.info("Starting normalization of %s sounds", sound_type)
results: NormalizationResults = {
"processed": 0,
"normalized": 0,
"skipped": 0,
"errors": 0,
"files": [],
}
# Get sounds to normalize
if force:
sounds = await self.sound_repo.get_by_type(sound_type)
else:
sounds = await self.sound_repo.get_unnormalized_sounds_by_type(sound_type)
logger.info("Found %d %s sounds to process", len(sounds), sound_type)
# Capture all sound data upfront to avoid session detachment issues
sound_data_list = []
for sound in sounds:
sound_data_list.append(
{
"id": sound.id,
"filename": sound.filename,
"type": sound.type,
"is_normalized": sound.is_normalized,
"name": sound.name,
}
)
# Process each sound using captured data
for i, sound in enumerate(sounds):
results["processed"] += 1
# Use captured data to avoid detached instance issues
sound_data = sound_data_list[i]
sound_id = sound_data["id"]
sound_filename = sound_data["filename"]
try:
normalization_info = await self.normalize_sound(
sound,
force=force,
one_pass=one_pass,
sound_data=sound_data,
)
results["files"].append(normalization_info)
if normalization_info["status"] == "normalized":
results["normalized"] += 1
elif normalization_info["status"] == "skipped":
results["skipped"] += 1
elif normalization_info["status"] == "error":
results["errors"] += 1
except Exception as e:
logger.exception(
"Unexpected error processing sound %s",
sound_filename,
)
results["errors"] += 1
results["files"].append(
{
"filename": sound_filename,
"status": "error",
"reason": None,
"original_path": None,
"normalized_path": None,
"normalized_filename": None,
"normalized_duration": None,
"normalized_size": None,
"normalized_hash": None,
"id": sound_id,
"error": str(e),
}
)
logger.info("Type normalization completed: %s", results)
return results

View File

@@ -16,6 +16,7 @@ logger = get_logger(__name__)
class FileInfo(TypedDict): class FileInfo(TypedDict):
"""Type definition for file information in scan results.""" """Type definition for file information in scan results."""
filename: str filename: str
status: str status: str
reason: str | None reason: str | None
@@ -29,6 +30,7 @@ class FileInfo(TypedDict):
class ScanResults(TypedDict): class ScanResults(TypedDict):
"""Type definition for scan results.""" """Type definition for scan results."""
scanned: int scanned: int
added: int added: int
updated: int updated: int
@@ -45,15 +47,23 @@ class SoundScannerService:
"""Initialize the sound scanner service.""" """Initialize the sound scanner service."""
self.session = session self.session = session
self.sound_repo = SoundRepository(session) self.sound_repo = SoundRepository(session)
self.supported_extensions = {".mp3", ".wav", ".opus", ".flac", ".ogg", ".m4a", ".aac"} self.supported_extensions = {
".mp3",
".wav",
".opus",
".flac",
".ogg",
".m4a",
".aac",
}
def get_file_hash(self, file_path: Path) -> str: def get_file_hash(self, file_path: Path) -> str:
"""Calculate MD5 hash of a file.""" """Calculate SHA-256 hash of a file."""
hash_md5 = hashlib.md5() hash_sha256 = hashlib.sha256()
with open(file_path, "rb") as f: with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""): for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk) hash_sha256.update(chunk)
return hash_md5.hexdigest() return hash_sha256.hexdigest()
def get_audio_duration(self, file_path: Path) -> int: def get_audio_duration(self, file_path: Path) -> int:
"""Get audio duration in milliseconds using ffmpeg.""" """Get audio duration in milliseconds using ffmpeg."""
@@ -76,8 +86,7 @@ class SoundScannerService:
# Replace underscores and hyphens with spaces # Replace underscores and hyphens with spaces
name = name.replace("_", " ").replace("-", " ") name = name.replace("_", " ").replace("-", " ")
# Capitalize words # Capitalize words
name = " ".join(word.capitalize() for word in name.split()) return " ".join(word.capitalize() for word in name.split())
return name
async def scan_directory( async def scan_directory(
self, self,
@@ -113,7 +122,8 @@ class SoundScannerService:
# Get all audio files from directory # Get all audio files from directory
audio_files = [ audio_files = [
f for f in scan_path.iterdir() f
for f in scan_path.iterdir()
if f.is_file() and f.suffix.lower() in self.supported_extensions if f.is_file() and f.suffix.lower() in self.supported_extensions
] ]
@@ -134,17 +144,19 @@ class SoundScannerService:
except Exception as e: except Exception as e:
logger.exception("Error processing file %s", file_path) logger.exception("Error processing file %s", file_path)
results["errors"] += 1 results["errors"] += 1
results["files"].append({ results["files"].append(
"filename": filename, {
"status": "error", "filename": filename,
"reason": None, "status": "error",
"name": None, "reason": None,
"duration": None, "name": None,
"size": None, "duration": None,
"id": None, "size": None,
"error": str(e), "id": None,
"changes": None, "error": str(e),
}) "changes": None,
}
)
# Delete sounds that no longer exist in directory # Delete sounds that no longer exist in directory
for filename, sound in sounds_by_filename.items(): for filename, sound in sounds_by_filename.items():
@@ -153,31 +165,35 @@ class SoundScannerService:
await self.sound_repo.delete(sound) await self.sound_repo.delete(sound)
logger.info("Deleted sound no longer in directory: %s", filename) logger.info("Deleted sound no longer in directory: %s", filename)
results["deleted"] += 1 results["deleted"] += 1
results["files"].append({ results["files"].append(
"filename": filename, {
"status": "deleted", "filename": filename,
"reason": "file no longer exists", "status": "deleted",
"name": sound.name, "reason": "file no longer exists",
"duration": sound.duration, "name": sound.name,
"size": sound.size, "duration": sound.duration,
"id": sound.id, "size": sound.size,
"error": None, "id": sound.id,
"changes": None, "error": None,
}) "changes": None,
}
)
except Exception as e: except Exception as e:
logger.exception("Error deleting sound %s", filename) logger.exception("Error deleting sound %s", filename)
results["errors"] += 1 results["errors"] += 1
results["files"].append({ results["files"].append(
"filename": filename, {
"status": "error", "filename": filename,
"reason": "failed to delete", "status": "error",
"name": sound.name, "reason": "failed to delete",
"duration": sound.duration, "name": sound.name,
"size": sound.size, "duration": sound.duration,
"id": sound.id, "size": sound.size,
"error": str(e), "id": sound.id,
"changes": None, "error": str(e),
}) "changes": None,
}
)
logger.info("Sync completed: %s", results) logger.info("Sync completed: %s", results)
return results return results
@@ -215,17 +231,19 @@ class SoundScannerService:
logger.info("Added new sound: %s (ID: %s)", sound.name, sound.id) logger.info("Added new sound: %s (ID: %s)", sound.name, sound.id)
results["added"] += 1 results["added"] += 1
results["files"].append({ results["files"].append(
"filename": filename, {
"status": "added", "filename": filename,
"reason": None, "status": "added",
"name": name, "reason": None,
"duration": duration, "name": name,
"size": size, "duration": duration,
"id": sound.id, "size": size,
"error": None, "id": sound.id,
"changes": None, "error": None,
}) "changes": None,
}
)
elif existing_sound.hash != file_hash: elif existing_sound.hash != file_hash:
# Update existing sound (file was modified) # Update existing sound (file was modified)
@@ -240,33 +258,37 @@ class SoundScannerService:
logger.info("Updated modified sound: %s (ID: %s)", name, existing_sound.id) logger.info("Updated modified sound: %s (ID: %s)", name, existing_sound.id)
results["updated"] += 1 results["updated"] += 1
results["files"].append({ results["files"].append(
"filename": filename, {
"status": "updated", "filename": filename,
"reason": "file was modified", "status": "updated",
"name": name, "reason": "file was modified",
"duration": duration, "name": name,
"size": size, "duration": duration,
"id": existing_sound.id, "size": size,
"error": None, "id": existing_sound.id,
"changes": ["hash", "duration", "size", "name"], "error": None,
}) "changes": ["hash", "duration", "size", "name"],
}
)
else: else:
# File unchanged, skip # File unchanged, skip
logger.debug("Sound unchanged: %s", filename) logger.debug("Sound unchanged: %s", filename)
results["skipped"] += 1 results["skipped"] += 1
results["files"].append({ results["files"].append(
"filename": filename, {
"status": "skipped", "filename": filename,
"reason": "file unchanged", "status": "skipped",
"name": existing_sound.name, "reason": "file unchanged",
"duration": existing_sound.duration, "name": existing_sound.name,
"size": existing_sound.size, "duration": existing_sound.duration,
"id": existing_sound.id, "size": existing_sound.size,
"error": None, "id": existing_sound.id,
"changes": None, "error": None,
}) "changes": None,
}
)
async def scan_soundboard_directory(self) -> ScanResults: async def scan_soundboard_directory(self) -> ScanResults:
"""Sync the default soundboard directory.""" """Sync the default soundboard directory."""

View File

@@ -0,0 +1,613 @@
"""Tests for sound normalizer API endpoints."""
from unittest.mock import patch
import pytest
from httpx import ASGITransport, AsyncClient
from app.models.user import User
from app.services.sound_normalizer import NormalizationResults
class TestSoundNormalizerEndpoints:
"""Test sound normalizer API endpoints."""
@pytest.mark.asyncio
async def test_normalize_all_sounds_success(
self,
authenticated_admin_client: AsyncClient,
admin_user: User,
):
"""Test successful normalization of all sounds."""
mock_results: NormalizationResults = {
"processed": 3,
"normalized": 2,
"skipped": 1,
"errors": 0,
"files": [
{
"filename": "test1.mp3",
"status": "normalized",
"reason": None,
"original_path": "/fake/test1.mp3",
"normalized_path": "/fake/test1_normalized.mp3",
"normalized_filename": "test1_normalized.mp3",
"normalized_duration": 5000,
"normalized_size": 1024,
"normalized_hash": "norm_hash1",
"id": 1,
"error": None,
},
{
"filename": "test2.wav",
"status": "normalized",
"reason": None,
"original_path": "/fake/test2.wav",
"normalized_path": "/fake/test2_normalized.mp3",
"normalized_filename": "test2_normalized.mp3",
"normalized_duration": 7000,
"normalized_size": 2048,
"normalized_hash": "norm_hash2",
"id": 2,
"error": None,
},
{
"filename": "test3.mp3",
"status": "skipped",
"reason": "already normalized",
"original_path": None,
"normalized_path": None,
"normalized_filename": None,
"normalized_duration": None,
"normalized_size": None,
"normalized_hash": None,
"id": 3,
"error": None,
},
],
}
with patch(
"app.services.sound_normalizer.SoundNormalizerService.normalize_all_sounds"
) as mock_normalize:
mock_normalize.return_value = mock_results
response = await authenticated_admin_client.post(
"/api/v1/sounds/normalize/all"
)
assert response.status_code == 200
data = response.json()
assert "message" in data
assert "Sound normalization completed" in data["message"]
assert "results" in data
results = data["results"]
assert results["processed"] == 3
assert results["normalized"] == 2
assert results["skipped"] == 1
assert results["errors"] == 0
assert len(results["files"]) == 3
@pytest.mark.asyncio
async def test_normalize_all_sounds_with_force(
self,
authenticated_admin_client: AsyncClient,
admin_user: User,
):
"""Test normalization with force parameter."""
mock_results: NormalizationResults = {
"processed": 1,
"normalized": 1,
"skipped": 0,
"errors": 0,
"files": [],
}
with patch(
"app.services.sound_normalizer.SoundNormalizerService.normalize_all_sounds"
) as mock_normalize:
mock_normalize.return_value = mock_results
response = await authenticated_admin_client.post(
"/api/v1/sounds/normalize/all", params={"force": True}
)
assert response.status_code == 200
# Verify force parameter was passed
mock_normalize.assert_called_once_with(force=True, one_pass=None)
@pytest.mark.asyncio
async def test_normalize_all_sounds_with_one_pass(
self,
authenticated_admin_client: AsyncClient,
admin_user: User,
):
"""Test normalization with one_pass parameter."""
mock_results: NormalizationResults = {
"processed": 1,
"normalized": 1,
"skipped": 0,
"errors": 0,
"files": [],
}
with patch(
"app.services.sound_normalizer.SoundNormalizerService.normalize_all_sounds"
) as mock_normalize:
mock_normalize.return_value = mock_results
response = await authenticated_admin_client.post(
"/api/v1/sounds/normalize/all", params={"one_pass": True}
)
assert response.status_code == 200
# Verify one_pass parameter was passed
mock_normalize.assert_called_once_with(force=False, one_pass=True)
@pytest.mark.asyncio
async def test_normalize_all_sounds_unauthenticated(self, client: AsyncClient):
"""Test normalizing sounds without authentication."""
response = await client.post("/api/v1/sounds/normalize/all")
assert response.status_code == 401
data = response.json()
assert "Could not validate credentials" in data["detail"]
@pytest.mark.asyncio
async def test_normalize_all_sounds_non_admin(
self,
test_app,
test_user: User,
):
"""Test normalizing sounds with non-admin user."""
from app.core.dependencies import get_current_active_user_flexible
# Override the dependency to return regular user
async def override_get_current_user():
test_user.role = "user"
return test_user
test_app.dependency_overrides[get_current_active_user_flexible] = (
override_get_current_user
)
headers = {"API-TOKEN": "test_api_token"}
async with AsyncClient(
transport=ASGITransport(app=test_app),
base_url="http://test",
) as client:
response = await client.post(
"/api/v1/sounds/normalize/all", headers=headers
)
assert response.status_code == 403
data = response.json()
assert "Only administrators can normalize sounds" in data["detail"]
# Clean up override
test_app.dependency_overrides.pop(get_current_active_user_flexible, None)
@pytest.mark.asyncio
async def test_normalize_all_sounds_service_error(
self,
authenticated_admin_client: AsyncClient,
admin_user: User,
):
"""Test normalization when service raises an error."""
with patch(
"app.services.sound_normalizer.SoundNormalizerService.normalize_all_sounds"
) as mock_normalize:
mock_normalize.side_effect = Exception("Normalization service failed")
response = await authenticated_admin_client.post(
"/api/v1/sounds/normalize/all"
)
assert response.status_code == 500
data = response.json()
assert "Failed to normalize sounds" in data["detail"]
assert "Normalization service failed" in data["detail"]
@pytest.mark.asyncio
async def test_normalize_sounds_by_type_success(
self,
authenticated_admin_client: AsyncClient,
admin_user: User,
):
"""Test successful normalization by sound type."""
mock_results: NormalizationResults = {
"processed": 2,
"normalized": 2,
"skipped": 0,
"errors": 0,
"files": [
{
"filename": "sdb1.mp3",
"status": "normalized",
"reason": None,
"original_path": "/fake/sdb1.mp3",
"normalized_path": "/fake/sdb1_normalized.mp3",
"normalized_filename": "sdb1_normalized.mp3",
"normalized_duration": 4000,
"normalized_size": 800,
"normalized_hash": "sdb_hash1",
"id": 10,
"error": None,
},
{
"filename": "sdb2.wav",
"status": "normalized",
"reason": None,
"original_path": "/fake/sdb2.wav",
"normalized_path": "/fake/sdb2_normalized.mp3",
"normalized_filename": "sdb2_normalized.mp3",
"normalized_duration": 6000,
"normalized_size": 1200,
"normalized_hash": "sdb_hash2",
"id": 11,
"error": None,
},
],
}
with patch(
"app.services.sound_normalizer.SoundNormalizerService.normalize_sounds_by_type"
) as mock_normalize:
mock_normalize.return_value = mock_results
response = await authenticated_admin_client.post(
"/api/v1/sounds/normalize/type/SDB"
)
assert response.status_code == 200
data = response.json()
assert "Normalization of SDB sounds completed" in data["message"]
assert "results" in data
results = data["results"]
assert results["processed"] == 2
assert results["normalized"] == 2
assert len(results["files"]) == 2
# Verify the service was called with correct type
mock_normalize.assert_called_once_with(
sound_type="SDB", force=False, one_pass=None
)
@pytest.mark.asyncio
async def test_normalize_sounds_by_type_invalid_type(
self,
authenticated_admin_client: AsyncClient,
admin_user: User,
):
"""Test normalization with invalid sound type."""
response = await authenticated_admin_client.post(
"/api/v1/sounds/normalize/type/INVALID"
)
assert response.status_code == 400
data = response.json()
assert "Invalid sound type" in data["detail"]
assert "Must be one of: SDB, TTS, EXT" in data["detail"]
@pytest.mark.asyncio
async def test_normalize_sounds_by_type_with_params(
self,
authenticated_admin_client: AsyncClient,
admin_user: User,
):
"""Test normalization by type with force and one_pass parameters."""
mock_results: NormalizationResults = {
"processed": 1,
"normalized": 1,
"skipped": 0,
"errors": 0,
"files": [],
}
with patch(
"app.services.sound_normalizer.SoundNormalizerService.normalize_sounds_by_type"
) as mock_normalize:
mock_normalize.return_value = mock_results
response = await authenticated_admin_client.post(
"/api/v1/sounds/normalize/type/TTS",
params={"force": True, "one_pass": False},
)
assert response.status_code == 200
# Verify parameters were passed correctly
mock_normalize.assert_called_once_with(
sound_type="TTS", force=True, one_pass=False
)
@pytest.mark.asyncio
async def test_normalize_sound_by_id_success(
self,
authenticated_admin_client: AsyncClient,
admin_user: User,
):
"""Test successful normalization of a specific sound."""
# Mock the sound
mock_sound = type(
"Sound",
(),
{
"id": 42,
"filename": "specific_sound.mp3",
"type": "SDB",
"name": "Specific Sound",
},
)()
# Mock normalization result
mock_result = {
"filename": "specific_sound.mp3",
"status": "normalized",
"reason": None,
"original_path": "/fake/specific_sound.mp3",
"normalized_path": "/fake/specific_sound_normalized.mp3",
"normalized_filename": "specific_sound_normalized.mp3",
"normalized_duration": 8000,
"normalized_size": 1600,
"normalized_hash": "specific_hash",
"id": 42,
"error": None,
}
with patch(
"app.services.sound_normalizer.SoundNormalizerService.normalize_sound"
) as mock_normalize_sound, patch(
"app.repositories.sound.SoundRepository.get_by_id"
) as mock_get_sound:
mock_get_sound.return_value = mock_sound
mock_normalize_sound.return_value = mock_result
response = await authenticated_admin_client.post(
"/api/v1/sounds/normalize/42"
)
assert response.status_code == 200
data = response.json()
assert "Sound normalization normalized" in data["message"]
assert "specific_sound.mp3" in data["message"]
assert data["status"] == "normalized"
assert data["normalized_filename"] == "specific_sound_normalized.mp3"
# Verify sound was retrieved and normalized
mock_get_sound.assert_called_once_with(42)
mock_normalize_sound.assert_called_once()
@pytest.mark.asyncio
async def test_normalize_sound_by_id_not_found(
self,
authenticated_admin_client: AsyncClient,
admin_user: User,
):
"""Test normalization of non-existent sound."""
with patch(
"app.repositories.sound.SoundRepository.get_by_id"
) as mock_get_sound:
mock_get_sound.return_value = None
response = await authenticated_admin_client.post(
"/api/v1/sounds/normalize/999"
)
assert response.status_code == 404
data = response.json()
assert "Sound with ID 999 not found" in data["detail"]
@pytest.mark.asyncio
async def test_normalize_sound_by_id_normalization_error(
self,
authenticated_admin_client: AsyncClient,
admin_user: User,
):
"""Test normalization when the sound normalization fails."""
# Mock the sound
mock_sound = type(
"Sound",
(),
{
"id": 42,
"filename": "error_sound.mp3",
"type": "SDB",
"name": "Error Sound",
},
)()
# Mock normalization error result
mock_result = {
"filename": "error_sound.mp3",
"status": "error",
"reason": None,
"original_path": "/fake/error_sound.mp3",
"normalized_path": None,
"normalized_filename": None,
"normalized_duration": None,
"normalized_size": None,
"normalized_hash": None,
"id": 42,
"error": "File format not supported",
}
with patch(
"app.services.sound_normalizer.SoundNormalizerService.normalize_sound"
) as mock_normalize_sound, patch(
"app.repositories.sound.SoundRepository.get_by_id"
) as mock_get_sound:
mock_get_sound.return_value = mock_sound
mock_normalize_sound.return_value = mock_result
response = await authenticated_admin_client.post(
"/api/v1/sounds/normalize/42"
)
assert response.status_code == 500
data = response.json()
assert "Failed to normalize sound" in data["detail"]
assert "File format not supported" in data["detail"]
@pytest.mark.asyncio
async def test_normalize_sound_by_id_with_params(
self,
authenticated_admin_client: AsyncClient,
admin_user: User,
):
"""Test sound normalization with force and one_pass parameters."""
# Mock the sound
mock_sound = type(
"Sound",
(),
{
"id": 42,
"filename": "param_sound.mp3",
"type": "SDB",
"name": "Param Sound",
},
)()
# Mock normalization result
mock_result = {
"filename": "param_sound.mp3",
"status": "normalized",
"reason": None,
"original_path": "/fake/param_sound.mp3",
"normalized_path": "/fake/param_sound_normalized.mp3",
"normalized_filename": "param_sound_normalized.mp3",
"normalized_duration": 5000,
"normalized_size": 1000,
"normalized_hash": "param_hash",
"id": 42,
"error": None,
}
with patch(
"app.services.sound_normalizer.SoundNormalizerService.normalize_sound"
) as mock_normalize_sound, patch(
"app.repositories.sound.SoundRepository.get_by_id"
) as mock_get_sound:
mock_get_sound.return_value = mock_sound
mock_normalize_sound.return_value = mock_result
response = await authenticated_admin_client.post(
"/api/v1/sounds/normalize/42",
params={"force": True, "one_pass": True},
)
assert response.status_code == 200
# Verify parameters were passed to normalize_sound
call_args = mock_normalize_sound.call_args
assert call_args[1]["force"] == True
assert call_args[1]["one_pass"] == True
@pytest.mark.asyncio
async def test_normalize_sound_by_id_skipped(
self,
authenticated_admin_client: AsyncClient,
admin_user: User,
):
"""Test normalization when sound is already normalized and not forced."""
# Mock the sound
mock_sound = type(
"Sound",
(),
{
"id": 42,
"filename": "already_normalized.mp3",
"type": "SDB",
"name": "Already Normalized",
},
)()
# Mock skipped result
mock_result = {
"filename": "already_normalized.mp3",
"status": "skipped",
"reason": "already normalized",
"original_path": None,
"normalized_path": None,
"normalized_filename": None,
"normalized_duration": None,
"normalized_size": None,
"normalized_hash": None,
"id": 42,
"error": None,
}
with patch(
"app.services.sound_normalizer.SoundNormalizerService.normalize_sound"
) as mock_normalize_sound, patch(
"app.repositories.sound.SoundRepository.get_by_id"
) as mock_get_sound:
mock_get_sound.return_value = mock_sound
mock_normalize_sound.return_value = mock_result
response = await authenticated_admin_client.post(
"/api/v1/sounds/normalize/42"
)
assert response.status_code == 200
data = response.json()
assert "Sound normalization skipped" in data["message"]
assert data["status"] == "skipped"
assert data["reason"] == "already normalized"
@pytest.mark.asyncio
async def test_normalize_sound_by_id_unauthenticated(self, client: AsyncClient):
"""Test normalizing a specific sound without authentication."""
response = await client.post("/api/v1/sounds/normalize/42")
assert response.status_code == 401
data = response.json()
assert "Could not validate credentials" in data["detail"]
@pytest.mark.asyncio
async def test_normalize_sound_by_id_non_admin(
self,
test_app,
test_user: User,
):
"""Test normalizing a specific sound with non-admin user."""
from app.core.dependencies import get_current_active_user_flexible
# Override the dependency to return regular user
async def override_get_current_user():
test_user.role = "user"
return test_user
test_app.dependency_overrides[get_current_active_user_flexible] = (
override_get_current_user
)
headers = {"API-TOKEN": "test_api_token"}
async with AsyncClient(
transport=ASGITransport(app=test_app),
base_url="http://test",
) as client:
response = await client.post(
"/api/v1/sounds/normalize/42", headers=headers
)
assert response.status_code == 403
data = response.json()
assert "Only administrators can normalize sounds" in data["detail"]
# Clean up override
test_app.dependency_overrides.pop(get_current_active_user_flexible, None)

View File

@@ -0,0 +1,559 @@
"""Tests for sound normalizer service."""
import tempfile
from pathlib import Path
from unittest.mock import AsyncMock, Mock, patch
import pytest
from sqlmodel.ext.asyncio.session import AsyncSession
from app.models.sound import Sound
from app.services.sound_normalizer import SoundNormalizerService
class TestSoundNormalizerService:
"""Test sound normalizer service."""
@pytest.fixture
def mock_session(self):
"""Create a mock session."""
return Mock(spec=AsyncSession)
@pytest.fixture
def normalizer_service(self, mock_session):
"""Create a normalizer service with mock session."""
with patch("app.services.sound_normalizer.settings") as mock_settings:
mock_settings.NORMALIZED_AUDIO_FORMAT = "mp3"
mock_settings.NORMALIZED_AUDIO_BITRATE = "256k"
mock_settings.NORMALIZED_AUDIO_PASSES = 2
return SoundNormalizerService(mock_session)
def test_init(self, normalizer_service):
"""Test normalizer service initialization."""
assert normalizer_service.session is not None
assert normalizer_service.sound_repo is not None
assert normalizer_service.output_format == "mp3"
assert normalizer_service.output_bitrate == "256k"
assert normalizer_service.passes == 2
assert len(normalizer_service.type_directories) == 3
assert "SDB" in normalizer_service.type_directories
assert "TTS" in normalizer_service.type_directories
assert "EXT" in normalizer_service.type_directories
def test_get_normalized_path(self, normalizer_service):
"""Test normalized path generation."""
sound = Sound(
id=1,
type="SDB",
name="Test Sound",
filename="test_audio.mp3",
duration=5000,
size=1024,
hash="test_hash",
)
normalized_path = normalizer_service._get_normalized_path(sound)
assert "sounds/normalized/soundboard" in str(normalized_path)
assert "test_audio.mp3" == normalized_path.name
def test_get_original_path(self, normalizer_service):
"""Test original path generation."""
sound = Sound(
id=1,
type="SDB",
name="Test Sound",
filename="test_audio.wav",
duration=5000,
size=1024,
hash="test_hash",
)
original_path = normalizer_service._get_original_path(sound)
assert "sounds/originals/soundboard" in str(original_path)
assert "test_audio.wav" == original_path.name
def test_get_file_hash(self, normalizer_service):
"""Test file hash calculation."""
# Create a temporary file
with tempfile.NamedTemporaryFile(mode="w", delete=False) as f:
f.write("test content for hash")
temp_path = Path(f.name)
try:
hash_value = normalizer_service._get_file_hash(temp_path)
assert len(hash_value) == 64 # SHA-256 hash length
assert isinstance(hash_value, str)
finally:
temp_path.unlink()
def test_get_file_size(self, normalizer_service):
"""Test file size calculation."""
# Create a temporary file
with tempfile.NamedTemporaryFile(mode="w", delete=False) as f:
f.write("test content for size")
temp_path = Path(f.name)
try:
size = normalizer_service._get_file_size(temp_path)
assert size > 0
assert isinstance(size, int)
finally:
temp_path.unlink()
@patch("app.services.sound_normalizer.ffmpeg.probe")
def test_get_audio_duration_success(self, mock_probe, normalizer_service):
"""Test successful audio duration extraction."""
mock_probe.return_value = {"format": {"duration": "123.456"}}
temp_path = Path("/fake/path/test.mp3")
duration = normalizer_service._get_audio_duration(temp_path)
assert duration == 123456 # 123.456 seconds * 1000 = 123456 ms
mock_probe.assert_called_once_with(str(temp_path))
@patch("app.services.sound_normalizer.ffmpeg.probe")
def test_get_audio_duration_failure(self, mock_probe, normalizer_service):
"""Test audio duration extraction failure."""
mock_probe.side_effect = Exception("FFmpeg error")
temp_path = Path("/fake/path/test.mp3")
duration = normalizer_service._get_audio_duration(temp_path)
assert duration == 0
mock_probe.assert_called_once_with(str(temp_path))
@pytest.mark.asyncio
async def test_normalize_sound_already_normalized(self, normalizer_service):
"""Test normalizing a sound that's already normalized."""
sound = Sound(
id=1,
type="SDB",
name="Test Sound",
filename="test.mp3",
duration=5000,
size=1024,
hash="test_hash",
is_normalized=True,
)
result = await normalizer_service.normalize_sound(sound)
assert result["status"] == "skipped"
assert result["reason"] == "already normalized"
assert result["filename"] == "test.mp3"
assert result["id"] == 1
@pytest.mark.asyncio
async def test_normalize_sound_force_already_normalized(self, normalizer_service):
"""Test force normalizing a sound that's already normalized."""
sound = Sound(
id=1,
type="SDB",
name="Test Sound",
filename="test.mp3",
duration=5000,
size=1024,
hash="test_hash",
is_normalized=True,
)
# Mock file operations and ffmpeg
with patch.object(normalizer_service, "_get_original_path") as mock_orig_path, \
patch.object(normalizer_service, "_get_normalized_path") as mock_norm_path, \
patch.object(normalizer_service, "_normalize_audio_two_pass") as mock_normalize, \
patch.object(normalizer_service, "_get_audio_duration", return_value=6000), \
patch.object(normalizer_service, "_get_file_size", return_value=2048), \
patch.object(normalizer_service, "_get_file_hash", return_value="new_hash"):
# Setup path mocks
mock_orig_path.return_value = Path("/fake/original.mp3")
mock_norm_path.return_value = Path("/fake/normalized.mp3")
# Mock file existence
with patch("pathlib.Path.exists", return_value=True):
# Mock repository update
normalizer_service.sound_repo.update = AsyncMock()
result = await normalizer_service.normalize_sound(sound, force=True)
assert result["status"] == "normalized"
assert result["filename"] == "test.mp3"
assert result["normalized_duration"] == 6000
assert result["normalized_size"] == 2048
assert result["normalized_hash"] == "new_hash"
# Verify update was called
normalizer_service.sound_repo.update.assert_called_once()
@pytest.mark.asyncio
async def test_normalize_sound_file_not_found(self, normalizer_service):
"""Test normalizing a sound where original file doesn't exist."""
sound = Sound(
id=1,
type="SDB",
name="Test Sound",
filename="missing.mp3",
duration=5000,
size=1024,
hash="test_hash",
is_normalized=False,
)
with patch.object(normalizer_service, "_get_original_path") as mock_path:
mock_path.return_value = Path("/fake/missing.mp3")
# Mock file doesn't exist
with patch("pathlib.Path.exists", return_value=False):
result = await normalizer_service.normalize_sound(sound)
assert result["status"] == "error"
assert "Original file not found" in result["error"]
assert result["filename"] == "missing.mp3"
@pytest.mark.asyncio
async def test_normalize_sound_one_pass(self, normalizer_service):
"""Test normalizing a sound using one-pass method."""
sound = Sound(
id=1,
type="SDB",
name="Test Sound",
filename="test.mp3",
duration=5000,
size=1024,
hash="test_hash",
is_normalized=False,
)
with patch.object(normalizer_service, "_get_original_path") as mock_orig_path, \
patch.object(normalizer_service, "_get_normalized_path") as mock_norm_path, \
patch.object(normalizer_service, "_normalize_audio_one_pass") as mock_normalize, \
patch.object(normalizer_service, "_get_audio_duration", return_value=5500), \
patch.object(normalizer_service, "_get_file_size", return_value=1500), \
patch.object(normalizer_service, "_get_file_hash", return_value="norm_hash"):
# Setup path mocks
mock_orig_path.return_value = Path("/fake/original.mp3")
mock_norm_path.return_value = Path("/fake/normalized.mp3")
# Mock file existence
with patch("pathlib.Path.exists", return_value=True):
# Mock repository update
normalizer_service.sound_repo.update = AsyncMock()
result = await normalizer_service.normalize_sound(sound, one_pass=True)
assert result["status"] == "normalized"
assert result["normalized_duration"] == 5500
assert result["normalized_size"] == 1500
assert result["normalized_hash"] == "norm_hash"
# Verify one-pass was used
mock_normalize.assert_called_once()
@pytest.mark.asyncio
async def test_normalize_sound_normalization_error(self, normalizer_service):
"""Test handling normalization errors."""
sound = Sound(
id=1,
type="SDB",
name="Test Sound",
filename="test.mp3",
duration=5000,
size=1024,
hash="test_hash",
is_normalized=False,
)
with patch.object(normalizer_service, "_get_original_path") as mock_orig_path, \
patch.object(normalizer_service, "_get_normalized_path") as mock_norm_path:
# Setup path mocks
mock_orig_path.return_value = Path("/fake/original.mp3")
mock_norm_path.return_value = Path("/fake/normalized.mp3")
# Mock file existence but normalization fails
with patch("pathlib.Path.exists", return_value=True), \
patch.object(normalizer_service, "_normalize_audio_two_pass") as mock_normalize:
mock_normalize.side_effect = Exception("Normalization failed")
result = await normalizer_service.normalize_sound(sound)
assert result["status"] == "error"
assert "Normalization failed" in result["error"]
assert result["filename"] == "test.mp3"
@pytest.mark.asyncio
async def test_normalize_all_sounds(self, normalizer_service):
"""Test normalizing all unnormalized sounds."""
sounds = [
Sound(
id=1,
type="SDB",
name="Sound 1",
filename="sound1.mp3",
duration=5000,
size=1024,
hash="hash1",
is_normalized=False,
),
Sound(
id=2,
type="TTS",
name="Sound 2",
filename="sound2.wav",
duration=3000,
size=512,
hash="hash2",
is_normalized=False,
),
]
# Mock repository calls
normalizer_service.sound_repo.get_unnormalized_sounds = AsyncMock(return_value=sounds)
# Mock individual normalization
with patch.object(normalizer_service, "normalize_sound") as mock_normalize:
mock_normalize.side_effect = [
{
"filename": "sound1.mp3",
"status": "normalized",
"reason": None,
"original_path": "/fake/sound1.mp3",
"normalized_path": "/fake/sound1_normalized.mp3",
"normalized_filename": "sound1_normalized.mp3",
"normalized_duration": 5000,
"normalized_size": 1024,
"normalized_hash": "norm_hash1",
"id": 1,
"error": None,
},
{
"filename": "sound2.wav",
"status": "normalized",
"reason": None,
"original_path": "/fake/sound2.wav",
"normalized_path": "/fake/sound2_normalized.mp3",
"normalized_filename": "sound2_normalized.mp3",
"normalized_duration": 3000,
"normalized_size": 512,
"normalized_hash": "norm_hash2",
"id": 2,
"error": None,
},
]
results = await normalizer_service.normalize_all_sounds()
assert results["processed"] == 2
assert results["normalized"] == 2
assert results["skipped"] == 0
assert results["errors"] == 0
assert len(results["files"]) == 2
@pytest.mark.asyncio
async def test_normalize_sounds_by_type(self, normalizer_service):
"""Test normalizing sounds by type."""
sdb_sounds = [
Sound(
id=1,
type="SDB",
name="SDB Sound",
filename="sdb.mp3",
duration=5000,
size=1024,
hash="sdb_hash",
is_normalized=False,
),
]
# Mock repository calls
normalizer_service.sound_repo.get_unnormalized_sounds_by_type = AsyncMock(
return_value=sdb_sounds
)
# Mock individual normalization
with patch.object(normalizer_service, "normalize_sound") as mock_normalize:
mock_normalize.return_value = {
"filename": "sdb.mp3",
"status": "normalized",
"reason": None,
"original_path": "/fake/sdb.mp3",
"normalized_path": "/fake/sdb_normalized.mp3",
"normalized_filename": "sdb_normalized.mp3",
"normalized_duration": 5000,
"normalized_size": 1024,
"normalized_hash": "sdb_norm_hash",
"id": 1,
"error": None,
}
results = await normalizer_service.normalize_sounds_by_type("SDB")
assert results["processed"] == 1
assert results["normalized"] == 1
assert results["skipped"] == 0
assert results["errors"] == 0
assert len(results["files"]) == 1
# Verify correct repository method was called
normalizer_service.sound_repo.get_unnormalized_sounds_by_type.assert_called_once_with("SDB")
@pytest.mark.asyncio
async def test_normalize_sounds_with_errors(self, normalizer_service):
"""Test normalizing sounds with some errors."""
sounds = [
Sound(
id=1,
type="SDB",
name="Good Sound",
filename="good.mp3",
duration=5000,
size=1024,
hash="good_hash",
is_normalized=False,
),
Sound(
id=2,
type="SDB",
name="Bad Sound",
filename="bad.mp3",
duration=3000,
size=512,
hash="bad_hash",
is_normalized=False,
),
]
# Mock repository calls
normalizer_service.sound_repo.get_unnormalized_sounds = AsyncMock(return_value=sounds)
# Mock individual normalization with one success and one error
with patch.object(normalizer_service, "normalize_sound") as mock_normalize:
mock_normalize.side_effect = [
{
"filename": "good.mp3",
"status": "normalized",
"reason": None,
"original_path": "/fake/good.mp3",
"normalized_path": "/fake/good_normalized.mp3",
"normalized_filename": "good_normalized.mp3",
"normalized_duration": 5000,
"normalized_size": 1024,
"normalized_hash": "good_norm_hash",
"id": 1,
"error": None,
},
{
"filename": "bad.mp3",
"status": "error",
"reason": None,
"original_path": "/fake/bad.mp3",
"normalized_path": None,
"normalized_filename": None,
"normalized_duration": None,
"normalized_size": None,
"normalized_hash": None,
"id": 2,
"error": "File processing failed",
},
]
results = await normalizer_service.normalize_all_sounds()
assert results["processed"] == 2
assert results["normalized"] == 1
assert results["skipped"] == 0
assert results["errors"] == 1
assert len(results["files"]) == 2
# Check error file details
error_file = next(f for f in results["files"] if f["status"] == "error")
assert error_file["filename"] == "bad.mp3"
assert error_file["error"] == "File processing failed"
@pytest.mark.asyncio
@patch("app.services.sound_normalizer.ffmpeg")
async def test_normalize_audio_one_pass_mp3(
self,
mock_ffmpeg,
normalizer_service,
):
"""Test one-pass audio normalization for MP3."""
input_path = Path("/fake/input.wav")
output_path = Path("/fake/output.mp3")
# Mock ffmpeg chain
mock_stream = Mock()
mock_ffmpeg.input.return_value = mock_stream
mock_ffmpeg.filter.return_value = mock_stream
mock_ffmpeg.output.return_value = mock_stream
mock_ffmpeg.overwrite_output.return_value = mock_stream
await normalizer_service._normalize_audio_one_pass(input_path, output_path)
# Verify ffmpeg chain was called correctly
mock_ffmpeg.input.assert_called_once_with(str(input_path))
mock_ffmpeg.filter.assert_called_once_with(mock_stream, "loudnorm", I=-23, TP=-2, LRA=7)
mock_ffmpeg.output.assert_called_once()
mock_ffmpeg.run.assert_called_once()
# Check output arguments include MP3 codec and bitrate
output_call_args = mock_ffmpeg.output.call_args
assert output_call_args[0][1] == str(output_path) # output path
output_kwargs = output_call_args[1]
assert output_kwargs["acodec"] == "libmp3lame"
assert output_kwargs["audio_bitrate"] == "256k"
@pytest.mark.asyncio
@patch("app.services.sound_normalizer.ffmpeg")
async def test_normalize_audio_two_pass_analysis(
self,
mock_ffmpeg,
normalizer_service,
):
"""Test two-pass audio normalization analysis phase."""
input_path = Path("/fake/input.wav")
output_path = Path("/fake/output.mp3")
# Mock ffmpeg chain
mock_stream = Mock()
mock_ffmpeg.input.return_value = mock_stream
mock_ffmpeg.filter.return_value = mock_stream
mock_ffmpeg.output.return_value = mock_stream
mock_ffmpeg.overwrite_output.return_value = mock_stream
# Mock analysis output with valid JSON
analysis_json = '''{
"input_i": "-23.0",
"input_lra": "11.0",
"input_tp": "-2.0",
"input_thresh": "-33.0",
"target_offset": "0.0"
}'''
mock_ffmpeg.run.side_effect = [
(None, analysis_json.encode("utf-8")), # First pass analysis
None, # Second pass normalization
]
await normalizer_service._normalize_audio_two_pass(input_path, output_path)
# Verify two ffmpeg runs occurred
assert mock_ffmpeg.run.call_count == 2
# Verify analysis pass used print_format=json
first_filter_call = mock_ffmpeg.filter.call_args_list[0]
assert "print_format" in first_filter_call[1]
assert first_filter_call[1]["print_format"] == "json"
# Verify second pass used measured values
second_filter_call = mock_ffmpeg.filter.call_args_list[1]
measured_args = second_filter_call[1]
assert "measured_I" in measured_args
assert "measured_LRA" in measured_args
assert "measured_TP" in measured_args
assert "measured_thresh" in measured_args
assert "offset" in measured_args