"""Extraction service for audio extraction from external services using yt-dlp."""

import asyncio
import shutil
from dataclasses import dataclass
from pathlib import Path
from typing import Any, TypedDict

import yt_dlp
from sqlmodel.ext.asyncio.session import AsyncSession

from app.core.config import settings
from app.core.logging import get_logger
from app.models.extraction import Extraction
from app.models.sound import Sound
from app.repositories.extraction import ExtractionRepository
from app.repositories.sound import SoundRepository
from app.repositories.user import UserRepository
from app.services.playlist import PlaylistService
from app.services.sound_normalizer import SoundNormalizerService
from app.utils.audio import get_audio_duration, get_file_hash, get_file_size

logger = get_logger(__name__)

# Directory where extracted original audio files live. Kept as a single
# constant because it is needed when creating, moving, and deleting files.
EXTRACTED_ORIGINALS_DIR = Path("sounds/originals/extracted")


@dataclass
class ExtractionContext:
    """Context data for extraction processing.

    Snapshot of extraction attributes captured early so helper methods do not
    re-read a possibly-detached ORM object.
    """

    extraction_id: int
    extraction_url: str
    extraction_service: str | None
    extraction_service_id: str | None
    extraction_title: str | None
    user_id: int


class ExtractionInfo(TypedDict):
    """Type definition for extraction information."""

    id: int
    url: str
    service: str | None
    service_id: str | None
    title: str | None
    status: str
    error: str | None
    sound_id: int | None
    user_id: int
    user_name: str | None
    created_at: str
    updated_at: str


class PaginatedExtractionsResponse(TypedDict):
    """Type definition for paginated extractions response."""

    extractions: list[ExtractionInfo]
    total: int
    page: int
    limit: int
    total_pages: int


