"""Stream processing service with queue management and yt-dlp integration."""

import hashlib
import os
import re
import shutil
import threading
import time
from queue import Empty, Queue
from typing import Dict, List, Optional
from urllib.parse import parse_qs, urlparse

from app.database import db
from app.models.sound import Sound
from app.models.stream import Stream
from app.services.logging_service import LoggingService

# Configure logging
logger = LoggingService.get_logger(__name__)


class StreamProcessingService:
    """Service for processing streaming URLs with yt-dlp.

    Stream IDs are pushed onto a class-level queue and consumed by a fixed
    pool of daemon worker threads. For each stream a worker extracts
    metadata, downloads the audio, moves the resulting files to their final
    directories and creates the matching ``Sound`` row.
    """

    # Class variables for queue management
    _processing_queue: Queue = Queue()
    _processing_threads: List[threading.Thread] = []
    _is_running: bool = False
    # Number of worker threads; read once from STREAM_MAX_CONCURRENT.
    _max_concurrent_downloads: int = int(
        os.getenv("STREAM_MAX_CONCURRENT", "2")
    )
    _downloads_dir: str = "sounds/temp"

    # Serializes initialize() so concurrent callers cannot each start a
    # full set of worker threads.
    _init_lock = threading.Lock()

    # Characters that are removed from file-name components.
    _UNSAFE_FILENAME_CHARS = re.compile(r"[^\w\s-]")

    # (domain fragments, service name) pairs, checked in order.
    _SERVICE_DOMAINS = (
        (("youtube.com", "youtu.be"), "youtube"),
        (("soundcloud.com",), "soundcloud"),
        (("dailymotion.com",), "dailymotion"),
        (("spotify.com",), "spotify"),
        (("vimeo.com",), "vimeo"),
        (("twitch.tv",), "twitch"),
    )

    @classmethod
    def initialize(cls) -> None:
        """Create the working directories and start the worker threads.

        Calling this more than once is a no-op after the first successful
        call. The check and the start-up are done under a lock so that two
        concurrent callers cannot both start workers.
        """
        with cls._init_lock:
            if cls._is_running:
                return

            # Create necessary directories
            os.makedirs(cls._downloads_dir, exist_ok=True)
            os.makedirs("sounds/stream", exist_ok=True)
            os.makedirs("sounds/stream/thumbnails", exist_ok=True)

            # Start processing threads
            for i in range(cls._max_concurrent_downloads):
                thread = threading.Thread(
                    target=cls._worker_thread,
                    name=f"StreamProcessor-{i + 1}",
                    daemon=True,
                )
                thread.start()
                cls._processing_threads.append(thread)

            cls._is_running = True
            logger.info(
                f"StreamProcessingService initialized with {cls._max_concurrent_downloads} workers"
            )

    @classmethod
    def add_to_queue(cls, stream_id: int) -> None:
        """Add a stream to the processing queue, starting workers if needed.

        Args:
            stream_id: Primary key of the ``Stream`` row to process.
        """
        if not cls._is_running:
            cls.initialize()

        cls._processing_queue.put(stream_id)
        logger.info(f"Added stream {stream_id} to processing queue")

    @classmethod
    def get_queue_status(cls) -> Dict:
        """Return a snapshot of the queue and of stream counts by status.

        Returns:
            A dict with the keys ``queue_size``, ``pending_streams``,
            ``processing_streams``, ``max_concurrent`` and ``is_running``.
        """
        pending_count = Stream.query.filter_by(status="pending").count()
        processing_count = Stream.query.filter_by(status="processing").count()

        return {
            "queue_size": cls._processing_queue.qsize(),
            "pending_streams": pending_count,
            "processing_streams": processing_count,
            "max_concurrent": cls._max_concurrent_downloads,
            "is_running": cls._is_running,
        }

    @classmethod
    def _worker_thread(cls) -> None:
        """Consume stream IDs from the queue forever.

        Each worker builds its own application object and processes every
        stream inside that application's context so database operations
        work from this thread.
        """
        from app import create_app

        # Create app context for database operations
        app = create_app()

        while True:
            try:
                # Get stream ID from queue with timeout
                stream_id = cls._processing_queue.get(timeout=1)
            except Empty:
                # No items in queue, continue
                continue

            try:
                with app.app_context():
                    cls._process_stream(stream_id)
            except Exception as e:
                logger.error(f"Error in worker thread: {e}")
            finally:
                # Always balance the get() above, even when processing
                # raised, so Queue.join() accounting stays correct.
                cls._processing_queue.task_done()

    @classmethod
    def _mark_stream_failed(
        cls, stream: Stream, error_msg: str, log_prefix: str
    ) -> None:
        """Persist a failure on *stream* and log it.

        Args:
            stream: The stream row to update.
            error_msg: Message stored in ``stream.error``.
            log_prefix: Text logged in front of ``": <error_msg>"``.
        """
        stream.status = "failed"
        stream.error = error_msg
        db.session.commit()
        logger.error(f"{log_prefix}: {error_msg}")

    @classmethod
    def _process_stream(cls, stream_id: int) -> None:
        """Process a single stream end to end.

        Steps: load the stream, skip it when missing or cancelled, mark it
        as processing, extract metadata, reject duplicates (same service
        and service ID on another stream), download the audio, move the
        files, create the sound entry and mark the stream as completed.
        Every failure is recorded on the stream (status ``failed`` plus an
        error message); this method does not raise.

        Args:
            stream_id: Primary key of the ``Stream`` row to process.
        """
        try:
            stream = Stream.query.get(stream_id)
            if not stream:
                logger.error(f"Stream {stream_id} not found")
                return

            if stream.status == "cancelled":
                logger.info(f"Stream {stream_id} was cancelled")
                return

            # Update status to processing
            stream.status = "processing"
            db.session.commit()

            logger.info(
                f"Starting processing of stream {stream_id}: {stream.url}"
            )

            # Extract metadata
            metadata, error_msg = cls._extract_metadata(stream.url)
            if not metadata:
                cls._mark_stream_failed(
                    stream,
                    error_msg or "Failed to extract metadata from URL",
                    f"Failed to extract metadata for stream {stream_id}",
                )
                return

            # Check for duplicate streams based on service and service_id
            service = metadata.get("service")
            service_id = metadata.get("service_id")

            if service and service_id:
                existing_stream = (
                    Stream.query.filter_by(
                        service=service, service_id=service_id
                    )
                    .filter(Stream.id != stream.id)
                    .first()
                )

                if existing_stream:
                    cls._mark_stream_failed(
                        stream,
                        f"Stream already exists with {service} ID: {service_id} (stream #{existing_stream.id})",
                        f"Duplicate stream detected for {stream_id}",
                    )
                    return

            # Update stream with metadata
            cls._update_stream_metadata(stream, metadata)

            # Download audio
            audio_path, error_msg = cls._download_audio(stream.url, metadata)
            if not audio_path:
                cls._mark_stream_failed(
                    stream,
                    error_msg or "Failed to download audio from URL",
                    f"Failed to download audio for stream {stream_id}",
                )
                return

            # Move files to final locations
            final_audio_path, thumbnail_path, error_msg = (
                cls._move_files_to_final_location(audio_path, metadata)
            )
            if not final_audio_path:
                cls._mark_stream_failed(
                    stream,
                    error_msg or "Failed to move files to final location",
                    f"Failed to move files for stream {stream_id}",
                )
                return

            # Create sound entry with final path
            sound, error_msg = cls._create_sound_entry(
                final_audio_path, metadata, thumbnail_path
            )
            if not sound:
                cls._mark_stream_failed(
                    stream,
                    error_msg or "Failed to create sound entry in database",
                    f"Failed to create sound entry for stream {stream_id}",
                )
                return

            # Update stream with sound_id and mark as completed
            stream.sound_id = sound.id
            stream.status = "completed"
            stream.error = None  # Clear any previous errors
            db.session.commit()

            logger.info(
                f"Successfully processed stream {stream_id} -> sound {sound.id}"
            )

        except Exception as e:
            error_msg = f"Unexpected error during processing: {str(e)}"
            logger.error(f"Error processing stream {stream_id}: {error_msg}")
            try:
                # The failure may have come from a flush/commit; discard the
                # broken transaction before touching the session again.
                db.session.rollback()
                stream = Stream.query.get(stream_id)
                if stream:
                    stream.status = "failed"
                    stream.error = error_msg
                    db.session.commit()
            except Exception as db_error:
                logger.error(
                    f"Failed to update stream error in database: {db_error}"
                )

    @classmethod
    def _extract_metadata(
        cls, url: str
    ) -> tuple[Optional[Dict], Optional[str]]:
        """Extract metadata from URL using yt-dlp (no download).

        Args:
            url: The stream URL.

        Returns:
            ``(metadata, None)`` on success or ``(None, error_message)`` on
            failure. ``metadata`` has the keys ``service``, ``service_id``,
            ``title``, ``track``, ``artist``, ``album``, ``genre``,
            ``duration`` and ``description``.
        """
        try:
            import yt_dlp

            ydl_opts = {
                "quiet": True,
                "no_warnings": True,
                "extract_flat": False,
            }

            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                info = ydl.extract_info(url, download=False)

            # Guard against an empty result so the caller gets a clear
            # message instead of an attribute/type error.
            if not info:
                error_msg = "yt-dlp returned no information for URL"
                logger.error(
                    f"Error extracting metadata from {url}: {error_msg}"
                )
                return None, error_msg

            # Extract service information (used as fallbacks below)
            service = cls._detect_service(url)
            service_id = cls._extract_service_id(url, info)

            metadata = {
                "service": info.get("extractor", service),
                "service_id": info.get("id", service_id),
                "title": info.get("title", ""),
                "track": info.get("track", ""),
                "artist": info.get("artist", "") or info.get("uploader", ""),
                "album": info.get("album", ""),
                "genre": info.get("genre", ""),
                "duration": info.get("duration", 0),
                "description": info.get("description", ""),
            }

            return metadata, None

        except Exception as e:
            error_msg = f"yt-dlp extraction failed: {str(e)}"
            logger.error(f"Error extracting metadata from {url}: {error_msg}")
            return None, error_msg

    @classmethod
    def _sanitize_filename_part(
        cls, value: object, max_length: Optional[int] = None
    ) -> str:
        """Return *value* reduced to characters that are safe in a file name.

        Everything except word characters, whitespace and ``-`` is removed,
        whitespace runs are collapsed to a single space, and the result is
        optionally truncated. ``None`` or an empty result yields
        ``"unknown"``.
        """
        text = "" if value is None else str(value)
        text = cls._UNSAFE_FILENAME_CHARS.sub("", text)
        text = " ".join(text.split())
        if max_length is not None:
            text = text[:max_length].strip()
        return text or "unknown"

    @classmethod
    def _build_base_filename(cls, metadata: Dict) -> str:
        """Build the extension-less ``<title>_<service_id>`` file name.

        Both parts are sanitized so the name contains no path separators
        or ``%`` characters (the name is embedded in a yt-dlp output
        template). The title part is limited to 50 characters.
        """
        safe_title = cls._sanitize_filename_part(
            metadata.get("title"), max_length=50
        )
        safe_service_id = cls._sanitize_filename_part(
            metadata.get("service_id")
        )
        return f"{safe_title}_{safe_service_id}"

    @staticmethod
    def _unique_path(directory: str, base_filename: str, extension: str) -> str:
        """Return a path in *directory* that does not exist yet.

        Tries ``<base><ext>`` first, then ``<base>_1<ext>``,
        ``<base>_2<ext>`` and so on.
        """
        candidate = os.path.join(directory, f"{base_filename}{extension}")
        counter = 1
        while os.path.exists(candidate):
            candidate = os.path.join(
                directory, f"{base_filename}_{counter}{extension}"
            )
            counter += 1
        return candidate

    @classmethod
    def _download_audio(
        cls, url: str, metadata: Dict
    ) -> tuple[Optional[str], Optional[str]]:
        """Download audio from URL using yt-dlp into the temp directory.

        Args:
            url: The stream URL.
            metadata: Metadata dict; ``title`` and ``service_id`` are used
                to build the file name.

        Returns:
            ``(path_to_audio_file, None)`` on success or
            ``(None, error_message)`` on failure.
        """
        try:
            import yt_dlp

            # Generate filename
            filename = cls._build_base_filename(metadata)
            output_path = os.path.join(
                cls._downloads_dir, f"{filename}.%(ext)s"
            )

            # NOTE(review): "extractaudio"/"audioformat"/"audioquality" look
            # like CLI-style names; the postprocessor entry is presumably
            # what performs the conversion - confirm against yt-dlp docs.
            ydl_opts = {
                "format": "bestaudio/best",
                "outtmpl": output_path,
                "extractaudio": True,
                "audioformat": "opus",
                "audioquality": "192",
                "postprocessors": [
                    {
                        "key": "FFmpegExtractAudio",
                        "preferredcodec": "opus",
                        "preferredquality": "192",
                    }
                ],
                "writethumbnail": True,
                "quiet": True,
                "no_warnings": True,
            }

            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                ydl.download([url])

            # Find the downloaded file: opus is expected, the other
            # extensions are fallbacks checked in this order.
            for ext in ("opus", "mp3", "wav", "m4a", "webm", "ogg"):
                candidate = os.path.join(
                    cls._downloads_dir, f"{filename}.{ext}"
                )
                if os.path.exists(candidate):
                    return candidate, None

            error_msg = "Downloaded file not found after yt-dlp processing"
            logger.error(f"Downloaded file not found for {url}")
            return None, error_msg

        except Exception as e:
            error_msg = f"yt-dlp download failed: {str(e)}"
            logger.error(f"Error downloading audio from {url}: {error_msg}")
            return None, error_msg

    @classmethod
    def _move_files_to_final_location(
        cls, audio_path: str, metadata: dict
    ) -> tuple[Optional[str], Optional[str], Optional[str]]:
        """Move downloaded files to their final locations.

        The audio file goes to ``sounds/stream``; a thumbnail sitting next
        to the audio file (same base name, image extension) goes to
        ``sounds/stream/thumbnails``. Existing files are never
        overwritten: a numeric suffix is appended instead.

        Args:
            audio_path: Path of the downloaded audio file.
            metadata: Metadata dict; ``title`` and ``service_id`` are used
                to build the file name.

        Returns:
            tuple: (final_audio_path, thumbnail_path, error_message)
        """
        try:
            # Create target directories
            stream_dir = "sounds/stream"
            thumbnail_dir = "sounds/stream/thumbnails"

            os.makedirs(stream_dir, exist_ok=True)
            os.makedirs(thumbnail_dir, exist_ok=True)

            # Generate safe filename
            base_filename = cls._build_base_filename(metadata)

            # Capture where the download lives before the audio is moved;
            # the thumbnail is looked up under the same base name.
            temp_dir = os.path.dirname(audio_path)
            temp_base, audio_extension = os.path.splitext(
                os.path.basename(audio_path)
            )

            # Move audio file
            final_audio_path = cls._unique_path(
                stream_dir, base_filename, audio_extension
            )
            shutil.move(audio_path, final_audio_path)
            logger.info(f"Moved audio file to: {final_audio_path}")

            # Look for and move thumbnail (first matching extension wins)
            thumbnail_path = None
            for thumb_ext in (".jpg", ".jpeg", ".png", ".webp", ".gif"):
                temp_thumb_path = os.path.join(
                    temp_dir, f"{temp_base}{thumb_ext}"
                )
                if not os.path.exists(temp_thumb_path):
                    continue

                final_thumb_path = cls._unique_path(
                    thumbnail_dir, base_filename, thumb_ext
                )
                shutil.move(temp_thumb_path, final_thumb_path)
                thumbnail_path = final_thumb_path
                logger.info(f"Moved thumbnail to: {final_thumb_path}")
                break

            return final_audio_path, thumbnail_path, None

        except Exception as e:
            error_msg = f"File move operation failed: {str(e)}"
            logger.error(f"Error moving files: {error_msg}")
            return None, None, error_msg

    @classmethod
    def _create_sound_entry(
        cls,
        audio_path: str,
        metadata: dict,
        thumbnail_path: str | None = None,
    ) -> tuple[Sound | None, str | None]:
        """Create a sound entry from the downloaded audio.

        After the row is committed the sound is added to the main playlist
        and normalized; both of those steps log their own failures and do
        not affect the return value.

        Args:
            audio_path: Final path of the audio file.
            metadata: Metadata dict; ``title`` and ``duration`` are used.
            thumbnail_path: Final path of the thumbnail, if any.

        Returns:
            ``(sound, None)`` on success or ``(None, error_message)`` on
            failure.
        """
        try:
            # Get file info
            file_size = os.path.getsize(audio_path)

            # Generate hash
            file_hash = cls._calculate_file_hash(audio_path)

            # Duration comes from the metadata (seconds) and is stored in
            # milliseconds; a missing/None duration becomes 0.
            duration_ms = int((metadata.get("duration", 0) or 0) * 1000)

            # Get thumbnail filename if available
            thumbnail_filename = None
            if thumbnail_path:
                thumbnail_filename = os.path.basename(thumbnail_path)

            # Create sound entry
            sound = Sound(
                type="STR",  # Stream type
                name=metadata.get("title", "Unknown Title"),
                filename=os.path.basename(audio_path),
                thumbnail=thumbnail_filename,
                duration=duration_ms,
                size=file_size,
                hash=file_hash,
                is_music=True,  # Streams are typically music
                is_deletable=True,
            )

            db.session.add(sound)
            db.session.commit()

            # Add sound to main playlist
            cls._add_sound_to_main_playlist(sound)

            # Normalize the sound
            cls._normalize_sound(sound)

            return sound, None

        except Exception as e:
            error_msg = f"Database error while creating sound entry: {str(e)}"
            logger.error(
                f"Error creating sound entry for {audio_path}: {error_msg}"
            )
            # Drop the pending sound and any failed transaction so the
            # caller can still record the failure on the stream.
            try:
                db.session.rollback()
            except Exception as rollback_error:
                logger.error(
                    f"Failed to roll back session after sound creation error: {rollback_error}"
                )
            return None, error_msg

    @classmethod
    def _update_stream_metadata(cls, stream: Stream, metadata: Dict) -> None:
        """Copy the extracted metadata onto *stream* and commit.

        Args:
            stream: The stream row to update.
            metadata: Metadata dict as built by ``_extract_metadata``.
        """
        stream.service = metadata.get("service")
        stream.service_id = metadata.get("service_id")
        stream.title = metadata.get("title")
        stream.track = metadata.get("track")
        stream.artist = metadata.get("artist")
        stream.album = metadata.get("album")
        stream.genre = metadata.get("genre")
        db.session.commit()

    @classmethod
    def _detect_service(cls, url: str) -> str:
        """Detect the streaming service from the URL's host name.

        Args:
            url: The stream URL.

        Returns:
            One of ``youtube``, ``soundcloud``, ``dailymotion``,
            ``spotify``, ``vimeo``, ``twitch`` or ``unknown``.
        """
        domain = urlparse(url).netloc.lower()

        for fragments, service in cls._SERVICE_DOMAINS:
            if any(fragment in domain for fragment in fragments):
                return service
        return "unknown"

    @classmethod
    def _extract_service_id(cls, url: str, info: Dict) -> str:
        """Extract service-specific ID from URL or info.

        The ``id`` entry of *info* wins when present and non-empty. For
        YouTube URLs the ID is otherwise parsed from the URL; the final
        fallback is the last non-empty path segment, or ``"unknown"``.

        Args:
            url: The stream URL.
            info: Information dict returned by yt-dlp (may be empty).

        Returns:
            The ID as a string; never empty.
        """
        # Try to get from info first
        info_id = (info or {}).get("id")
        if info_id is not None and str(info_id) != "":
            return str(info_id)

        parsed = urlparse(url)

        if cls._detect_service(url) == "youtube":
            netloc = parsed.netloc.lower()
            video_id = ""
            if "youtu.be" in netloc:
                video_id = parsed.path[1:]  # Remove leading slash
            elif "youtube.com" in netloc:
                video_id = parse_qs(parsed.query).get("v", [""])[0]
            if video_id:
                return video_id

        # Fallback to the last part of the URL path (ignoring a trailing
        # slash so ".../track/" still yields "track").
        return parsed.path.rstrip("/").split("/")[-1] or "unknown"

    @classmethod
    def _add_sound_to_main_playlist(cls, sound: Sound) -> None:
        """Add a sound to the main playlist (best effort).

        Failures are logged and swallowed so they do not fail the stream.
        """
        try:
            from app.models.playlist import Playlist

            # Find the main playlist
            main_playlist = Playlist.find_main_playlist()

            if main_playlist:
                # Add sound to the main playlist
                main_playlist.add_sound(sound.id, commit=True)
                logger.info(f"Added sound {sound.id} to main playlist")
            else:
                logger.warning("Main playlist not found - sound not added to any playlist")

        except Exception as e:
            logger.error(f"Failed to add sound {sound.id} to main playlist: {e}")

    @classmethod
    def _normalize_sound(cls, sound: Sound) -> None:
        """Normalize a stream sound using the sound normalizer service.

        Best effort: an unsuccessful result is logged as a warning and an
        exception is logged as an error; neither is propagated.
        """
        try:
            from app.services.sound_normalizer_service import SoundNormalizerService

            logger.info(f"Starting normalization of stream sound {sound.id}: {sound.name}")

            # Normalize the sound (overwrite=True since it's a new sound)
            result = SoundNormalizerService.normalize_sound(
                sound.id,
                overwrite=True,
                two_pass=True
            )

            if result.get("success"):
                logger.info(f"Successfully normalized stream sound {sound.id}")
            else:
                error_msg = result.get("error", "Unknown normalization error")
                logger.warning(f"Failed to normalize stream sound {sound.id}: {error_msg}")

        except Exception as e:
            logger.error(f"Error normalizing stream sound {sound.id}: {e}")

    @classmethod
    def _calculate_file_hash(cls, file_path: str) -> str:
        """Calculate the SHA256 hash of a file.

        Args:
            file_path: Path of the file to hash.

        Returns:
            The hex digest of the file contents.
        """
        sha256_hash = hashlib.sha256()
        with open(file_path, "rb") as f:
            # Read in 64 KiB chunks so large files are never fully loaded.
            while chunk := f.read(65536):
                sha256_hash.update(chunk)
        return sha256_hash.hexdigest()