- Added SoundNormalizerService for normalizing audio files with support for one-pass and two-pass normalization methods. - Introduced API endpoints for normalizing all sounds and specific sounds by ID, including support for force normalization and handling of already normalized sounds. - Created comprehensive test suite for the sound normalizer service and its API endpoints, covering various scenarios including success, errors, and edge cases. - Refactored sound scanning service to utilize SHA-256 for file hashing instead of MD5 for improved security. - Enhanced logging and error handling throughout the sound normalization process.
297 lines
9.8 KiB
Python
297 lines
9.8 KiB
Python
"""Sound scanner service for scanning and importing audio files."""
|
|
|
|
import hashlib
|
|
from pathlib import Path
|
|
from typing import TypedDict
|
|
|
|
import ffmpeg # type: ignore[import-untyped]
|
|
from sqlmodel.ext.asyncio.session import AsyncSession
|
|
|
|
from app.core.logging import get_logger
|
|
from app.models.sound import Sound
|
|
from app.repositories.sound import SoundRepository
|
|
|
|
# Module-level logger, obtained from the project's logging helper and keyed
# by this module's import name.
logger = get_logger(__name__)
|
|
|
|
|
|
class FileInfo(TypedDict):
|
|
"""Type definition for file information in scan results."""
|
|
|
|
filename: str
|
|
status: str
|
|
reason: str | None
|
|
name: str | None
|
|
duration: int | None
|
|
size: int | None
|
|
id: int | None
|
|
error: str | None
|
|
changes: list[str] | None
|
|
|
|
|
|
class ScanResults(TypedDict):
    """Type definition for scan results.

    Aggregated counters for one directory sync plus the per-file details.
    """

    # Number of audio files found in the directory and processed.
    scanned: int
    # Number of files that resulted in a new sound record.
    added: int
    # Number of existing records rewritten because the file hash changed.
    updated: int
    # Number of records removed because their file is no longer present.
    deleted: int
    # Number of files left untouched because their hash was unchanged.
    skipped: int
    # Number of files or records whose processing raised an exception.
    errors: int
    # One entry per processed file or deleted record.
    files: list[FileInfo]