class ExtractionService:
    """Service for extracting audio from external services using yt-dlp."""

    def __init__(self, session: AsyncSession) -> None:
        """Initialize the extraction service.

        Args:
            session: Async database session shared by all repositories.
        """
        self.session = session
        self.extraction_repo = ExtractionRepository(session)
        self.sound_repo = SoundRepository(session)
        self.user_repo = UserRepository(session)
        self.playlist_service = PlaylistService(session)

        # Ensure required directories exist
        self._ensure_directories()

    def _ensure_directories(self) -> None:
        """Ensure all required directories exist."""
        directories = [
            settings.EXTRACTION_TEMP_DIR,
            EXTRACTED_ORIGINALS_DIR,
            settings.EXTRACTION_THUMBNAILS_DIR,
        ]
        for directory in directories:
            Path(directory).mkdir(parents=True, exist_ok=True)
            logger.debug("Ensured directory exists: %s", directory)

    @staticmethod
    def _build_extraction_info(
        extraction: Extraction,
        user_name: str | None,
    ) -> ExtractionInfo:
        """Serialize an extraction row (plus resolved user name) to ExtractionInfo.

        Single place for the response-dict shape so all read paths stay in sync.
        """
        return {
            "id": extraction.id or 0,  # Should never be None for a persisted row
            "url": extraction.url,
            "service": extraction.service,
            "service_id": extraction.service_id,
            "title": extraction.title,
            "status": extraction.status,
            "error": extraction.error,
            "sound_id": extraction.sound_id,
            "user_id": extraction.user_id,
            "user_name": user_name,
            "created_at": extraction.created_at.isoformat(),
            "updated_at": extraction.updated_at.isoformat(),
        }

    async def create_extraction(self, url: str, user_id: int) -> ExtractionInfo:
        """Create a new extraction job.

        Service detection is deferred to processing time so this call returns
        quickly.

        Raises:
            ValueError: If the user does not exist.
        """
        logger.info("Creating extraction for URL: %s (user: %d)", url, user_id)

        try:
            # Get user information
            user = await self.user_repo.get_by_id(user_id)
            if not user:
                msg = f"User {user_id} not found"
                raise ValueError(msg)

            # Extract user name immediately while in session context
            user_name = user.name

            # Create the extraction record without service detection for fast response
            extraction_data = {
                "url": url,
                "user_id": user_id,
                "service": None,  # Will be detected during processing
                "service_id": None,  # Will be detected during processing
                "title": None,  # Will be detected during processing
                "status": "pending",
            }

            extraction = await self.extraction_repo.create(extraction_data)
            logger.info("Created extraction with ID: %d", extraction.id)
        except Exception:
            logger.exception("Failed to create extraction for URL: %s", url)
            raise
        else:
            return self._build_extraction_info(extraction, user_name)

    async def _detect_service_info(self, url: str) -> dict[str, str | None] | None:
        """Detect service information from URL using yt-dlp.

        Returns None on any failure instead of raising.
        """
        try:
            # Configure yt-dlp for info extraction only
            ydl_opts = {
                "quiet": True,
                "no_warnings": True,
                "extract_flat": False,
            }

            def _extract_info() -> dict | None:
                with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                    # Extract info without downloading
                    return ydl.extract_info(url, download=False)

            # Run the blocking operation in a thread pool
            info = await asyncio.to_thread(_extract_info)

            if not info:
                return None

            return {
                "service": info.get("extractor", ""),
                "service_id": str(info.get("id", "")),
                "title": info.get("title"),
            }
        except Exception:
            logger.exception("Failed to detect service info for URL: %s", url)
            return None

    async def _validate_extraction(
        self,
        extraction_id: int,
    ) -> tuple[Extraction, int, str, str | None, str | None, str | None, str | None]:
        """Validate extraction and return extraction data.

        Returns:
            Tuple of (extraction, user_id, url, service, service_id, title,
            user_name).

        Raises:
            ValueError: If the extraction is missing or not pending.
        """
        extraction = await self.extraction_repo.get_by_id(extraction_id)
        if not extraction:
            msg = f"Extraction {extraction_id} not found"
            raise ValueError(msg)

        if extraction.status != "pending":
            msg = f"Extraction {extraction_id} is not pending"
            raise ValueError(msg)

        # Store all needed values early to avoid session detachment issues
        user_id = extraction.user_id
        extraction_url = extraction.url
        extraction_service = extraction.service
        extraction_service_id = extraction.service_id
        extraction_title = extraction.title

        # Get user information for return value
        try:
            user = await self.user_repo.get_by_id(user_id)
            user_name = user.name if user else None
        except Exception:
            logger.exception("Failed to get user %d for extraction", user_id)
            user_name = None

        return (
            extraction,
            user_id,
            extraction_url,
            extraction_service,
            extraction_service_id,
            extraction_title,
            user_name,
        )

    async def _handle_service_detection(
        self,
        extraction: Extraction,
        context: ExtractionContext,
    ) -> tuple[str | None, str | None, str | None]:
        """Handle service detection and duplicate checking.

        Returns:
            Tuple of (service, service_id, title) — either the values already
            on the context, or freshly detected ones.

        Raises:
            ValueError: If detection fails, info is incomplete, or a duplicate
                extraction already exists for the same service/id.
        """
        if context.extraction_service and context.extraction_service_id:
            return (
                context.extraction_service,
                context.extraction_service_id,
                context.extraction_title,
            )

        logger.info("Detecting service info for extraction %d", context.extraction_id)
        service_info = await self._detect_service_info(context.extraction_url)

        if not service_info:
            msg = "Unable to detect service information from URL"
            raise ValueError(msg)

        # Check if extraction already exists for this service
        service_name = service_info["service"]
        service_id_val = service_info["service_id"]
        if not service_name or not service_id_val:
            msg = "Service info is incomplete"
            raise ValueError(msg)

        existing = await self.extraction_repo.get_by_service_and_id(
            service_name,
            service_id_val,
        )
        if existing and existing.id != context.extraction_id:
            error_msg = (
                f"Extraction already exists for "
                f"{service_info['service']}:{service_info['service_id']}"
            )
            logger.warning(error_msg)
            raise ValueError(error_msg)

        # Update extraction with service info
        update_data = {
            "service": service_info["service"],
            "service_id": service_info["service_id"],
            "title": service_info.get("title") or context.extraction_title,
        }
        await self.extraction_repo.update(extraction, update_data)

        # Update values for processing
        new_service = service_info["service"]
        new_service_id = service_info["service_id"]
        new_title = service_info.get("title") or context.extraction_title

        await self._emit_extraction_event(
            context.user_id,
            {
                "extraction_id": context.extraction_id,
                "status": "processing",
                "title": new_title,
                "url": context.extraction_url,
            },
        )

        return new_service, new_service_id, new_title

    async def _process_media_files(
        self,
        extraction_id: int,
        extraction_url: str,
        extraction_title: str | None,
        extraction_service: str,
        extraction_service_id: str,
    ) -> int:
        """Process media files and create sound record.

        Returns:
            The ID of the newly created Sound record.

        Raises:
            RuntimeError: If sound creation did not produce an ID.
        """
        # Extract audio and thumbnail
        audio_file, thumbnail_file = await self._extract_media(
            extraction_id,
            extraction_url,
        )

        # Move files to final locations
        final_audio_path, final_thumbnail_path = (
            await self._move_files_to_final_location(
                audio_file,
                thumbnail_file,
                extraction_title,
                extraction_service,
                extraction_service_id,
            )
        )

        # Create Sound record
        sound = await self._create_sound_record(
            final_audio_path,
            final_thumbnail_path,
            extraction_title,
            extraction_service,
            extraction_service_id,
        )
        if not sound.id:
            msg = "Sound creation failed - no ID returned"
            raise RuntimeError(msg)
        return sound.id

    async def _complete_extraction(
        self,
        extraction: Extraction,
        context: ExtractionContext,
        sound_id: int,
    ) -> None:
        """Complete extraction processing: normalize, add to playlist, mark done."""
        # Normalize the sound
        await self._normalize_sound(sound_id)

        # Add to main playlist
        await self._add_to_main_playlist(sound_id, context.user_id)

        # Update extraction with success
        await self.extraction_repo.update(
            extraction,
            {
                "status": "completed",
                "sound_id": sound_id,
                "error": None,
            },
        )

        # Emit WebSocket event for completion
        await self._emit_extraction_event(
            context.user_id,
            {
                "extraction_id": context.extraction_id,
                "status": "completed",
                "title": context.extraction_title,
                "url": context.extraction_url,
                "sound_id": sound_id,
            },
        )

    async def process_extraction(self, extraction_id: int) -> ExtractionInfo:
        """Process an extraction job.

        Runs the full pipeline (detection, download, file placement, sound
        creation, normalization, playlist add) and returns the final
        ExtractionInfo; on failure the extraction is marked failed and a
        "failed" info dict is returned instead of raising.
        """
        # Validate extraction and get context data
        (
            extraction,
            user_id,
            extraction_url,
            extraction_service,
            extraction_service_id,
            extraction_title,
            user_name,
        ) = await self._validate_extraction(extraction_id)

        # Create context object for helper methods
        context = ExtractionContext(
            extraction_id=extraction_id,
            extraction_url=extraction_url,
            extraction_service=extraction_service,
            extraction_service_id=extraction_service_id,
            extraction_title=extraction_title,
            user_id=user_id,
        )

        logger.info("Processing extraction %d: %s", extraction_id, extraction_url)

        try:
            # Update status to processing
            await self.extraction_repo.update(extraction, {"status": "processing"})

            # Emit WebSocket event for processing start
            await self._emit_extraction_event(
                context.user_id,
                {
                    "extraction_id": context.extraction_id,
                    "status": "processing",
                    "title": context.extraction_title or "Processing extraction...",
                    "url": context.extraction_url,
                },
            )

            # Handle service detection and duplicate checking
            extraction_service, extraction_service_id, extraction_title = (
                await self._handle_service_detection(extraction, context)
            )

            # Update context with potentially new values
            context.extraction_service = extraction_service
            context.extraction_service_id = extraction_service_id
            context.extraction_title = extraction_title

            # Process media files and create sound record
            sound_id = await self._process_media_files(
                context.extraction_id,
                context.extraction_url,
                context.extraction_title,
                extraction_service,
                extraction_service_id,
            )

            # Complete extraction processing
            await self._complete_extraction(extraction, context, sound_id)

            logger.info("Successfully processed extraction %d", context.extraction_id)

            # Get updated extraction to get latest timestamps
            updated_extraction = await self.extraction_repo.get_by_id(
                context.extraction_id,
            )

            return {
                "id": context.extraction_id,
                "url": context.extraction_url,
                "service": extraction_service,
                "service_id": extraction_service_id,
                "title": extraction_title,
                "status": "completed",
                "error": None,
                "sound_id": sound_id,
                "user_id": context.user_id,
                "user_name": user_name,
                "created_at": (
                    updated_extraction.created_at.isoformat()
                    if updated_extraction
                    else ""
                ),
                "updated_at": (
                    updated_extraction.updated_at.isoformat()
                    if updated_extraction
                    else ""
                ),
            }

        except Exception as e:
            error_msg = str(e)
            logger.exception(
                "Failed to process extraction %d: %s",
                context.extraction_id,
                error_msg,
            )

            # Emit WebSocket event for failure
            await self._emit_extraction_event(
                context.user_id,
                {
                    "extraction_id": context.extraction_id,
                    "status": "failed",
                    "title": context.extraction_title or "Extraction failed",
                    "url": context.extraction_url,
                    "error": error_msg,
                },
            )

            # Update extraction with error
            await self.extraction_repo.update(
                extraction,
                {
                    "status": "failed",
                    "error": error_msg,
                },
            )

            # Get updated extraction to get latest timestamps
            updated_extraction = await self.extraction_repo.get_by_id(
                context.extraction_id,
            )

            return {
                "id": context.extraction_id,
                "url": context.extraction_url,
                "service": context.extraction_service,
                "service_id": context.extraction_service_id,
                "title": context.extraction_title,
                "status": "failed",
                "error": error_msg,
                "sound_id": None,
                "user_id": context.user_id,
                "user_name": user_name,
                "created_at": (
                    updated_extraction.created_at.isoformat()
                    if updated_extraction
                    else ""
                ),
                "updated_at": (
                    updated_extraction.updated_at.isoformat()
                    if updated_extraction
                    else ""
                ),
            }

    async def _extract_media(
        self,
        extraction_id: int,
        extraction_url: str,
    ) -> tuple[Path, Path | None]:
        """Extract audio and thumbnail using yt-dlp.

        Returns:
            Tuple of (audio file path, thumbnail path or None).

        Raises:
            RuntimeError: If the download fails or no audio file was produced.
        """
        temp_dir = Path(settings.EXTRACTION_TEMP_DIR)

        # Create unique filename based on extraction ID
        output_template = str(
            temp_dir / f"extraction_{extraction_id}_%(title)s.%(ext)s",
        )

        # Configure yt-dlp options
        ydl_opts = {
            "format": "bestaudio/best",
            "outtmpl": output_template,
            "extractaudio": True,
            "audioformat": settings.EXTRACTION_AUDIO_FORMAT,
            "audioquality": settings.EXTRACTION_AUDIO_BITRATE,
            "writethumbnail": True,
            "writeinfojson": False,
            "writeautomaticsub": False,
            "writesubtitles": False,
            "postprocessors": [
                {
                    "key": "FFmpegExtractAudio",
                    "preferredcodec": settings.EXTRACTION_AUDIO_FORMAT,
                    "preferredquality": settings.EXTRACTION_AUDIO_BITRATE.rstrip("k"),
                },
            ],
        }

        def _download_media() -> None:
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                # Download and extract
                ydl.download([extraction_url])

        try:
            # Run the blocking download operation in a thread pool
            await asyncio.to_thread(_download_media)

            # Find the extracted files
            audio_files = list(
                temp_dir.glob(
                    f"extraction_{extraction_id}_*.{settings.EXTRACTION_AUDIO_FORMAT}",
                ),
            )
            thumbnail_files = (
                list(temp_dir.glob(f"extraction_{extraction_id}_*.webp"))
                + list(temp_dir.glob(f"extraction_{extraction_id}_*.jpg"))
                + list(temp_dir.glob(f"extraction_{extraction_id}_*.png"))
            )

            if not audio_files:
                msg = "No audio file was created during extraction"
                raise RuntimeError(msg)

            audio_file = audio_files[0]
            thumbnail_file = thumbnail_files[0] if thumbnail_files else None

            logger.info(
                "Extracted audio: %s, thumbnail: %s",
                audio_file,
                thumbnail_file or "None",
            )
        except Exception as e:
            logger.exception("yt-dlp extraction failed for %s", extraction_url)
            error_msg = f"Audio extraction failed: {e}"
            raise RuntimeError(error_msg) from e
        else:
            return audio_file, thumbnail_file

    async def _move_files_to_final_location(
        self,
        audio_file: Path,
        thumbnail_file: Path | None,
        title: str | None,
        service: str | None,
        service_id: str | None,
    ) -> tuple[Path, Path | None]:
        """Move extracted files to their final locations.

        Returns:
            Tuple of (final audio path, final thumbnail path or None).
        """
        # Generate clean filename based on title and service
        safe_title = self._sanitize_filename(
            title or f"{service or 'unknown'}_{service_id or 'unknown'}",
        )

        # Move audio file
        final_audio_path = (
            EXTRACTED_ORIGINALS_DIR
            / f"{safe_title}.{settings.EXTRACTION_AUDIO_FORMAT}"
        )
        final_audio_path = self._ensure_unique_filename(final_audio_path)
        shutil.move(str(audio_file), str(final_audio_path))
        logger.info("Moved audio file to: %s", final_audio_path)

        # Move thumbnail file if it exists
        final_thumbnail_path = None
        if thumbnail_file:
            thumbnail_ext = thumbnail_file.suffix
            final_thumbnail_path = (
                Path(settings.EXTRACTION_THUMBNAILS_DIR)
                / f"{safe_title}{thumbnail_ext}"
            )
            final_thumbnail_path = self._ensure_unique_filename(final_thumbnail_path)
            shutil.move(str(thumbnail_file), str(final_thumbnail_path))
            logger.info("Moved thumbnail file to: %s", final_thumbnail_path)

        return final_audio_path, final_thumbnail_path

    def _sanitize_filename(self, filename: str) -> str:
        """Sanitize filename for filesystem."""
        # Remove or replace problematic characters
        invalid_chars = '<>:"/\\|?*'
        for char in invalid_chars:
            filename = filename.replace(char, "_")

        # Limit length and remove leading/trailing spaces
        filename = filename.strip()[:100]

        return filename or "untitled"

    def _ensure_unique_filename(self, filepath: Path) -> Path:
        """Ensure filename is unique by adding counter if needed."""
        if not filepath.exists():
            return filepath

        stem = filepath.stem
        suffix = filepath.suffix
        parent = filepath.parent

        counter = 1
        while True:
            new_path = parent / f"{stem}_{counter}{suffix}"
            if not new_path.exists():
                return new_path
            counter += 1

    async def _create_sound_record(
        self,
        audio_path: Path,
        thumbnail_path: Path | None,
        title: str | None,
        service: str | None,
        service_id: str | None,
    ) -> Sound:
        """Create a Sound record for the extracted audio."""
        # Get audio metadata
        duration = get_audio_duration(audio_path)
        size = get_file_size(audio_path)
        file_hash = get_file_hash(audio_path)

        # Create sound data
        sound_data = {
            "type": "EXT",
            "name": title or f"{service or 'unknown'}_{service_id or 'unknown'}",
            "filename": audio_path.name,
            "duration": duration,
            "size": size,
            "hash": file_hash,
            "thumbnail": thumbnail_path.name if thumbnail_path else None,
            "is_deletable": True,  # Extracted sounds can be deleted
            "is_music": True,  # Assume extracted content is music
            "is_normalized": False,
            "play_count": 0,
        }

        sound = await self.sound_repo.create(sound_data)
        logger.info(
            "Created sound record with ID: %d, thumbnail: %s",
            sound.id,
            thumbnail_path.name if thumbnail_path else "None",
        )
        return sound

    async def _normalize_sound(self, sound_id: int) -> None:
        """Normalize the extracted sound.

        Best-effort: failures are logged, never raised.
        """
        try:
            # Get fresh sound object from database for normalization
            sound = await self.sound_repo.get_by_id(sound_id)
            if not sound:
                logger.warning("Sound %d not found for normalization", sound_id)
                return

            normalizer_service = SoundNormalizerService(self.session)
            result = await normalizer_service.normalize_sound(sound)

            if result["status"] == "error":
                logger.warning(
                    "Failed to normalize sound %d: %s",
                    sound_id,
                    result.get("error"),
                )
            else:
                logger.info("Successfully normalized sound %d", sound_id)
        except Exception:
            logger.exception("Error normalizing sound %d", sound_id)
            # Don't fail the extraction if normalization fails

    async def _add_to_main_playlist(self, sound_id: int, user_id: int) -> None:
        """Add the sound to the user's main playlist.

        Best-effort: failures are logged, never raised.
        """
        try:
            await self.playlist_service._add_sound_to_main_playlist_internal(  # noqa: SLF001
                sound_id,
                user_id,
            )
            logger.info(
                "Added sound %d to main playlist for user %d",
                sound_id,
                user_id,
            )
        except Exception:
            logger.exception(
                "Error adding sound %d to main playlist for user %d",
                sound_id,
                user_id,
            )
            # Don't fail the extraction if playlist addition fails

    async def _emit_extraction_event(self, user_id: int, data: dict) -> None:
        """Emit WebSocket event for extraction status updates to all users."""
        try:
            # Import here to avoid circular imports
            from app.services.socket import socket_manager  # noqa: PLC0415

            await socket_manager.broadcast_to_all("extraction_status_update", data)
            logger.debug(
                "Broadcasted extraction event (initiated by user %d): %s",
                user_id,
                data["status"],
            )
        except Exception:
            logger.exception("Failed to emit extraction event")

    async def get_extraction_by_id(self, extraction_id: int) -> ExtractionInfo | None:
        """Get extraction information by ID, or None if not found."""
        extraction = await self.extraction_repo.get_by_id(extraction_id)
        if not extraction:
            return None

        # Get user information
        user = await self.user_repo.get_by_id(extraction.user_id)
        user_name = user.name if user else None

        return self._build_extraction_info(extraction, user_name)

    async def get_user_extractions(  # noqa: PLR0913
        self,
        user_id: int,
        search: str | None = None,
        sort_by: str = "created_at",
        sort_order: str = "desc",
        status_filter: str | None = None,
        page: int = 1,
        limit: int = 50,
    ) -> PaginatedExtractionsResponse:
        """Get all extractions for a user with filtering, search, and sorting."""
        offset = (page - 1) * limit

        (
            extraction_user_tuples,
            total_count,
        ) = await self.extraction_repo.get_user_extractions_filtered(
            user_id=user_id,
            search=search,
            sort_by=sort_by,
            sort_order=sort_order,
            status_filter=status_filter,
            limit=limit,
            offset=offset,
        )

        extractions = [
            self._build_extraction_info(extraction, user.name)
            for extraction, user in extraction_user_tuples
        ]

        total_pages = (total_count + limit - 1) // limit  # Ceiling division

        return {
            "extractions": extractions,
            "total": total_count,
            "page": page,
            "limit": limit,
            "total_pages": total_pages,
        }

    async def get_all_extractions(  # noqa: PLR0913
        self,
        search: str | None = None,
        sort_by: str = "created_at",
        sort_order: str = "desc",
        status_filter: str | None = None,
        page: int = 1,
        limit: int = 50,
    ) -> PaginatedExtractionsResponse:
        """Get all extractions with filtering, search, and sorting."""
        offset = (page - 1) * limit

        (
            extraction_user_tuples,
            total_count,
        ) = await self.extraction_repo.get_all_extractions_filtered(
            search=search,
            sort_by=sort_by,
            sort_order=sort_order,
            status_filter=status_filter,
            limit=limit,
            offset=offset,
        )

        extractions = [
            self._build_extraction_info(extraction, user.name)
            for extraction, user in extraction_user_tuples
        ]

        total_pages = (total_count + limit - 1) // limit  # Ceiling division

        return {
            "extractions": extractions,
            "total": total_count,
            "page": page,
            "limit": limit,
            "total_pages": total_pages,
        }

    async def get_pending_extractions(self) -> list[ExtractionInfo]:
        """Get all pending extractions."""
        extraction_user_tuples = await self.extraction_repo.get_pending_extractions()

        return [
            self._build_extraction_info(extraction, user.name)
            for extraction, user in extraction_user_tuples
        ]

    async def delete_extraction(
        self,
        extraction_id: int,
        user_id: int | None = None,
    ) -> bool:
        """Delete an extraction and its associated sound and files.

        Args:
            extraction_id: The ID of the extraction to delete
            user_id: Optional user ID for ownership verification (None for admin)

        Returns:
            True if deletion was successful, False if extraction not found

        Raises:
            ValueError: If user doesn't own the extraction (when user_id is provided)
        """
        logger.info(
            "Deleting extraction: %d (user: %s)",
            extraction_id,
            user_id or "admin",
        )

        # Get the extraction record
        extraction = await self.extraction_repo.get_by_id(extraction_id)
        if not extraction:
            logger.warning("Extraction %d not found", extraction_id)
            return False

        # Check ownership if user_id is provided (non-admin request)
        if user_id is not None and extraction.user_id != user_id:
            msg = "You don't have permission to delete this extraction"
            raise ValueError(msg)

        # Get associated sound if it exists and capture its attributes immediately
        sound_data = None
        sound_object = None
        if extraction.sound_id:
            sound_object = await self.sound_repo.get_by_id(extraction.sound_id)
            if sound_object:
                # Capture attributes immediately while session is valid
                sound_data = {
                    "id": sound_object.id,
                    "type": sound_object.type,
                    "filename": sound_object.filename,
                    "is_normalized": sound_object.is_normalized,
                    "normalized_filename": sound_object.normalized_filename,
                    "thumbnail": sound_object.thumbnail,
                }

        try:
            # Delete the extraction record first
            await self.extraction_repo.delete(extraction)
            logger.info("Deleted extraction record: %d", extraction_id)

            # Check if sound was in current playlist before deletion
            sound_was_in_current_playlist = False
            if sound_object and sound_data:
                sound_was_in_current_playlist = (
                    await self._check_sound_in_current_playlist(sound_data["id"])
                )

            # If there's an associated sound, delete it and its files
            if sound_object and sound_data:
                await self._delete_sound_and_files(sound_object, sound_data)
                logger.info(
                    "Deleted associated sound: %d (%s)",
                    sound_data["id"],
                    sound_data["filename"],
                )

            # Commit the transaction
            await self.session.commit()

            # Reload player playlist if deleted sound was in current playlist
            if sound_was_in_current_playlist and sound_data:
                await self._reload_player_playlist()
                logger.info(
                    "Reloaded player playlist after deleting sound %d "
                    "from current playlist",
                    sound_data["id"],
                )

        except Exception:
            # Rollback on any error
            await self.session.rollback()
            logger.exception("Failed to delete extraction %d", extraction_id)
            raise
        else:
            return True

    async def _delete_sound_and_files(
        self,
        sound: Sound,
        sound_data: dict[str, Any],
    ) -> None:
        """Delete a sound record and all its associated files.

        Uses the pre-captured ``sound_data`` attributes so no detached ORM
        object is read after the session changes.
        """
        # Collect all file paths to delete using captured attributes
        files_to_delete = []

        # Original audio file
        if sound_data["type"] == "EXT":  # Extracted sounds
            original_path = EXTRACTED_ORIGINALS_DIR / sound_data["filename"]
            if original_path.exists():
                files_to_delete.append(original_path)

        # Normalized file
        if sound_data["is_normalized"] and sound_data["normalized_filename"]:
            normalized_path = (
                Path("sounds/normalized/extracted")
                / sound_data["normalized_filename"]
            )
            if normalized_path.exists():
                files_to_delete.append(normalized_path)

        # Thumbnail file
        if sound_data["thumbnail"]:
            thumbnail_path = (
                Path(settings.EXTRACTION_THUMBNAILS_DIR) / sound_data["thumbnail"]
            )
            if thumbnail_path.exists():
                files_to_delete.append(thumbnail_path)

        # Delete the sound from database first
        await self.sound_repo.delete(sound)

        # Delete all associated files
        for file_path in files_to_delete:
            try:
                file_path.unlink()
                logger.info("Deleted file: %s", file_path)
            except OSError:
                logger.exception("Failed to delete file %s", file_path)
                # Continue with other files even if one fails

    async def _check_sound_in_current_playlist(self, sound_id: int) -> bool:
        """Check if a sound is in the current playlist.

        Best-effort: returns False on any lookup failure.
        """
        try:
            from app.repositories.playlist import PlaylistRepository  # noqa: PLC0415

            playlist_repo = PlaylistRepository(self.session)
            current_playlist = await playlist_repo.get_current_playlist()

            if not current_playlist or not current_playlist.id:
                return False

            return await playlist_repo.is_sound_in_playlist(
                current_playlist.id,
                sound_id,
            )
        except (ImportError, AttributeError, ValueError, RuntimeError) as e:
            logger.warning(
                "Failed to check if sound %s is in current playlist: %s",
                sound_id,
                e,
                exc_info=True,
            )
            return False

    async def _reload_player_playlist(self) -> None:
        """Reload the player playlist after a sound is deleted."""
        try:
            # Import here to avoid circular import issues
            from app.services.player import get_player_service  # noqa: PLC0415

            player = get_player_service()
            await player.reload_playlist()
            logger.debug("Player playlist reloaded after sound deletion")
        except (ImportError, AttributeError, ValueError, RuntimeError) as e:
            # Don't fail the deletion operation if player reload fails
            logger.warning("Failed to reload player playlist: %s", e, exc_info=True)