From 4f18f3e64e3bb0ad9f1de7780a56cdee901caed2 Mon Sep 17 00:00:00 2001 From: JSC Date: Sun, 6 Jul 2025 16:57:33 +0200 Subject: [PATCH] feat(stream): implement stream processing service with routes for managing streaming URLs; add support for concurrent processing and metadata extraction --- .env.example | 5 +- app/__init__.py | 12 +- app/models/sound.py | 6 + app/models/stream.py | 11 +- app/routes/admin.py | 6 +- app/routes/admin_sounds.py | 7 +- app/routes/soundboard.py | 2 +- app/routes/stream.py | 210 +++++++++ app/services/stream_processing_service.py | 529 ++++++++++++++++++++++ pyproject.toml | 1 + uv.lock | 11 + 11 files changed, 786 insertions(+), 14 deletions(-) create mode 100644 app/routes/stream.py create mode 100644 app/services/stream_processing_service.py diff --git a/.env.example b/.env.example index 7116651..9e0799c 100644 --- a/.env.example +++ b/.env.example @@ -16,4 +16,7 @@ GOOGLE_CLIENT_SECRET=your_google_client_secret_here # GitHub OAuth GITHUB_CLIENT_ID=your_github_client_id_here -GITHUB_CLIENT_SECRET=your_github_client_secret_here \ No newline at end of file +GITHUB_CLIENT_SECRET=your_github_client_secret_here + +# Stream Processing Configuration +STREAM_MAX_CONCURRENT=2 \ No newline at end of file diff --git a/app/__init__.py b/app/__init__.py index 39f10b5..38e18b1 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -82,14 +82,20 @@ def create_app(): # Start scheduler for background tasks scheduler_service.start() + # Initialize stream processing service + from app.services.stream_processing_service import StreamProcessingService + + StreamProcessingService.initialize() + # Register blueprints - from app.routes import admin, admin_sounds, auth, main, soundboard + from app.routes import admin, admin_sounds, auth, main, soundboard, stream app.register_blueprint(main.bp, url_prefix="/api") app.register_blueprint(auth.bp, url_prefix="/api/auth") app.register_blueprint(admin.bp, url_prefix="/api/admin") - 
app.register_blueprint(admin_sounds.bp) - app.register_blueprint(soundboard.bp) + app.register_blueprint(admin_sounds.bp, url_prefix="/api/admin/sounds") + app.register_blueprint(soundboard.bp, url_prefix="/api/soundboard") + app.register_blueprint(stream.bp, url_prefix="/api/streams") # Shutdown scheduler when app is torn down @app.teardown_appcontext diff --git a/app/models/sound.py b/app/models/sound.py index f207083..a4139f4 100644 --- a/app/models/sound.py +++ b/app/models/sound.py @@ -36,6 +36,9 @@ class Sound(db.Model): # Basic sound information name: Mapped[str] = mapped_column(String(255), nullable=False) filename: Mapped[str] = mapped_column(String(500), nullable=False) + thumbnail: Mapped[str | None] = mapped_column( + String(500), nullable=True + ) # Thumbnail filename duration: Mapped[int] = mapped_column(Integer, nullable=False) size: Mapped[int] = mapped_column(Integer, nullable=False) # Size in bytes hash: Mapped[str] = mapped_column(String(64), nullable=False) # SHA256 hash @@ -114,6 +117,7 @@ class Sound(db.Model): "type": self.type, "name": self.name, "filename": self.filename, + "thumbnail": self.thumbnail, "duration": self.duration, "size": self.size, "hash": self.hash, @@ -217,6 +221,7 @@ class Sound(db.Model): duration: float, size: int, hash_value: str, + thumbnail: Optional[str] = None, is_music: bool = False, is_deletable: bool = True, commit: bool = True, @@ -230,6 +235,7 @@ class Sound(db.Model): type=sound_type, name=name, filename=filename, + thumbnail=thumbnail, duration=duration, size=size, hash=hash_value, diff --git a/app/models/stream.py b/app/models/stream.py index 379515c..27f266f 100644 --- a/app/models/stream.py +++ b/app/models/stream.py @@ -4,7 +4,7 @@ from datetime import datetime from typing import TYPE_CHECKING, Optional from zoneinfo import ZoneInfo -from sqlalchemy import DateTime, ForeignKey, Integer, String, Text +from sqlalchemy import DateTime, ForeignKey, Integer, String, Text, UniqueConstraint from sqlalchemy.orm 
import Mapped, mapped_column, relationship from app.database import db @@ -33,6 +33,7 @@ class Stream(db.Model): status: Mapped[str] = mapped_column( String(50), nullable=False, default="pending" ) + error: Mapped[str | None] = mapped_column(Text, nullable=True) created_at: Mapped[datetime] = mapped_column( DateTime, default=lambda: datetime.now(tz=ZoneInfo("UTC")), @@ -48,6 +49,13 @@ class Stream(db.Model): # Relationships sound: Mapped["Sound"] = relationship("Sound", back_populates="streams") + # Constraints + __table_args__ = ( + UniqueConstraint( + "service", "service_id", name="unique_service_stream" + ), + ) + def __repr__(self) -> str: """String representation of the stream.""" return f"" @@ -66,6 +74,7 @@ class Stream(db.Model): "album": self.album, "genre": self.genre, "status": self.status, + "error": self.error, "created_at": ( self.created_at.isoformat() if self.created_at else None ), diff --git a/app/routes/admin.py b/app/routes/admin.py index 2919ceb..4c3ed06 100644 --- a/app/routes/admin.py +++ b/app/routes/admin.py @@ -2,11 +2,7 @@ from flask import Blueprint, request -from app.services.decorators import ( - get_current_user, - require_auth, - require_role, -) +from app.services.decorators import get_current_user, require_auth, require_role from app.services.scheduler_service import scheduler_service from app.services.sound_normalizer_service import SoundNormalizerService from app.services.sound_scanner_service import SoundScannerService diff --git a/app/routes/admin_sounds.py b/app/routes/admin_sounds.py index 8b38daa..db9acc2 100644 --- a/app/routes/admin_sounds.py +++ b/app/routes/admin_sounds.py @@ -2,16 +2,17 @@ from flask import Blueprint, jsonify, request -from app.services.decorators import require_admin +from app.services.decorators import require_admin, require_auth, require_role from app.services.error_handling_service import ErrorHandlingService from app.services.sound_normalizer_service import SoundNormalizerService from 
app.services.sound_scanner_service import SoundScannerService -bp = Blueprint("admin_sounds", __name__, url_prefix="/api/admin/sounds") +bp = Blueprint("admin_sounds", __name__) @bp.route("/scan", methods=["POST"]) -@require_admin +@require_auth +@require_role("admin") def scan_sounds(): """Manually trigger sound scanning.""" return ErrorHandlingService.wrap_service_call( diff --git a/app/routes/soundboard.py b/app/routes/soundboard.py index 1830f70..215664b 100644 --- a/app/routes/soundboard.py +++ b/app/routes/soundboard.py @@ -11,7 +11,7 @@ from app.services.decorators import ( ) from app.services.vlc_service import vlc_service -bp = Blueprint("soundboard", __name__, url_prefix="/api/soundboard") +bp = Blueprint("soundboard", __name__) @bp.route("/sounds", methods=["GET"]) diff --git a/app/routes/stream.py b/app/routes/stream.py new file mode 100644 index 0000000..095c14b --- /dev/null +++ b/app/routes/stream.py @@ -0,0 +1,210 @@ +"""Stream routes for managing streaming service links.""" + +from flask import Blueprint, jsonify, request + +from app.database import db +from app.models.stream import Stream +from app.services.decorators import require_auth + +bp = Blueprint("stream", __name__) + + +@bp.route("/add-url", methods=["POST"]) +@require_auth +def add_url(): + """Add a URL to the stream processing queue.""" + try: + data = request.get_json() + + if not data or "url" not in data: + return jsonify({"error": "URL is required"}), 400 + + url = data["url"].strip() + if not url: + return jsonify({"error": "URL cannot be empty"}), 400 + + # Check if URL already exists + existing_stream = Stream.query.filter_by(url=url).first() + if existing_stream: + return ( + jsonify( + { + "error": "URL already exists in stream", + "stream": existing_stream.to_dict(), + } + ), + 409, + ) + + # Try to extract basic metadata to check for service/service_id duplicates + from app.services.stream_processing_service import ( + StreamProcessingService, + ) + + try: + metadata, _ = 
StreamProcessingService._extract_metadata(url) + if metadata: + service = metadata.get("service") + service_id = metadata.get("service_id") + + if service and service_id: + existing_service_stream = Stream.query.filter_by( + service=service, service_id=service_id + ).first() + + if existing_service_stream: + return ( + jsonify( + { + "error": f"Stream already exists with {service} ID: {service_id}", + "existing_stream": existing_service_stream.to_dict(), + } + ), + 409, + ) + except Exception as e: + # If metadata extraction fails here, we'll let the background process handle it + # This is just an early check to prevent obvious duplicates + pass + + # Create stream entry with pending status + stream = Stream.create_stream(url=url, status="pending", commit=True) + + # Add to processing queue (will be implemented next) + from app.services.stream_processing_service import ( + StreamProcessingService, + ) + + StreamProcessingService.add_to_queue(stream.id) + + return ( + jsonify( + { + "message": "URL added to processing queue", + "stream": stream.to_dict(), + } + ), + 201, + ) + + except Exception as e: + db.session.rollback() + return jsonify({"error": str(e)}), 500 + + +@bp.route("/", methods=["GET"]) +@require_auth +def list_streams(): + """List all streams with optional filtering.""" + try: + status = request.args.get("status") + service = request.args.get("service") + + query = Stream.query + + if status: + query = query.filter_by(status=status) + if service: + query = query.filter_by(service=service) + + streams = query.order_by(Stream.created_at.desc()).all() + + return ( + jsonify({"streams": [stream.to_dict() for stream in streams]}), + 200, + ) + + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +@bp.route("/<int:stream_id>", methods=["GET"]) +@require_auth +def get_stream(stream_id): + """Get a specific stream by ID.""" + try: + stream = Stream.query.get_or_404(stream_id) + return jsonify({"stream": stream.to_dict()}), 200 + + except Exception as e: +
return jsonify({"error": str(e)}), 500 + + +@bp.route("/<int:stream_id>", methods=["PUT"]) +@require_auth +def update_stream(stream_id): + """Update stream metadata.""" + try: + stream = Stream.query.get_or_404(stream_id) + data = request.get_json() + + if not data: + return jsonify({"error": "No data provided"}), 400 + + # Update allowed fields + updatable_fields = [ + "title", + "track", + "artist", + "album", + "genre", + "status", + ] + for field in updatable_fields: + if field in data: + setattr(stream, field, data[field]) + + db.session.commit() + + return ( + jsonify( + { + "message": "Stream updated successfully", + "stream": stream.to_dict(), + } + ), + 200, + ) + + except Exception as e: + db.session.rollback() + return jsonify({"error": str(e)}), 500 + + +@bp.route("/<int:stream_id>", methods=["DELETE"]) +@require_auth +def delete_stream(stream_id): + """Delete a stream.""" + try: + stream = Stream.query.get_or_404(stream_id) + + # If stream is being processed, mark for deletion instead + if stream.status == "processing": + stream.status = "cancelled" + db.session.commit() + return jsonify({"message": "Stream marked for cancellation"}), 200 + + db.session.delete(stream) + db.session.commit() + + return jsonify({"message": "Stream deleted successfully"}), 200 + + except Exception as e: + db.session.rollback() + return jsonify({"error": str(e)}), 500 + + +@bp.route("/queue/status", methods=["GET"]) +@require_auth +def queue_status(): + """Get the current processing queue status.""" + try: + from app.services.stream_processing_service import ( + StreamProcessingService, + ) + + status = StreamProcessingService.get_queue_status() + return jsonify(status), 200 + + except Exception as e: + return jsonify({"error": str(e)}), 500 diff --git a/app/services/stream_processing_service.py b/app/services/stream_processing_service.py new file mode 100644 index 0000000..23d0b0f --- /dev/null +++ b/app/services/stream_processing_service.py @@ -0,0 +1,529 @@ +"""Stream processing service with queue 
management and yt-dlp integration.""" + +import hashlib +import os +import re +import shutil +import threading +import time +from queue import Empty, Queue +from typing import Dict, List, Optional +from urllib.parse import parse_qs, urlparse + +from app.database import db +from app.models.sound import Sound +from app.models.stream import Stream +from app.services.logging_service import LoggingService + +# Configure logging +logger = LoggingService.get_logger(__name__) + + +class StreamProcessingService: + """Service for processing streaming URLs with yt-dlp.""" + + # Class variables for queue management + _processing_queue: Queue = Queue() + _processing_threads: List[threading.Thread] = [] + _is_running: bool = False + _max_concurrent_downloads: int = int( + os.getenv("STREAM_MAX_CONCURRENT", "2") + ) + _downloads_dir: str = "sounds/temp" + + @classmethod + def initialize(cls) -> None: + """Initialize the stream processing service.""" + if cls._is_running: + return + + # Create necessary directories + os.makedirs(cls._downloads_dir, exist_ok=True) + os.makedirs("sounds/stream", exist_ok=True) + os.makedirs("sounds/stream/thumbnails", exist_ok=True) + + # Start processing threads + for i in range(cls._max_concurrent_downloads): + thread = threading.Thread( + target=cls._worker_thread, + name=f"StreamProcessor-{i + 1}", + daemon=True, + ) + thread.start() + cls._processing_threads.append(thread) + + cls._is_running = True + logger.info( + f"StreamProcessingService initialized with {cls._max_concurrent_downloads} workers" + ) + + @classmethod + def add_to_queue(cls, stream_id: int) -> None: + """Add a stream to the processing queue.""" + if not cls._is_running: + cls.initialize() + + cls._processing_queue.put(stream_id) + logger.info(f"Added stream {stream_id} to processing queue") + + @classmethod + def get_queue_status(cls) -> Dict: + """Get the current queue status.""" + pending_count = Stream.query.filter_by(status="pending").count() + processing_count = 
Stream.query.filter_by(status="processing").count() + + return { + "queue_size": cls._processing_queue.qsize(), + "pending_streams": pending_count, + "processing_streams": processing_count, + "max_concurrent": cls._max_concurrent_downloads, + "is_running": cls._is_running, + } + + @classmethod + def _worker_thread(cls) -> None: + """Worker thread for processing streams.""" + from app import create_app + + # Create app context for database operations + app = create_app() + + while True: + try: + # Get stream ID from queue with timeout + stream_id = cls._processing_queue.get(timeout=1) + + with app.app_context(): + cls._process_stream(stream_id) + + cls._processing_queue.task_done() + + except Empty: + # No items in queue, continue + continue + except Exception as e: + logger.error(f"Error in worker thread: {e}") + continue + + @classmethod + def _process_stream(cls, stream_id: int) -> None: + """Process a single stream.""" + try: + stream = Stream.query.get(stream_id) + if not stream: + logger.error(f"Stream {stream_id} not found") + return + + if stream.status == "cancelled": + logger.info(f"Stream {stream_id} was cancelled") + return + + # Update status to processing + stream.status = "processing" + db.session.commit() + + logger.info( + f"Starting processing of stream {stream_id}: {stream.url}" + ) + + # Extract metadata and download audio + metadata, error_msg = cls._extract_metadata(stream.url) + if not metadata: + if not error_msg: + error_msg = "Failed to extract metadata from URL" + stream.status = "failed" + stream.error = error_msg + db.session.commit() + logger.error( + f"Failed to extract metadata for stream {stream_id}: {error_msg}" + ) + return + + # Check for duplicate streams based on service and service_id + service = metadata.get("service") + service_id = metadata.get("service_id") + + if service and service_id: + existing_stream = ( + Stream.query.filter_by( + service=service, service_id=service_id + ) + .filter(Stream.id != stream.id) + .first() 
+ ) + + if existing_stream: + error_msg = f"Stream already exists with {service} ID: {service_id} (stream #{existing_stream.id})" + stream.status = "failed" + stream.error = error_msg + db.session.commit() + logger.error( + f"Duplicate stream detected for {stream_id}: {error_msg}" + ) + return + + # Update stream with metadata + cls._update_stream_metadata(stream, metadata) + + # Download audio + audio_path, error_msg = cls._download_audio(stream.url, metadata) + if not audio_path: + if not error_msg: + error_msg = "Failed to download audio from URL" + stream.status = "failed" + stream.error = error_msg + db.session.commit() + logger.error( + f"Failed to download audio for stream {stream_id}: {error_msg}" + ) + return + + # Move files to final locations + final_audio_path, thumbnail_path, error_msg = ( + cls._move_files_to_final_location(audio_path, metadata) + ) + if not final_audio_path: + if not error_msg: + error_msg = "Failed to move files to final location" + stream.status = "failed" + stream.error = error_msg + db.session.commit() + logger.error( + f"Failed to move files for stream {stream_id}: {error_msg}" + ) + return + + # Create sound entry with final path + sound, error_msg = cls._create_sound_entry( + final_audio_path, metadata, thumbnail_path + ) + if not sound: + if not error_msg: + error_msg = "Failed to create sound entry in database" + stream.status = "failed" + stream.error = error_msg + db.session.commit() + logger.error( + f"Failed to create sound entry for stream {stream_id}: {error_msg}" + ) + return + + # Update stream with sound_id and mark as completed + stream.sound_id = sound.id + stream.status = "completed" + stream.error = None # Clear any previous errors + db.session.commit() + + logger.info( + f"Successfully processed stream {stream_id} -> sound {sound.id}" + ) + + except Exception as e: + error_msg = f"Unexpected error during processing: {str(e)}" + logger.error(f"Error processing stream {stream_id}: {error_msg}") + try: + stream = 
Stream.query.get(stream_id) + if stream: + stream.status = "failed" + stream.error = error_msg + db.session.commit() + except Exception as db_error: + logger.error( + f"Failed to update stream error in database: {db_error}" + ) + + @classmethod + def _extract_metadata( + cls, url: str + ) -> tuple[Optional[Dict], Optional[str]]: + """Extract metadata from URL using yt-dlp.""" + try: + import yt_dlp + + ydl_opts = { + "quiet": True, + "no_warnings": True, + "extract_flat": False, + } + + with yt_dlp.YoutubeDL(ydl_opts) as ydl: + info = ydl.extract_info(url, download=False) + + # Extract service information + service = cls._detect_service(url) + service_id = cls._extract_service_id(url, info) + + metadata = { + "service": info.get("extractor", service), + "service_id": info.get("id", service_id), + "title": info.get("title", ""), + "track": info.get("track", ""), + "artist": info.get("artist", "") + or info.get("uploader", ""), + "album": info.get("album", ""), + "genre": info.get("genre", ""), + "duration": info.get("duration", 0), + "description": info.get("description", ""), + } + + return metadata, None + + except Exception as e: + error_msg = f"yt-dlp extraction failed: {str(e)}" + logger.error(f"Error extracting metadata from {url}: {error_msg}") + return None, error_msg + + @classmethod + def _download_audio( + cls, url: str, metadata: Dict + ) -> tuple[Optional[str], Optional[str]]: + """Download audio from URL using yt-dlp.""" + try: + import yt_dlp + + # Generate filename + title = metadata.get("title", "unknown") + safe_title = re.sub(r"[^\w\s-]", "", title)[:50] + filename = f"{safe_title}_{metadata.get('service_id', 'unknown')}" + + output_path = os.path.join( + cls._downloads_dir, f"{filename}.%(ext)s" + ) + + ydl_opts = { + "format": "bestaudio/best", + "outtmpl": output_path, + "extractaudio": True, + "audioformat": "opus", + "audioquality": "192", + "postprocessors": [ + { + "key": "FFmpegExtractAudio", + "preferredcodec": "opus", + 
"preferredquality": "192", + } + ], + "writethumbnail": True, + "quiet": True, + "no_warnings": True, + } + + with yt_dlp.YoutubeDL(ydl_opts) as ydl: + ydl.download([url]) + + # Find the downloaded file + final_path = os.path.join(cls._downloads_dir, f"{filename}.opus") + if os.path.exists(final_path): + return final_path, None + + # If opus doesn't exist, look for other formats + for ext in ["mp3", "wav", "m4a", "webm", "ogg"]: + alt_path = os.path.join(cls._downloads_dir, f"{filename}.{ext}") + if os.path.exists(alt_path): + return alt_path, None + + error_msg = f"Downloaded file not found after yt-dlp processing" + logger.error(f"Downloaded file not found for {url}") + return None, error_msg + + except Exception as e: + error_msg = f"yt-dlp download failed: {str(e)}" + logger.error(f"Error downloading audio from {url}: {error_msg}") + return None, error_msg + + @classmethod + def _move_files_to_final_location( + cls, audio_path: str, metadata: dict + ) -> tuple[Optional[str], Optional[str], Optional[str]]: + """Move downloaded files to their final locations. 
+ + Returns: + tuple: (final_audio_path, thumbnail_path, error_message) + """ + try: + # Create target directories + stream_dir = "sounds/stream" + thumbnail_dir = "sounds/stream/thumbnails" + + os.makedirs(stream_dir, exist_ok=True) + os.makedirs(thumbnail_dir, exist_ok=True) + + # Generate safe filename + title = metadata.get("title", "unknown") + safe_title = re.sub(r"[^\w\s-]", "", title)[:50] + service_id = metadata.get("service_id", "unknown") + base_filename = f"{safe_title}_{service_id}" + + # Move audio file + audio_extension = os.path.splitext(audio_path)[1] + final_audio_filename = f"{base_filename}{audio_extension}" + final_audio_path = os.path.join(stream_dir, final_audio_filename) + + # If file already exists, add a counter + counter = 1 + while os.path.exists(final_audio_path): + final_audio_filename = ( + f"{base_filename}_{counter}{audio_extension}" + ) + final_audio_path = os.path.join( + stream_dir, final_audio_filename + ) + counter += 1 + + shutil.move(audio_path, final_audio_path) + logger.info(f"Moved audio file to: {final_audio_path}") + + # Look for and move thumbnail + thumbnail_path = None + temp_dir = os.path.dirname(audio_path) + + # Common thumbnail extensions + for thumb_ext in [".jpg", ".jpeg", ".png", ".webp", ".gif"]: + temp_thumb_path = os.path.join( + temp_dir, + f"{os.path.splitext(os.path.basename(audio_path))[0]}{thumb_ext}", + ) + if os.path.exists(temp_thumb_path): + final_thumb_filename = f"{base_filename}{thumb_ext}" + final_thumb_path = os.path.join( + thumbnail_dir, final_thumb_filename + ) + + # Handle duplicate thumbnail names + thumb_counter = 1 + while os.path.exists(final_thumb_path): + final_thumb_filename = ( + f"{base_filename}_{thumb_counter}{thumb_ext}" + ) + final_thumb_path = os.path.join( + thumbnail_dir, final_thumb_filename + ) + thumb_counter += 1 + + shutil.move(temp_thumb_path, final_thumb_path) + thumbnail_path = final_thumb_path + logger.info(f"Moved thumbnail to: {final_thumb_path}") + break + + 
return final_audio_path, thumbnail_path, None + + except Exception as e: + error_msg = f"File move operation failed: {str(e)}" + logger.error(f"Error moving files: {error_msg}") + return None, None, error_msg + + @classmethod + def _create_sound_entry( + cls, + audio_path: str, + metadata: dict, + thumbnail_path: str | None = None, + ) -> tuple[Sound | None, str | None]: + """Create a sound entry from the downloaded audio.""" + try: + # Get file info + file_size = os.path.getsize(audio_path) + + # Generate hash + file_hash = cls._calculate_file_hash(audio_path) + + # Get duration (use metadata duration or calculate from file) + duration_ms = int((metadata.get("duration", 0) or 0) * 1000) + + # Get thumbnail filename if available + thumbnail_filename = None + if thumbnail_path: + thumbnail_filename = os.path.basename(thumbnail_path) + + # Create sound entry + sound = Sound( + type="STR", # Stream type + name=metadata.get("title", "Unknown Title"), + filename=os.path.basename(audio_path), + thumbnail=thumbnail_filename, + duration=duration_ms, + size=file_size, + hash=file_hash, + is_music=True, # Streams are typically music + is_deletable=True, + ) + + db.session.add(sound) + db.session.commit() + + return sound, None + + except Exception as e: + error_msg = f"Database error while creating sound entry: {str(e)}" + logger.error( + f"Error creating sound entry for {audio_path}: {error_msg}" + ) + return None, error_msg + + @classmethod + def _update_stream_metadata(cls, stream: Stream, metadata: Dict) -> None: + """Update stream with extracted metadata.""" + stream.service = metadata.get("service") + stream.service_id = metadata.get("service_id") + stream.title = metadata.get("title") + stream.track = metadata.get("track") + stream.artist = metadata.get("artist") + stream.album = metadata.get("album") + stream.genre = metadata.get("genre") + db.session.commit() + + @classmethod + def _detect_service(cls, url: str) -> str: + """Detect the streaming service from URL.""" 
+ domain = urlparse(url).netloc.lower() + + if "youtube.com" in domain or "youtu.be" in domain: + return "youtube" + elif "soundcloud.com" in domain: + return "soundcloud" + elif "dailymotion.com" in domain: + return "dailymotion" + elif "spotify.com" in domain: + return "spotify" + elif "vimeo.com" in domain: + return "vimeo" + elif "twitch.tv" in domain: + return "twitch" + else: + return "unknown" + + @classmethod + def _extract_service_id(cls, url: str, info: Dict) -> str: + """Extract service-specific ID from URL or info.""" + service = cls._detect_service(url) + + if service == "youtube": + # Try to get from info first + if "id" in info: + return info["id"] + # Parse from URL + parsed = urlparse(url) + if "youtu.be" in parsed.netloc: + return parsed.path[1:] # Remove leading slash + elif "youtube.com" in parsed.netloc: + query_params = parse_qs(parsed.query) + return query_params.get("v", [""])[0] + + elif service == "soundcloud": + if "id" in info: + return str(info["id"]) + + # Fallback to using info ID or last part of URL + if "id" in info: + return str(info["id"]) + + return urlparse(url).path.split("/")[-1] or "unknown" + + @classmethod + def _calculate_file_hash(cls, file_path: str) -> str: + """Calculate SHA256 hash of file.""" + sha256_hash = hashlib.sha256() + with open(file_path, "rb") as f: + for chunk in iter(lambda: f.read(4096), b""): + sha256_hash.update(chunk) + return sha256_hash.hexdigest() diff --git a/pyproject.toml b/pyproject.toml index 59a0bb1..ae97fa4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -19,6 +19,7 @@ dependencies = [ "python-dotenv==1.1.1", "requests==2.32.4", "werkzeug==3.1.3", + "yt-dlp>=2025.6.30", ] [dependency-groups] diff --git a/uv.lock b/uv.lock index 7933c7c..7bf1887 100644 --- a/uv.lock +++ b/uv.lock @@ -640,6 +640,7 @@ dependencies = [ { name = "python-dotenv" }, { name = "requests" }, { name = "werkzeug" }, + { name = "yt-dlp" }, ] [package.dev-dependencies] @@ -664,6 +665,7 @@ requires-dist = [ { name = 
"python-dotenv", specifier = "==1.1.1" }, { name = "requests", specifier = "==2.32.4" }, { name = "werkzeug", specifier = "==3.1.3" }, + { name = "yt-dlp", specifier = ">=2025.6.30" }, ] [package.metadata.requires-dev] @@ -776,3 +778,12 @@ sdist = { url = "https://files.pythonhosted.org/packages/c9/4a/44d3c295350d77642 wheels = [ { url = "https://files.pythonhosted.org/packages/78/58/e860788190eba3bcce367f74d29c4675466ce8dddfba85f7827588416f01/wsproto-1.2.0-py3-none-any.whl", hash = "sha256:b9acddd652b585d75b20477888c56642fdade28bdfd3579aa24a4d2c037dd736", size = 24226 }, ] + +[[package]] +name = "yt-dlp" +version = "2025.6.30" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/23/9c/ff64c2fed7909f43a9a0aedb7395c65404e71c2439198764685a6e3b3059/yt_dlp-2025.6.30.tar.gz", hash = "sha256:6d0ae855c0a55bfcc28dffba804ec8525b9b955d34a41191a1561a4cec03d8bd", size = 3034364 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/14/41/2f048ae3f6d0fa2e59223f08ba5049dbcdac628b0a9f9deac722dd9260a5/yt_dlp-2025.6.30-py3-none-any.whl", hash = "sha256:541becc29ed7b7b3a08751c0a66da4b7f8ee95cb81066221c78e83598bc3d1f3", size = 3279333 }, +]