|
|
|
|
|
|
class SoundScannerService:
    """Service for scanning and importing audio files.

    Keeps the sound records of one type in step with the audio files found in
    a directory: unknown files are added, files whose SHA-256 hash changed are
    updated, unchanged files are skipped, and records whose file is gone are
    deleted.
    """

    # Read size used while hashing: large enough to keep the number of read
    # calls low without ever holding a whole audio file in memory.
    _HASH_CHUNK_SIZE = 64 * 1024

    def __init__(self, session: AsyncSession) -> None:
        """Initialize the sound scanner service.

        Args:
            session: Async database session; also handed to the sound
                repository created here.
        """
        self.session = session
        self.sound_repo = SoundRepository(session)
        # Lower-case file suffixes (including the dot) treated as audio.
        self.supported_extensions = {
            ".mp3",
            ".wav",
            ".opus",
            ".flac",
            ".ogg",
            ".m4a",
            ".aac",
        }

    @staticmethod
    def _file_info(
        filename: str,
        status: str,
        *,
        reason: str | None = None,
        name: str | None = None,
        duration: int | None = None,
        size: int | None = None,
        sound_id: int | None = None,
        error: str | None = None,
        changes: list[str] | None = None,
    ) -> FileInfo:
        """Build one per-file result entry; omitted fields are ``None``."""
        return {
            "filename": filename,
            "status": status,
            "reason": reason,
            "name": name,
            "duration": duration,
            "size": size,
            "id": sound_id,
            "error": error,
            "changes": changes,
        }

    def get_file_hash(self, file_path: Path) -> str:
        """Calculate SHA-256 hash of a file.

        The file is read in fixed-size chunks so large files are never loaded
        into memory at once.

        Args:
            file_path: File to hash.

        Returns:
            Hexadecimal SHA-256 digest of the file contents.
        """
        digest = hashlib.sha256()
        with open(file_path, "rb") as stream:
            while chunk := stream.read(self._HASH_CHUNK_SIZE):
                digest.update(chunk)
        return digest.hexdigest()

    def get_audio_duration(self, file_path: Path) -> int:
        """Get audio duration in milliseconds using ffmpeg.

        Args:
            file_path: Audio file to probe.

        Returns:
            Duration rounded to the nearest millisecond, or ``0`` when the
            file cannot be probed or reports no usable duration.
        """
        try:
            probe = ffmpeg.probe(str(file_path))
            seconds = float(probe["format"]["duration"])
            # round() rather than int(): truncation turns e.g. 1.001 s into
            # 1000 ms because 1.001 * 1000 is 1000.9999999999999 in floats.
            return round(seconds * 1000)
        except Exception as e:
            # Deliberately best-effort: a file that cannot be probed is still
            # imported, just with a zero duration.
            logger.warning("Failed to get duration for %s: %s", file_path, e)
            return 0

    def get_file_size(self, file_path: Path) -> int:
        """Get file size in bytes."""
        return file_path.stat().st_size

    def extract_name_from_filename(self, filename: str) -> str:
        """Extract a clean name from filename.

        Drops the extension, treats underscores and hyphens as word
        separators and capitalizes every word, e.g. ``"my_cool-sound.mp3"``
        becomes ``"My Cool Sound"``.
        """
        stem = Path(filename).stem
        words = stem.replace("_", " ").replace("-", " ").split()
        return " ".join(word.capitalize() for word in words)

    async def scan_directory(
        self,
        directory_path: str,
        sound_type: str = "SDB",
    ) -> ScanResults:
        """Sync a directory with the database (add/update/delete sounds).

        Only the directory's direct children are considered, and only files
        whose suffix is in ``supported_extensions``.

        Args:
            directory_path: Directory holding the audio files.
            sound_type: Type of the sound records to sync against; records of
                this type without a matching file are deleted.

        Returns:
            Counters for each outcome plus one ``FileInfo`` entry per
            processed file or deleted record. Per-file failures are recorded
            in the results instead of aborting the scan.

        Raises:
            ValueError: If ``directory_path`` does not exist or is not a
                directory.
        """
        scan_path = Path(directory_path)

        if not scan_path.exists():
            msg = f"Directory does not exist: {directory_path}"
            raise ValueError(msg)

        if not scan_path.is_dir():
            msg = f"Path is not a directory: {directory_path}"
            raise ValueError(msg)

        results: ScanResults = {
            "scanned": 0,
            "added": 0,
            "updated": 0,
            "deleted": 0,
            "skipped": 0,
            "errors": 0,
            "files": [],
        }

        logger.info("Starting sync of directory: %s", directory_path)

        # Index the existing records of this type by file name.
        existing_sounds = await self.sound_repo.get_by_type(sound_type)
        sounds_by_filename = {sound.filename: sound for sound in existing_sounds}

        # Sorted so the order of the result entries does not depend on the
        # arbitrary order in which the file system lists the directory.
        audio_files = sorted(
            entry
            for entry in scan_path.iterdir()
            if entry.is_file() and entry.suffix.lower() in self.supported_extensions
        )

        # Add, update or skip every file present in the directory.
        processed_filenames: set[str] = set()
        for file_path in audio_files:
            results["scanned"] += 1
            filename = file_path.name
            # Recorded even if processing fails, so a file that merely errored
            # is not treated as missing by the deletion pass below.
            processed_filenames.add(filename)

            try:
                await self._sync_audio_file(
                    file_path,
                    sound_type,
                    sounds_by_filename.get(filename),
                    results,
                )
            except Exception as e:
                logger.exception("Error processing file %s", file_path)
                results["errors"] += 1
                results["files"].append(
                    self._file_info(filename, "error", error=str(e)),
                )

        # Delete sounds that no longer exist in directory.
        for filename, sound in sounds_by_filename.items():
            if filename in processed_filenames:
                continue

            # Read the reported fields before deleting: presumably the ORM may
            # expire or detach the instance once it has been deleted.
            name = sound.name
            duration = sound.duration
            size = sound.size
            sound_id = sound.id

            try:
                await self.sound_repo.delete(sound)
            except Exception as e:
                logger.exception("Error deleting sound %s", filename)
                results["errors"] += 1
                results["files"].append(
                    self._file_info(
                        filename,
                        "error",
                        reason="failed to delete",
                        name=name,
                        duration=duration,
                        size=size,
                        sound_id=sound_id,
                        error=str(e),
                    ),
                )
                continue

            logger.info("Deleted sound no longer in directory: %s", filename)
            results["deleted"] += 1
            results["files"].append(
                self._file_info(
                    filename,
                    "deleted",
                    reason="file no longer exists",
                    name=name,
                    duration=duration,
                    size=size,
                    sound_id=sound_id,
                ),
            )

        logger.info("Sync completed: %s", results)
        return results

    async def _sync_audio_file(
        self,
        file_path: Path,
        sound_type: str,
        existing_sound: Sound | None,
        results: ScanResults,
    ) -> None:
        """Sync a single audio file (add new or update existing).

        Args:
            file_path: Audio file to sync.
            sound_type: Type stored on a newly created record.
            existing_sound: Record already stored for this file name, if any.
            results: Scan results, updated in place with the outcome.
        """
        filename = file_path.name
        file_hash = self.get_file_hash(file_path)

        if existing_sound is not None and existing_sound.hash == file_hash:
            # File unchanged, skip. Decided on the hash alone so unchanged
            # files never pay for an ffprobe call.
            logger.debug("Sound unchanged: %s", filename)
            results["skipped"] += 1
            results["files"].append(
                self._file_info(
                    filename,
                    "skipped",
                    reason="file unchanged",
                    name=existing_sound.name,
                    duration=existing_sound.duration,
                    size=existing_sound.size,
                    sound_id=existing_sound.id,
                ),
            )
            return

        # Only new or modified files need their metadata read.
        duration = self.get_audio_duration(file_path)
        size = self.get_file_size(file_path)
        name = self.extract_name_from_filename(filename)

        if existing_sound is None:
            # Add new sound.
            sound_data = {
                "type": sound_type,
                "name": name,
                "filename": filename,
                "duration": duration,
                "size": size,
                "hash": file_hash,
                "is_deletable": False,
                "is_music": False,
                "is_normalized": False,
                "play_count": 0,
            }

            sound = await self.sound_repo.create(sound_data)
            logger.info("Added new sound: %s (ID: %s)", sound.name, sound.id)

            results["added"] += 1
            results["files"].append(
                self._file_info(
                    filename,
                    "added",
                    name=name,
                    duration=duration,
                    size=size,
                    sound_id=sound.id,
                ),
            )
            return

        # Update existing sound (file was modified).
        update_data = {
            "name": name,
            "duration": duration,
            "size": size,
            "hash": file_hash,
        }

        await self.sound_repo.update(existing_sound, update_data)
        logger.info("Updated modified sound: %s (ID: %s)", name, existing_sound.id)

        results["updated"] += 1
        results["files"].append(
            self._file_info(
                filename,
                "updated",
                reason="file was modified",
                name=name,
                duration=duration,
                size=size,
                sound_id=existing_sound.id,
                changes=["hash", "duration", "size", "name"],
            ),
        )

    async def scan_soundboard_directory(self) -> ScanResults:
        """Sync the default soundboard directory.

        Returns:
            The results of scanning ``sounds/originals/soundboard`` for
            sounds of type ``"SDB"``.
        """
        soundboard_path = "sounds/originals/soundboard"
        return await self.scan_directory(soundboard_path, "SDB")
|