"""Sound file scanning service for discovering and importing audio files."""
|
|
|
|
import hashlib
|
|
import logging
|
|
from pathlib import Path
|
|
|
|
import ffmpeg
|
|
|
|
from app.database import db
|
|
from app.models.sound import Sound
|
|
|
|
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class SoundScannerService:
    """Service for scanning and importing sound files.

    Every method is static; the class groups the scan entry point, its
    private helpers and the configuration constants they share.
    """

    # Supported audio file extensions (matched lower-cased, with leading dot)
    SUPPORTED_EXTENSIONS = {".mp3", ".wav", ".ogg", ".flac", ".m4a", ".aac"}

    # Default soundboard directory
    DEFAULT_SOUNDBOARD_DIR = "sounds/soundboard"

    # Number of bytes read per iteration while hashing a file
    _HASH_CHUNK_SIZE = 64 * 1024

    @staticmethod
    def scan_soundboard_directory(
        directory: str | None = None,
    ) -> dict:
        """Scan the soundboard directory and add new files to the database.

        The directory is walked recursively. Each file with a supported
        extension is passed to ``_process_audio_file``; a failure on one
        file is recorded and does not stop the scan. The session is
        committed once after the walk and rolled back if the scan itself
        fails.

        Args:
            directory: Directory to scan (defaults to sounds/soundboard)

        Returns:
            dict: Summary of the scan operation. Always contains
            ``success``, ``files_found``, ``files_added``,
            ``files_skipped``, ``errors`` and ``message``. On success it
            also contains ``directory`` and ``files_updated``; on failure
            it also contains ``error``. ``files_added`` includes updated
            files, and ``files_updated`` reports that subset on its own.

        """
        scan_dir = directory or SoundScannerService.DEFAULT_SOUNDBOARD_DIR

        try:
            scan_path = Path(scan_dir)
            # is_dir() also rejects a path that exists but is a regular
            # file, which would otherwise look like an empty, successful scan.
            if not scan_path.is_dir():
                logger.warning(
                    f"Soundboard directory does not exist: {scan_dir}",
                )
                return SoundScannerService._scan_failure(
                    f"Directory not found: {scan_dir}",
                )

            logger.info(f"Starting soundboard scan in: {scan_dir}")

            files_found = 0
            files_added = 0
            files_updated = 0
            files_skipped = 0
            errors = []

            # Walk through directory and subdirectories
            for file_path in scan_path.rglob("*"):
                if not file_path.is_file():
                    continue

                filename = file_path.name
                if not SoundScannerService._is_supported_audio_file(
                    filename,
                ):
                    continue

                files_found += 1

                try:
                    result = SoundScannerService._process_audio_file(
                        str(file_path),
                        scan_dir,
                    )
                except Exception as e:
                    # Best effort: one unreadable file must not abort the
                    # scan; record the failure and move on.
                    error_msg = f"Error processing {filename}: {e!s}"
                    logger.exception(error_msg)
                    errors.append(error_msg)
                    files_skipped += 1
                    continue

                if result["added"]:
                    files_added += 1
                    logger.debug(f"Added sound: {filename}")
                elif result.get("updated"):
                    # Count updates as additions for reporting, but also
                    # track them separately.
                    files_added += 1
                    files_updated += 1
                    logger.debug(f"Updated sound: {filename}")
                else:
                    files_skipped += 1
                    logger.debug(
                        f"Skipped sound: {filename} ({result['reason']})",
                    )

            # Commit all changes
            db.session.commit()

            logger.info(
                f"Soundboard scan completed: {files_found} files found, "
                f"{files_added} added, {files_skipped} skipped",
            )

            return {
                "success": True,
                "directory": scan_dir,
                "files_found": files_found,
                "files_added": files_added,
                "files_updated": files_updated,
                "files_skipped": files_skipped,
                "errors": errors,
                "message": f"Scan completed: {files_added} new sounds added",
            }

        except Exception as e:
            db.session.rollback()
            logger.exception(f"Error during soundboard scan: {e!s}")
            return SoundScannerService._scan_failure(str(e))

    @staticmethod
    def _scan_failure(error: str) -> dict:
        """Build the summary dict returned when a scan cannot complete."""
        return {
            "success": False,
            "error": error,
            "files_found": 0,
            "files_added": 0,
            "files_skipped": 0,
            "errors": [],
            "message": "Soundboard scan failed",
        }

    @staticmethod
    def _is_supported_audio_file(filename: str) -> bool:
        """Check if file has a supported audio extension (case-insensitive)."""
        extension = Path(filename).suffix.lower()
        return extension in SoundScannerService.SUPPORTED_EXTENSIONS

    @staticmethod
    def _process_audio_file(file_path: str, base_dir: str) -> dict:
        """Process a single audio file and add it to database if new.

        Args:
            file_path: Path of the audio file on disk.
            base_dir: Scan root; the stored filename is ``file_path``
                relative to this directory.

        Returns:
            dict: Contains ``added`` and ``reason``; ``updated`` and
            ``sound_id`` are present when a record was updated, and
            ``sound_id`` when one was created.

        Raises:
            ValueError: If ``file_path`` is not located under ``base_dir``.
            OSError: If the file cannot be read.

        """
        file_hash = SoundScannerService._calculate_file_hash(file_path)

        # Check for existing file by hash (duplicate content). This runs
        # before metadata extraction so that already-imported files do not
        # pay for a probe whose result would be discarded.
        if existing_sound := Sound.find_by_hash(file_hash):
            return SoundScannerService._handle_duplicate_file(existing_sound)

        relative_path = str(Path(file_path).relative_to(Path(base_dir)))
        metadata = SoundScannerService._extract_audio_metadata(file_path)

        # Check for existing filename (file replacement)
        if existing_filename_sound := Sound.find_by_filename(relative_path):
            return SoundScannerService._handle_file_replacement(
                existing_filename_sound,
                relative_path,
                metadata,
                file_hash,
            )

        # Create new sound record
        return SoundScannerService._create_new_sound(
            file_path,
            relative_path,
            metadata,
            file_hash,
        )

    @staticmethod
    def _handle_duplicate_file(existing_sound: Sound) -> dict:
        """Handle case where file content already exists in database."""
        return {
            "added": False,
            "reason": f"File already exists as '{existing_sound.name}'",
        }

    @staticmethod
    def _handle_file_replacement(
        existing_sound: Sound,
        relative_path: str,
        metadata: dict,
        file_hash: str,
    ) -> dict:
        """Handle case where filename exists but content may be different.

        Deletes the sound's normalized file (if any), clears its
        normalized info, then overwrites its filename, duration, size and
        hash with those of the new file.
        """
        # Remove normalized files and clear normalized info
        SoundScannerService._clear_normalized_files(existing_sound)
        existing_sound.clear_normalized_info()

        # Update existing sound with new file information
        existing_sound.update_file_info(
            filename=relative_path,
            duration=metadata["duration"],
            size=metadata["size"],
            hash_value=file_hash,
        )

        return {
            "added": False,
            "updated": True,
            "sound_id": existing_sound.id,
            "reason": f"Updated existing sound '{existing_sound.name}' with new file data",
        }

    @staticmethod
    def _create_new_sound(
        file_path: str,
        relative_path: str,
        metadata: dict,
        file_hash: str,
    ) -> dict:
        """Create a new sound record in the database.

        The record is created with ``commit=False``; the caller commits.
        """
        sound_name = SoundScannerService._generate_unique_sound_name(
            Path(file_path).stem,
        )

        sound = Sound.create_sound(
            sound_type="SDB",
            name=sound_name,
            filename=relative_path,
            duration=metadata["duration"],
            size=metadata["size"],
            hash_value=file_hash,
            is_music=False,
            is_deletable=False,
            commit=False,
        )

        return {
            "added": True,
            # NOTE(review): created with commit=False, so the id may not be
            # assigned yet -- confirm against Sound.create_sound.
            "sound_id": sound.id,
            "reason": "New file added successfully",
        }

    @staticmethod
    def _generate_unique_sound_name(base_name: str) -> str:
        """Generate a unique sound name by appending numbers if needed.

        Tries ``base_name`` first, then ``base_name_1``, ``base_name_2``
        and so on until ``Sound.find_by_name`` finds no match.
        """
        sound_name = base_name
        counter = 1

        while Sound.find_by_name(sound_name):
            sound_name = f"{base_name}_{counter}"
            counter += 1

        return sound_name

    @staticmethod
    def _calculate_file_hash(file_path: str) -> str:
        """Calculate SHA256 hash of file contents.

        Returns:
            str: Hex digest of the file's bytes.

        Raises:
            OSError: If the file cannot be opened or read.

        """
        sha256_hash = hashlib.sha256()

        with Path(file_path).open("rb") as f:
            # Read file in chunks to handle large files
            while chunk := f.read(SoundScannerService._HASH_CHUNK_SIZE):
                sha256_hash.update(chunk)

        return sha256_hash.hexdigest()

    @staticmethod
    def _clear_normalized_files(sound: Sound) -> None:
        """Remove normalized files for a sound if they exist.

        Failure to delete is logged as a warning and never raised.
        """
        if not (sound.is_normalized and sound.normalized_filename):
            return

        # Import here to avoid circular imports
        from app.services.sound_normalizer_service import (
            SoundNormalizerService,
        )

        normalized_path = (
            Path(SoundNormalizerService.NORMALIZED_DIR)
            / sound.normalized_filename
        )

        # Attempt the removal directly instead of checking exists() first,
        # so a file vanishing in between is not treated as an error.
        try:
            normalized_path.unlink()
        except FileNotFoundError:
            return
        except OSError as e:
            logger.warning(
                f"Could not remove normalized file {normalized_path}: {e}",
            )
        else:
            logger.info(f"Removed normalized file: {normalized_path}")

    @staticmethod
    def _extract_audio_metadata(file_path: str) -> dict:
        """Extract metadata from audio file using ffmpeg-python.

        Probing is best effort: if the probe fails or its output cannot be
        interpreted, a warning is logged and a fallback is returned with
        duration 0 and ``None`` for bitrate, channels and sample rate.

        Returns:
            dict: Keys ``duration`` (milliseconds), ``size`` (bytes),
            ``bitrate``, ``channels`` and ``sample_rate``.

        Raises:
            OSError: If the file size cannot be read.

        """
        # Read the size once, outside the try block: the fallback needs it
        # too, and a stat() failure should reach the caller rather than be
        # retried inside the error handler.
        file_size = Path(file_path).stat().st_size

        try:
            # Use ffmpeg to probe audio metadata
            probe = ffmpeg.probe(file_path)
            return SoundScannerService._metadata_from_probe(probe, file_size)
        except Exception as e:
            # Deliberately broad: a file that cannot be probed is still
            # imported, just without audio details.
            logger.warning(f"Could not extract metadata from {file_path}: {e}")
            return {
                "duration": 0,
                "size": file_size,
                "bitrate": None,
                "channels": None,
                "sample_rate": None,
            }

    @staticmethod
    def _metadata_from_probe(probe: dict, file_size: int) -> dict:
        """Turn a probe result into the metadata dict used by the scanner.

        Args:
            probe: Mapping with a ``streams`` list and, optionally, a
                ``format`` mapping, as returned by ``ffmpeg.probe``.
            file_size: Size of the probed file in bytes.

        Returns:
            dict: Keys ``duration`` (milliseconds), ``size``, ``bitrate``,
            ``channels`` and ``sample_rate``.

        Raises:
            ValueError: If the probe contains no audio stream.

        """
        audio_stream = next(
            (
                stream
                for stream in probe.get("streams") or []
                if stream.get("codec_type") == "audio"
            ),
            None,
        )
        if not audio_stream:
            raise ValueError("No audio stream found in file")

        # Prefer the stream's own duration; fall back to the format-level
        # duration when the stream does not report a usable one.
        duration = 0
        duration_candidates = (
            audio_stream.get("duration"),
            (probe.get("format") or {}).get("duration"),
        )
        for raw_duration in duration_candidates:
            try:
                duration = int(float(raw_duration) * 1000)  # milliseconds
            except (TypeError, ValueError, OverflowError):
                continue
            break

        channels = int(audio_stream.get("channels", 0))
        sample_rate = int(audio_stream.get("sample_rate", 0))
        raw_bitrate = audio_stream.get("bit_rate")
        bitrate = int(raw_bitrate) if raw_bitrate else None

        # Fallback bitrate calculation if not available
        if not bitrate and duration > 0:
            file_size_bits = file_size * 8
            bitrate = int(file_size_bits / (duration / 1000))

        return {
            "duration": duration,
            "size": file_size,
            "bitrate": bitrate,
            "channels": channels,
            "sample_rate": sample_rate,
        }

    @staticmethod
    def get_scan_statistics() -> dict:
        """Get statistics about sounds in the database.

        Returns:
            dict: Sound counts (total, type ``SDB``, music), summed size,
            duration and play count over all sounds, and the five most
            played sounds as dicts.

        """
        total_sounds = Sound.query.count()
        sdb_sounds = Sound.query.filter_by(type="SDB").count()
        music_sounds = Sound.query.filter_by(is_music=True).count()

        # Calculate total size and duration. Missing (None) values count
        # as 0 so one incomplete row cannot break the whole summary.
        sounds = Sound.query.all()
        total_size = sum(sound.size or 0 for sound in sounds)
        total_duration = sum(sound.duration or 0 for sound in sounds)
        total_plays = sum(sound.play_count or 0 for sound in sounds)

        return {
            "total_sounds": total_sounds,
            "soundboard_sounds": sdb_sounds,
            "music_sounds": music_sounds,
            "total_size_bytes": total_size,
            "total_duration": total_duration,
            "total_plays": total_plays,
            "most_played": [
                sound.to_dict() for sound in Sound.get_most_played(5)
            ],
        }
|