"""Extraction service for audio extraction from external services using yt-dlp.""" import asyncio import shutil from pathlib import Path from typing import TypedDict import yt_dlp from sqlmodel.ext.asyncio.session import AsyncSession from app.core.config import settings from app.core.logging import get_logger from app.models.sound import Sound from app.repositories.extraction import ExtractionRepository from app.repositories.sound import SoundRepository from app.services.playlist import PlaylistService from app.services.sound_normalizer import SoundNormalizerService from app.utils.audio import get_audio_duration, get_file_hash, get_file_size logger = get_logger(__name__) class ExtractionInfo(TypedDict): """Type definition for extraction information.""" id: int url: str service: str | None service_id: str | None title: str | None status: str error: str | None sound_id: int | None user_id: int created_at: str updated_at: str class ExtractionService: """Service for extracting audio from external services using yt-dlp.""" def __init__(self, session: AsyncSession) -> None: """Initialize the extraction service.""" self.session = session self.extraction_repo = ExtractionRepository(session) self.sound_repo = SoundRepository(session) self.playlist_service = PlaylistService(session) # Ensure required directories exist self._ensure_directories() def _ensure_directories(self) -> None: """Ensure all required directories exist.""" directories = [ settings.EXTRACTION_TEMP_DIR, "sounds/originals/extracted", settings.EXTRACTION_THUMBNAILS_DIR, ] for directory in directories: Path(directory).mkdir(parents=True, exist_ok=True) logger.debug("Ensured directory exists: %s", directory) async def create_extraction(self, url: str, user_id: int) -> ExtractionInfo: """Create a new extraction job.""" logger.info("Creating extraction for URL: %s (user: %d)", url, user_id) try: # Create the extraction record without service detection for fast response extraction_data = { "url": url, "user_id": user_id, "service": None, # Will be detected during processing "service_id": None, # Will be detected during processing "title": None, # Will be detected during processing "status": "pending", } extraction = await self.extraction_repo.create(extraction_data) logger.info("Created extraction with ID: %d", extraction.id) except Exception: logger.exception("Failed to create extraction for URL: %s", url) raise else: return { "id": extraction.id or 0, # Should never be None for created extraction "url": extraction.url, "service": extraction.service, "service_id": extraction.service_id, "title": extraction.title, "status": extraction.status, "error": extraction.error, "sound_id": extraction.sound_id, "user_id": extraction.user_id, "created_at": extraction.created_at.isoformat(), "updated_at": extraction.updated_at.isoformat(), } async def _detect_service_info(self, url: str) -> dict[str, str | None] | None: """Detect service information from URL using yt-dlp.""" try: # Configure yt-dlp for info extraction only ydl_opts = { "quiet": True, "no_warnings": True, "extract_flat": False, } def _extract_info() -> dict | None: with yt_dlp.YoutubeDL(ydl_opts) as ydl: # Extract info without downloading return ydl.extract_info(url, download=False) # Run the blocking operation in a thread pool info = await asyncio.to_thread(_extract_info) if not info: return None return { "service": info.get("extractor", ""), "service_id": str(info.get("id", "")), "title": info.get("title"), } except Exception: logger.exception("Failed to detect service info for URL: %s", url) return None async def process_extraction(self, extraction_id: int) -> ExtractionInfo: """Process an extraction job.""" extraction = await self.extraction_repo.get_by_id(extraction_id) if not extraction: msg = f"Extraction {extraction_id} not found" raise ValueError(msg) if extraction.status != "pending": msg = f"Extraction {extraction_id} is not pending" raise ValueError(msg) # Store all needed values early to avoid session detachment issues user_id = extraction.user_id extraction_url = extraction.url extraction_service = extraction.service extraction_service_id = extraction.service_id extraction_title = extraction.title logger.info("Processing extraction %d: %s", extraction_id, extraction_url) try: # Update status to processing await self.extraction_repo.update(extraction, {"status": "processing"}) # Detect service info if not already available if not extraction_service or not extraction_service_id: logger.info("Detecting service info for extraction %d", extraction_id) service_info = await self._detect_service_info(extraction_url) if not service_info: msg = "Unable to detect service information from URL" raise ValueError(msg) # Check if extraction already exists for this service existing = await self.extraction_repo.get_by_service_and_id( service_info["service"], service_info["service_id"], ) if existing and existing.id != extraction_id: error_msg = ( f"Extraction already exists for " f"{service_info['service']}:{service_info['service_id']}" ) logger.warning(error_msg) raise ValueError(error_msg) # Update extraction with service info update_data = { "service": service_info["service"], "service_id": service_info["service_id"], "title": service_info.get("title") or extraction_title, } await self.extraction_repo.update(extraction, update_data) # Update values for processing extraction_service = service_info["service"] extraction_service_id = service_info["service_id"] extraction_title = service_info.get("title") or extraction_title # Extract audio and thumbnail audio_file, thumbnail_file = await self._extract_media( extraction_id, extraction_url, ) # Move files to final locations ( final_audio_path, final_thumbnail_path, ) = await self._move_files_to_final_location( audio_file, thumbnail_file, extraction_title, extraction_service, extraction_service_id, ) # Create Sound record sound = await self._create_sound_record( final_audio_path, final_thumbnail_path, extraction_title, extraction_service, extraction_service_id, ) # Store sound_id early to avoid session detachment issues sound_id = sound.id if not sound_id: msg = "Sound creation failed - no ID returned" raise RuntimeError(msg) # Normalize the sound await self._normalize_sound(sound_id) # Add to main playlist await self._add_to_main_playlist(sound_id, user_id) # Update extraction with success await self.extraction_repo.update( extraction, { "status": "completed", "sound_id": sound_id, "error": None, }, ) logger.info("Successfully processed extraction %d", extraction_id) except Exception as e: error_msg = str(e) logger.exception( "Failed to process extraction %d: %s", extraction_id, error_msg, ) else: # Get updated extraction to get latest timestamps updated_extraction = await self.extraction_repo.get_by_id(extraction_id) return { "id": extraction_id, "url": extraction_url, "service": extraction_service, "service_id": extraction_service_id, "title": extraction_title, "status": "completed", "error": None, "sound_id": sound_id, "user_id": user_id, "created_at": ( updated_extraction.created_at.isoformat() if updated_extraction else "" ), "updated_at": ( updated_extraction.updated_at.isoformat() if updated_extraction else "" ), } # Update extraction with error await self.extraction_repo.update( extraction, { "status": "failed", "error": error_msg, }, ) # Get updated extraction to get latest timestamps updated_extraction = await self.extraction_repo.get_by_id(extraction_id) return { "id": extraction_id, "url": extraction_url, "service": extraction_service, "service_id": extraction_service_id, "title": extraction_title, "status": "failed", "error": error_msg, "sound_id": None, "user_id": user_id, "created_at": ( updated_extraction.created_at.isoformat() if updated_extraction else "" ), "updated_at": ( updated_extraction.updated_at.isoformat() if updated_extraction else "" ), } async def _extract_media( self, extraction_id: int, extraction_url: str, ) -> tuple[Path, Path | None]: """Extract audio and thumbnail using yt-dlp.""" temp_dir = Path(settings.EXTRACTION_TEMP_DIR) # Create unique filename based on extraction ID output_template = str( temp_dir / f"extraction_{extraction_id}_%(title)s.%(ext)s", ) # Configure yt-dlp options ydl_opts = { "format": "bestaudio/best", "outtmpl": output_template, "extractaudio": True, "audioformat": settings.EXTRACTION_AUDIO_FORMAT, "audioquality": settings.EXTRACTION_AUDIO_BITRATE, "writethumbnail": True, "writeinfojson": False, "writeautomaticsub": False, "writesubtitles": False, "postprocessors": [ { "key": "FFmpegExtractAudio", "preferredcodec": settings.EXTRACTION_AUDIO_FORMAT, "preferredquality": settings.EXTRACTION_AUDIO_BITRATE.rstrip("k"), }, ], } def _download_media() -> None: with yt_dlp.YoutubeDL(ydl_opts) as ydl: # Download and extract ydl.download([extraction_url]) try: # Run the blocking download operation in a thread pool await asyncio.to_thread(_download_media) # Find the extracted files audio_files = list( temp_dir.glob( f"extraction_{extraction_id}_*.{settings.EXTRACTION_AUDIO_FORMAT}", ), ) thumbnail_files = ( list(temp_dir.glob(f"extraction_{extraction_id}_*.webp")) + list(temp_dir.glob(f"extraction_{extraction_id}_*.jpg")) + list(temp_dir.glob(f"extraction_{extraction_id}_*.png")) ) if not audio_files: msg = "No audio file was created during extraction" raise RuntimeError(msg) audio_file = audio_files[0] thumbnail_file = thumbnail_files[0] if thumbnail_files else None logger.info( "Extracted audio: %s, thumbnail: %s", audio_file, thumbnail_file or "None", ) except Exception as e: logger.exception("yt-dlp extraction failed for %s", extraction_url) error_msg = f"Audio extraction failed: {e}" raise RuntimeError(error_msg) from e else: return audio_file, thumbnail_file async def _move_files_to_final_location( self, audio_file: Path, thumbnail_file: Path | None, title: str | None, service: str | None, service_id: str | None, ) -> tuple[Path, Path | None]: """Move extracted files to their final locations.""" # Generate clean filename based on title and service safe_title = self._sanitize_filename( title or f"{service or 'unknown'}_{service_id or 'unknown'}", ) # Move audio file final_audio_path = ( Path("sounds/originals/extracted") / f"{safe_title}.{settings.EXTRACTION_AUDIO_FORMAT}" ) final_audio_path = self._ensure_unique_filename(final_audio_path) shutil.move(str(audio_file), str(final_audio_path)) logger.info("Moved audio file to: %s", final_audio_path) # Move thumbnail file if it exists final_thumbnail_path = None if thumbnail_file: thumbnail_ext = thumbnail_file.suffix final_thumbnail_path = ( Path(settings.EXTRACTION_THUMBNAILS_DIR) / f"{safe_title}{thumbnail_ext}" ) final_thumbnail_path = self._ensure_unique_filename(final_thumbnail_path) shutil.move(str(thumbnail_file), str(final_thumbnail_path)) logger.info("Moved thumbnail file to: %s", final_thumbnail_path) return final_audio_path, final_thumbnail_path def _sanitize_filename(self, filename: str) -> str: """Sanitize filename for filesystem.""" # Remove or replace problematic characters invalid_chars = '<>:"/\\|?*' for char in invalid_chars: filename = filename.replace(char, "_") # Limit length and remove leading/trailing spaces filename = filename.strip()[:100] return filename or "untitled" def _ensure_unique_filename(self, filepath: Path) -> Path: """Ensure filename is unique by adding counter if needed.""" if not filepath.exists(): return filepath stem = filepath.stem suffix = filepath.suffix parent = filepath.parent counter = 1 while True: new_path = parent / f"{stem}_{counter}{suffix}" if not new_path.exists(): return new_path counter += 1 async def _create_sound_record( self, audio_path: Path, thumbnail_path: Path | None, title: str | None, service: str | None, service_id: str | None, ) -> Sound: """Create a Sound record for the extracted audio.""" # Get audio metadata duration = get_audio_duration(audio_path) size = get_file_size(audio_path) file_hash = get_file_hash(audio_path) # Create sound data sound_data = { "type": "EXT", "name": title or f"{service or 'unknown'}_{service_id or 'unknown'}", "filename": audio_path.name, "duration": duration, "size": size, "hash": file_hash, "thumbnail": thumbnail_path.name if thumbnail_path else None, "is_deletable": True, # Extracted sounds can be deleted "is_music": True, # Assume extracted content is music "is_normalized": False, "play_count": 0, } sound = await self.sound_repo.create(sound_data) logger.info( "Created sound record with ID: %d, thumbnail: %s", sound.id, thumbnail_path.name if thumbnail_path else "None", ) return sound async def _normalize_sound(self, sound_id: int) -> None: """Normalize the extracted sound.""" try: # Get fresh sound object from database for normalization sound = await self.sound_repo.get_by_id(sound_id) if not sound: logger.warning("Sound %d not found for normalization", sound_id) return normalizer_service = SoundNormalizerService(self.session) result = await normalizer_service.normalize_sound(sound) if result["status"] == "error": logger.warning( "Failed to normalize sound %d: %s", sound_id, result.get("error"), ) else: logger.info("Successfully normalized sound %d", sound_id) except Exception: logger.exception("Error normalizing sound %d", sound_id) # Don't fail the extraction if normalization fails async def _add_to_main_playlist(self, sound_id: int, user_id: int) -> None: """Add the sound to the user's main playlist.""" try: await self.playlist_service._add_sound_to_main_playlist_internal( # noqa: SLF001 sound_id, user_id, ) logger.info( "Added sound %d to main playlist for user %d", sound_id, user_id, ) except Exception: logger.exception( "Error adding sound %d to main playlist for user %d", sound_id, user_id, ) # Don't fail the extraction if playlist addition fails async def get_extraction_by_id(self, extraction_id: int) -> ExtractionInfo | None: """Get extraction information by ID.""" extraction = await self.extraction_repo.get_by_id(extraction_id) if not extraction: return None return { "id": extraction.id or 0, # Should never be None for existing extraction "url": extraction.url, "service": extraction.service, "service_id": extraction.service_id, "title": extraction.title, "status": extraction.status, "error": extraction.error, "sound_id": extraction.sound_id, "user_id": extraction.user_id, "created_at": extraction.created_at.isoformat(), "updated_at": extraction.updated_at.isoformat(), } async def get_user_extractions( self, user_id: int, search: str | None = None, sort_by: str = "created_at", sort_order: str = "desc", status_filter: str | None = None, ) -> list[ExtractionInfo]: """Get all extractions for a user with filtering, search, and sorting.""" extractions = await self.extraction_repo.get_user_extractions_filtered( user_id=user_id, search=search, sort_by=sort_by, sort_order=sort_order, status_filter=status_filter, ) return [ { "id": extraction.id or 0, # Should never be None for existing extraction "url": extraction.url, "service": extraction.service, "service_id": extraction.service_id, "title": extraction.title, "status": extraction.status, "error": extraction.error, "sound_id": extraction.sound_id, "user_id": extraction.user_id, "created_at": extraction.created_at.isoformat(), "updated_at": extraction.updated_at.isoformat(), } for extraction in extractions ] async def get_pending_extractions(self) -> list[ExtractionInfo]: """Get all pending extractions.""" extractions = await self.extraction_repo.get_pending_extractions() return [ { "id": extraction.id or 0, # Should never be None for existing extraction "url": extraction.url, "service": extraction.service, "service_id": extraction.service_id, "title": extraction.title, "status": extraction.status, "error": extraction.error, "sound_id": extraction.sound_id, "user_id": extraction.user_id, "created_at": extraction.created_at.isoformat(), "updated_at": extraction.updated_at.isoformat(), } for extraction in extractions ]