- Implement tests for admin extraction API endpoints including status retrieval, deletion of extractions, and permission checks. - Add tests for user extraction deletion, ensuring proper handling of permissions and non-existent extractions. - Enhance sound endpoint tests to include duplicate handling in responses. - Refactor favorite service tests to utilize mock dependencies for better maintainability and clarity. - Update sound scanner tests to improve file handling and ensure proper deletion of associated files.
620 lines
22 KiB
Python
620 lines
22 KiB
Python
"""Sound scanner service for scanning and importing audio files."""
|
|
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import TypedDict
|
|
|
|
from sqlmodel.ext.asyncio.session import AsyncSession
|
|
|
|
from app.core.logging import get_logger
|
|
from app.models.sound import Sound
|
|
from app.repositories.sound import SoundRepository
|
|
from app.utils.audio import get_audio_duration, get_file_hash, get_file_size
|
|
|
|
logger = get_logger(__name__)
|
|
|
|
|
|
@dataclass
class AudioFileInfo:
    """Metadata gathered for one audio file found on disk.

    Instances are built while syncing a single file and handed to the
    handlers that create or update the matching sound record.

    Attributes:
        filename: File name including its extension (no directory part).
        name: Human-readable name derived from the filename.
        duration: Value returned by ``get_audio_duration`` for the file
            (unit not visible here; TODO confirm against the helper).
        size: Value returned by ``get_file_size`` for the file
            (presumably bytes; TODO confirm against the helper).
        file_hash: Content hash of the file, used to recognise renames
            and duplicates.
    """

    filename: str
    name: str
    duration: int
    size: int
    file_hash: str
|
|
|
|
|
|
@dataclass
class SyncContext:
    """Everything needed to sync one audio file against the database.

    Attributes:
        file_path: Path of the audio file being synced.
        sound_type: Sound type code the scan is running for (e.g. "SDB").
        existing_sound_by_hash: Known sound whose hash equals ``file_hash``,
            or ``None`` when no such sound exists. Either an attribute
            snapshot dict or a ``Sound`` instance.
        existing_sound_by_filename: Known sound with the same filename as
            ``file_path``, or ``None``. Same representation as above.
        file_hash: Content hash already computed for ``file_path``.
    """

    file_path: Path
    sound_type: str
    existing_sound_by_hash: dict | Sound | None
    existing_sound_by_filename: dict | Sound | None
    file_hash: str
|
|
|
|
|
|
class FileInfo(TypedDict):
|
|
"""Type definition for file information in scan results."""
|
|
|
|
filename: str
|
|
status: str
|
|
reason: str | None
|
|
name: str | None
|
|
duration: int | None
|
|
size: int | None
|
|
id: int | None
|
|
error: str | None
|
|
changes: list[str] | None
|
|
|
|
|
|
class ScanResults(TypedDict):
    """Aggregated outcome of one directory scan.

    Keys:
        scanned: Number of audio files found and processed.
        added: Number of sounds created.
        updated: Number of sounds updated (renamed or modified files).
        deleted: Number of sounds removed because their file is gone.
        skipped: Number of files left untouched (unchanged or duplicate).
        duplicates: Number of skipped files that were content duplicates.
        errors: Number of files or deletions that failed.
        files: One ``FileInfo`` entry per reported file.
    """

    scanned: int
    added: int
    updated: int
    deleted: int
    skipped: int
    duplicates: int
    errors: int
    files: list[FileInfo]
|
|
|
|
|
|
class SoundScannerService:
    """Service for scanning and importing audio files.

    Synchronizes a directory of audio files with the sound records reachable
    through ``SoundRepository``: new files are added, renamed or modified
    files are updated, and records whose file has disappeared are deleted.
    Files are identified by content hash first and by filename second.
    """

    def __init__(self, session: AsyncSession) -> None:
        """Initialize the sound scanner service.

        Args:
            session: Database session handed to the sound repository.
        """
        self.session = session
        self.sound_repo = SoundRepository(session)
        # Stored lower-case; candidates are compared via ``suffix.lower()``.
        self.supported_extensions = {
            ".mp3",
            ".wav",
            ".opus",
            ".flac",
            ".ogg",
            ".m4a",
            ".aac",
        }

        # Directory mappings for normalized files (matching sound_normalizer)
        self.normalized_directories = {
            "SDB": "sounds/normalized/soundboard",
            "TTS": "sounds/normalized/text_to_speech",
            "EXT": "sounds/normalized/extracted",
        }

    def extract_name_from_filename(self, filename: str) -> str:
        """Extract a clean display name from a filename.

        The extension is dropped, underscores and hyphens become spaces,
        runs of whitespace collapse, and every word is capitalized (first
        letter upper-case, the rest lower-case).

        Args:
            filename: File name, with or without a directory part.

        Returns:
            The cleaned-up name, e.g. ``"my_cool-sound.mp3"`` -> ``"My Cool Sound"``.
        """
        stem = Path(filename).stem
        words = stem.replace("_", " ").replace("-", " ").split()
        return " ".join(word.capitalize() for word in words)

    def _get_normalized_path(self, sound_type: str, filename: str) -> Path:
        """Get the normalized file path for a sound.

        Args:
            sound_type: Sound type code; unknown codes fall back to the
                ``sounds/normalized/other`` directory.
            filename: Base name of the normalized file.

        Returns:
            The (relative) path of the normalized file.
        """
        directory = self.normalized_directories.get(
            sound_type,
            "sounds/normalized/other",
        )
        return Path(directory) / filename

    def _rename_normalized_file(
        self,
        sound_type: str,
        old_filename: str,
        new_filename: str,
    ) -> bool:
        """Rename a normalized file if it exists.

        Args:
            sound_type: Sound type code used to locate the directory.
            old_filename: Current base name of the normalized file.
            new_filename: Base name the file should get.

        Returns:
            True if the file was renamed; False if it does not exist or the
            rename failed (the failure is logged, not raised).
        """
        old_path = self._get_normalized_path(sound_type, old_filename)
        new_path = self._get_normalized_path(sound_type, new_filename)

        if not old_path.exists():
            return False

        try:
            # Ensure the directory exists
            new_path.parent.mkdir(parents=True, exist_ok=True)
            old_path.rename(new_path)
        except OSError:
            logger.exception(
                "Failed to rename normalized file %s -> %s",
                old_path,
                new_path,
            )
            return False

        logger.info("Renamed normalized file: %s -> %s", old_path, new_path)
        return True

    def _delete_normalized_file(self, sound_type: str, filename: str) -> bool:
        """Delete a normalized file if it exists.

        Args:
            sound_type: Sound type code used to locate the directory.
            filename: Base name of the normalized file.

        Returns:
            True if the file was deleted; False if it does not exist or the
            deletion failed (the failure is logged, not raised).
        """
        normalized_path = self._get_normalized_path(sound_type, filename)

        if not normalized_path.exists():
            return False

        try:
            normalized_path.unlink()
        except OSError:
            logger.exception(
                "Failed to delete normalized file %s",
                normalized_path,
            )
            return False

        logger.info("Deleted normalized file: %s", normalized_path)
        return True

    def _extract_sound_attributes(self, sound_data: dict | Sound | None) -> dict:
        """Extract attributes from sound data (dict or Sound object).

        Args:
            sound_data: Snapshot dict as built by ``_load_existing_sounds``,
                a ``Sound`` instance, or ``None``.

        Returns:
            An empty dict for ``None``; otherwise a dict with the keys
            ``filename``, ``name``, ``duration``, ``size``, ``id``,
            ``object`` (the ``Sound`` instance, if available), ``type``,
            ``is_normalized`` and ``normalized_filename``.
        """
        if sound_data is None:
            return {}

        if isinstance(sound_data, dict):
            # Snapshot dicts keep the ORM instance under "sound_object".
            return {
                "filename": sound_data.get("filename"),
                "name": sound_data.get("name"),
                "duration": sound_data.get("duration"),
                "size": sound_data.get("size"),
                "id": sound_data.get("id"),
                "object": sound_data.get("sound_object"),
                "type": sound_data.get("type"),
                "is_normalized": sound_data.get("is_normalized"),
                "normalized_filename": sound_data.get("normalized_filename"),
            }

        # Sound object (for tests)
        return {
            "filename": sound_data.filename,
            "name": sound_data.name,
            "duration": sound_data.duration,
            "size": sound_data.size,
            "id": sound_data.id,
            "object": sound_data,
            "type": sound_data.type,
            "is_normalized": sound_data.is_normalized,
            "normalized_filename": sound_data.normalized_filename,
        }

    @staticmethod
    def _build_file_entry(
        filename: str,
        status: str,
        *,
        reason: str | None = None,
        name: str | None = None,
        duration: int | None = None,
        size: int | None = None,
        sound_id: int | None = None,
        error: str | None = None,
        changes: list[str] | None = None,
    ) -> FileInfo:
        """Build one ``FileInfo`` entry; unspecified fields default to None."""
        return {
            "filename": filename,
            "status": status,
            "reason": reason,
            "name": name,
            "duration": duration,
            "size": size,
            "id": sound_id,
            "error": error,
            "changes": changes,
        }

    def _handle_unchanged_file(
        self,
        filename: str,
        existing_attrs: dict,
        results: ScanResults,
    ) -> None:
        """Handle unchanged file (same hash, same filename).

        Args:
            filename: Name of the scanned file.
            existing_attrs: Attributes of the matching sound, as returned by
                ``_extract_sound_attributes``.
            results: Scan results, updated in place (``skipped`` counter and
                a "skipped" entry).
        """
        logger.debug("Sound unchanged: %s", filename)
        results["skipped"] += 1
        results["files"].append(
            self._build_file_entry(
                filename,
                "skipped",
                reason="file unchanged",
                name=existing_attrs["name"],
                duration=existing_attrs["duration"],
                size=existing_attrs["size"],
                sound_id=existing_attrs["id"],
            ),
        )

    def _handle_duplicate_file(
        self,
        filename: str,
        existing_filename: str,
        file_hash: str,
        existing_attrs: dict,
        results: ScanResults,
    ) -> None:
        """Handle duplicate file (same hash, different filename).

        Args:
            filename: Name of the scanned (duplicate) file.
            existing_filename: Filename of the sound already holding the
                same content.
            file_hash: Content hash shared by both files; only its first
                eight characters are logged.
            existing_attrs: Attributes of the already known sound.
            results: Scan results, updated in place (``skipped`` and
                ``duplicates`` counters and a "skipped" entry).
        """
        logger.warning(
            "Duplicate file detected: '%s' has same content as existing "
            "'%s' (hash: %s). Skipping duplicate file.",
            filename,
            existing_filename,
            file_hash[:8] + "...",
        )
        results["skipped"] += 1
        results["duplicates"] += 1
        results["files"].append(
            self._build_file_entry(
                filename,
                "skipped",
                reason="duplicate content",
                name=existing_attrs["name"],
                duration=existing_attrs["duration"],
                size=existing_attrs["size"],
                sound_id=existing_attrs["id"],
            ),
        )

    async def _handle_file_rename(
        self,
        file_info: AudioFileInfo,
        existing_attrs: dict,
        results: ScanResults,
    ) -> None:
        """Handle file rename (same hash, different filename).

        Updates the sound's filename and name and, when the sound has a
        normalized file, renames that file to follow the new stem.

        Args:
            file_info: Metadata of the file under its new name.
            existing_attrs: Attributes of the sound stored under the old
                filename.
            results: Scan results, updated in place. The appended entry
                carries a transient ``old_filename`` key so the caller can
                protect the old filename from deletion.
        """
        update_data = {
            "filename": file_info.filename,
            "name": file_info.name,
        }

        # If the sound has a normalized file, rename it too
        if existing_attrs["is_normalized"] and existing_attrs["normalized_filename"]:
            current_normalized = Path(existing_attrs["normalized_filename"])
            old_normalized_base = current_normalized.name
            # Keep the normalized file's own extension, adopt the new stem.
            new_normalized_base = (
                Path(file_info.filename).stem + current_normalized.suffix
            )

            # _rename_normalized_file logs the rename itself; the database
            # field is only updated when the file really moved.
            if self._rename_normalized_file(
                existing_attrs["type"],
                old_normalized_base,
                new_normalized_base,
            ):
                update_data["normalized_filename"] = new_normalized_base

        await self.sound_repo.update(existing_attrs["object"], update_data)
        logger.info(
            "Detected rename: %s -> %s (ID: %s)",
            existing_attrs["filename"],
            file_info.filename,
            existing_attrs["id"],
        )

        # Build changes list
        changes = ["filename", "name"]
        if "normalized_filename" in update_data:
            changes.append("normalized_filename")

        entry = self._build_file_entry(
            file_info.filename,
            "updated",
            reason="file was renamed",
            name=file_info.name,
            duration=existing_attrs["duration"],
            size=existing_attrs["size"],
            sound_id=existing_attrs["id"],
            changes=changes,
        )
        # Store old filename to prevent deletion
        entry["old_filename"] = existing_attrs["filename"]

        results["updated"] += 1
        results["files"].append(entry)

    async def _handle_file_modification(
        self,
        file_info: AudioFileInfo,
        existing_attrs: dict,
        results: ScanResults,
    ) -> None:
        """Handle file modification (same filename, different hash).

        Args:
            file_info: Fresh metadata of the modified file.
            existing_attrs: Attributes of the sound stored under the same
                filename.
            results: Scan results, updated in place (``updated`` counter
                and an "updated" entry).
        """
        update_data = {
            "name": file_info.name,
            "duration": file_info.duration,
            "size": file_info.size,
            "hash": file_info.file_hash,
        }

        await self.sound_repo.update(existing_attrs["object"], update_data)
        logger.info(
            "Updated modified sound: %s (ID: %s)",
            file_info.name,
            existing_attrs["id"],
        )

        results["updated"] += 1
        results["files"].append(
            self._build_file_entry(
                file_info.filename,
                "updated",
                reason="file was modified",
                name=file_info.name,
                duration=file_info.duration,
                size=file_info.size,
                sound_id=existing_attrs["id"],
                changes=["hash", "duration", "size", "name"],
            ),
        )

    async def _handle_new_file(
        self,
        file_info: AudioFileInfo,
        sound_type: str,
        results: ScanResults,
    ) -> None:
        """Handle new file (neither hash nor filename exists).

        Args:
            file_info: Metadata of the new file.
            sound_type: Sound type code stored on the new sound.
            results: Scan results, updated in place (``added`` counter and
                an "added" entry).
        """
        sound_data = {
            "type": sound_type,
            "name": file_info.name,
            "filename": file_info.filename,
            "duration": file_info.duration,
            "size": file_info.size,
            "hash": file_info.file_hash,
            "is_deletable": False,
            "is_music": False,
            "is_normalized": False,
            "play_count": 0,
        }

        sound = await self.sound_repo.create(sound_data)
        logger.info("Added new sound: %s (ID: %s)", sound.name, sound.id)

        results["added"] += 1
        results["files"].append(
            self._build_file_entry(
                file_info.filename,
                "added",
                name=file_info.name,
                duration=file_info.duration,
                size=file_info.size,
                sound_id=sound.id,
            ),
        )

    async def _load_existing_sounds(self, sound_type: str) -> tuple[dict, dict]:
        """Load existing sounds and create lookup dictionaries.

        Args:
            sound_type: Sound type code whose sounds are loaded.

        Returns:
            A ``(sounds_by_hash, sounds_by_filename)`` pair. Both map to the
            same snapshot dict per sound; when several sounds share a hash
            or filename, the one loaded last wins.
        """
        existing_sounds = await self.sound_repo.get_by_type(sound_type)

        # Create lookup dictionaries with immediate attribute access
        # to avoid session detachment
        sounds_by_hash = {}
        sounds_by_filename = {}

        for sound in existing_sounds:
            # Capture all attributes immediately while session is valid
            snapshot = {
                "id": sound.id,
                "hash": sound.hash,
                "filename": sound.filename,
                "name": sound.name,
                "duration": sound.duration,
                "size": sound.size,
                "type": sound.type,
                "is_normalized": sound.is_normalized,
                "normalized_filename": sound.normalized_filename,
                "sound_object": sound,  # Keep reference for database operations
            }
            sounds_by_hash[sound.hash] = snapshot
            sounds_by_filename[sound.filename] = snapshot

        return sounds_by_hash, sounds_by_filename

    async def _process_audio_files(
        self,
        scan_path: Path,
        sound_type: str,
        sounds_by_hash: dict,
        sounds_by_filename: dict,
        results: ScanResults,
    ) -> set[str]:
        """Process all audio files in directory and return processed filenames.

        Only regular files directly inside ``scan_path`` with a supported
        extension are considered. A failure on one file is logged and
        recorded as an "error" entry; the scan then continues.

        NOTE(review): the lookup tables are not updated while scanning, so
        two new files with identical content found in the same pass are
        both added - confirm whether that is intended.

        Args:
            scan_path: Directory to scan.
            sound_type: Sound type code assigned to new sounds.
            sounds_by_hash: Known sounds keyed by content hash.
            sounds_by_filename: Known sounds keyed by filename.
            results: Scan results, updated in place.

        Returns:
            Filenames that must not be treated as missing afterwards: every
            scanned file plus the old filename of every detected rename.
        """
        # Get all audio files from directory
        audio_files = [
            candidate
            for candidate in scan_path.iterdir()
            if candidate.is_file()
            and candidate.suffix.lower() in self.supported_extensions
        ]

        # Process each file in directory
        processed_filenames = set()
        for file_path in audio_files:
            results["scanned"] += 1
            filename = file_path.name
            # Registered before syncing so that a failing file is never
            # mistaken for a missing one.
            processed_filenames.add(filename)

            try:
                # Calculate hash first to enable hash-based lookup
                file_hash = get_file_hash(file_path)

                sync_context = SyncContext(
                    file_path=file_path,
                    sound_type=sound_type,
                    existing_sound_by_hash=sounds_by_hash.get(file_hash),
                    existing_sound_by_filename=sounds_by_filename.get(filename),
                    file_hash=file_hash,
                )

                await self._sync_audio_file(sync_context, results)

                # Check if this was a rename and mark old filename as processed
                if results["files"]:
                    # Popping also removes the temporary tracking field
                    # from the reported entry.
                    old_filename = results["files"][-1].pop("old_filename", None)
                    if old_filename:
                        processed_filenames.add(old_filename)
                        logger.debug(
                            "Marked old filename as processed: %s",
                            old_filename,
                        )
            except Exception as e:
                # Per-file boundary: record the failure and keep scanning.
                logger.exception("Error processing file %s", file_path)
                results["errors"] += 1
                results["files"].append(
                    self._build_file_entry(filename, "error", error=str(e)),
                )

        return processed_filenames

    async def _delete_missing_sounds(
        self,
        sounds_by_filename: dict,
        processed_filenames: set[str],
        results: ScanResults,
    ) -> None:
        """Delete sounds that no longer exist in directory.

        Args:
            sounds_by_filename: Known sounds keyed by filename.
            processed_filenames: Filenames seen (or protected) during the
                scan; sounds with any other filename are deleted.
            results: Scan results, updated in place with a "deleted" or
                "error" entry per affected sound.
        """
        for filename, sound_data in sounds_by_filename.items():
            if filename in processed_filenames:
                continue

            # Attributes already captured in sound_data dictionary
            reported = {
                "name": sound_data["name"],
                "duration": sound_data["duration"],
                "size": sound_data["size"],
                "sound_id": sound_data["id"],
            }
            sound_object = sound_data["sound_object"]
            sound_type = sound_data["type"]
            is_normalized = sound_data["is_normalized"]
            normalized_filename = sound_data["normalized_filename"]

            try:
                # Delete the sound from database first
                await self.sound_repo.delete(sound_object)
                logger.info("Deleted sound no longer in directory: %s", filename)

                # If the sound had a normalized file, delete it too
                if is_normalized and normalized_filename:
                    self._delete_normalized_file(
                        sound_type,
                        Path(normalized_filename).name,
                    )

                results["deleted"] += 1
                results["files"].append(
                    self._build_file_entry(
                        filename,
                        "deleted",
                        reason="file no longer exists",
                        **reported,
                    ),
                )
            except Exception as e:
                # Per-sound boundary: record the failure and keep going.
                logger.exception("Error deleting sound %s", filename)
                results["errors"] += 1
                results["files"].append(
                    self._build_file_entry(
                        filename,
                        "error",
                        reason="failed to delete",
                        error=str(e),
                        **reported,
                    ),
                )

    async def scan_directory(
        self,
        directory_path: str,
        sound_type: str = "SDB",
    ) -> ScanResults:
        """Sync a directory with the database (add/update/delete sounds).

        Args:
            directory_path: Directory containing the audio files.
            sound_type: Sound type code the directory belongs to.

        Returns:
            Counters and per-file entries describing what the sync did.

        Raises:
            ValueError: If ``directory_path`` does not exist or is not a
                directory.
        """
        scan_path = Path(directory_path)

        if not scan_path.exists():
            msg = f"Directory does not exist: {directory_path}"
            raise ValueError(msg)

        if not scan_path.is_dir():
            msg = f"Path is not a directory: {directory_path}"
            raise ValueError(msg)

        results: ScanResults = {
            "scanned": 0,
            "added": 0,
            "updated": 0,
            "deleted": 0,
            "skipped": 0,
            "duplicates": 0,
            "errors": 0,
            "files": [],
        }

        logger.info("Starting sync of directory: %s", directory_path)

        # Load existing sounds from database
        sounds_by_hash, sounds_by_filename = await self._load_existing_sounds(
            sound_type,
        )

        # Process audio files in directory
        processed_filenames = await self._process_audio_files(
            scan_path,
            sound_type,
            sounds_by_hash,
            sounds_by_filename,
            results,
        )

        # Delete sounds that no longer exist in directory
        await self._delete_missing_sounds(
            sounds_by_filename,
            processed_filenames,
            results,
        )

        logger.info("Sync completed: %s", results)
        return results

    async def _sync_audio_file(
        self,
        sync_context: SyncContext,
        results: ScanResults,
    ) -> None:
        """Sync a single audio file using hash-first identification strategy.

        Decision order:
          1. Hash known, same filename: unchanged.
          2. Hash known, other filename, old file still on disk: duplicate.
          3. Hash known, other filename, old file gone: rename.
          4. Hash unknown, filename known: modification.
          5. Neither known: new file.

        Duration and size are only read for outcomes 3-5; unchanged and
        duplicate files never use them, so the probing is skipped there.

        Args:
            sync_context: File path, hash and matching known sounds.
            results: Scan results, updated in place by the chosen handler.
        """
        file_path = sync_context.file_path
        filename = file_path.name

        def build_file_info() -> AudioFileInfo:
            """Read the file's metadata; called only when it is needed."""
            duration = get_audio_duration(file_path)
            size = get_file_size(file_path)
            return AudioFileInfo(
                filename=filename,
                name=self.extract_name_from_filename(filename),
                duration=duration,
                size=size,
                file_hash=sync_context.file_hash,
            )

        # Hash-first identification strategy
        if sync_context.existing_sound_by_hash is not None:
            # Content exists in database (same hash)
            hash_attrs = self._extract_sound_attributes(
                sync_context.existing_sound_by_hash,
            )

            if hash_attrs["filename"] == filename:
                # Same hash, same filename - file unchanged
                self._handle_unchanged_file(filename, hash_attrs, results)
                return

            # Same hash, different filename - could be rename or duplicate
            old_file_path = file_path.parent / hash_attrs["filename"]
            if old_file_path.exists():
                # Both files exist with same hash - this is a duplicate
                self._handle_duplicate_file(
                    filename,
                    hash_attrs["filename"],
                    sync_context.file_hash,
                    hash_attrs,
                    results,
                )
            else:
                # Old file doesn't exist - this is a genuine rename
                await self._handle_file_rename(
                    build_file_info(),
                    hash_attrs,
                    results,
                )
            return

        if sync_context.existing_sound_by_filename is not None:
            # Same filename but different hash - file was modified
            filename_attrs = self._extract_sound_attributes(
                sync_context.existing_sound_by_filename,
            )
            await self._handle_file_modification(
                build_file_info(),
                filename_attrs,
                results,
            )
            return

        # New file - neither hash nor filename exists
        await self._handle_new_file(
            build_file_info(),
            sync_context.sound_type,
            results,
        )

    async def scan_soundboard_directory(self) -> ScanResults:
        """Sync the default soundboard directory.

        Returns:
            The scan results for ``sounds/originals/soundboard`` with sound
            type "SDB".
        """
        soundboard_path = "sounds/originals/soundboard"
        return await self.scan_directory(soundboard_path, "SDB")
|