feat(stream): implement stream processing service with routes for managing streaming URLs; add support for concurrent processing and metadata extraction
This commit is contained in:
@@ -16,4 +16,7 @@ GOOGLE_CLIENT_SECRET=your_google_client_secret_here
|
||||
|
||||
# GitHub OAuth
|
||||
GITHUB_CLIENT_ID=your_github_client_id_here
|
||||
GITHUB_CLIENT_SECRET=your_github_client_secret_here
|
||||
GITHUB_CLIENT_SECRET=your_github_client_secret_here
|
||||
|
||||
# Stream Processing Configuration
|
||||
STREAM_MAX_CONCURRENT=2
|
||||
@@ -82,14 +82,20 @@ def create_app():
|
||||
# Start scheduler for background tasks
|
||||
scheduler_service.start()
|
||||
|
||||
# Initialize stream processing service
|
||||
from app.services.stream_processing_service import StreamProcessingService
|
||||
|
||||
StreamProcessingService.initialize()
|
||||
|
||||
# Register blueprints
|
||||
from app.routes import admin, admin_sounds, auth, main, soundboard
|
||||
from app.routes import admin, admin_sounds, auth, main, soundboard, stream
|
||||
|
||||
app.register_blueprint(main.bp, url_prefix="/api")
|
||||
app.register_blueprint(auth.bp, url_prefix="/api/auth")
|
||||
app.register_blueprint(admin.bp, url_prefix="/api/admin")
|
||||
app.register_blueprint(admin_sounds.bp)
|
||||
app.register_blueprint(soundboard.bp)
|
||||
app.register_blueprint(admin_sounds.bp, url_prefix="/api/admin/sounds")
|
||||
app.register_blueprint(soundboard.bp, url_prefix="/api/soundboard")
|
||||
app.register_blueprint(stream.bp, url_prefix="/api/streams")
|
||||
|
||||
# Shutdown scheduler when app is torn down
|
||||
@app.teardown_appcontext
|
||||
|
||||
@@ -36,6 +36,9 @@ class Sound(db.Model):
|
||||
# Basic sound information
|
||||
name: Mapped[str] = mapped_column(String(255), nullable=False)
|
||||
filename: Mapped[str] = mapped_column(String(500), nullable=False)
|
||||
thumbnail: Mapped[str | None] = mapped_column(
|
||||
String(500), nullable=True
|
||||
) # Thumbnail filename
|
||||
duration: Mapped[int] = mapped_column(Integer, nullable=False)
|
||||
size: Mapped[int] = mapped_column(Integer, nullable=False) # Size in bytes
|
||||
hash: Mapped[str] = mapped_column(String(64), nullable=False) # SHA256 hash
|
||||
@@ -114,6 +117,7 @@ class Sound(db.Model):
|
||||
"type": self.type,
|
||||
"name": self.name,
|
||||
"filename": self.filename,
|
||||
"thumbnail": self.thumbnail,
|
||||
"duration": self.duration,
|
||||
"size": self.size,
|
||||
"hash": self.hash,
|
||||
@@ -217,6 +221,7 @@ class Sound(db.Model):
|
||||
duration: float,
|
||||
size: int,
|
||||
hash_value: str,
|
||||
thumbnail: Optional[str] = None,
|
||||
is_music: bool = False,
|
||||
is_deletable: bool = True,
|
||||
commit: bool = True,
|
||||
@@ -230,6 +235,7 @@ class Sound(db.Model):
|
||||
type=sound_type,
|
||||
name=name,
|
||||
filename=filename,
|
||||
thumbnail=thumbnail,
|
||||
duration=duration,
|
||||
size=size,
|
||||
hash=hash_value,
|
||||
|
||||
@@ -4,7 +4,7 @@ from datetime import datetime
|
||||
from typing import TYPE_CHECKING, Optional
|
||||
from zoneinfo import ZoneInfo
|
||||
|
||||
from sqlalchemy import DateTime, ForeignKey, Integer, String, Text
|
||||
from sqlalchemy import DateTime, ForeignKey, Integer, String, Text, UniqueConstraint
|
||||
from sqlalchemy.orm import Mapped, mapped_column, relationship
|
||||
|
||||
from app.database import db
|
||||
@@ -33,6 +33,7 @@ class Stream(db.Model):
|
||||
status: Mapped[str] = mapped_column(
|
||||
String(50), nullable=False, default="pending"
|
||||
)
|
||||
error: Mapped[str | None] = mapped_column(Text, nullable=True)
|
||||
created_at: Mapped[datetime] = mapped_column(
|
||||
DateTime,
|
||||
default=lambda: datetime.now(tz=ZoneInfo("UTC")),
|
||||
@@ -48,6 +49,13 @@ class Stream(db.Model):
|
||||
# Relationships
|
||||
sound: Mapped["Sound"] = relationship("Sound", back_populates="streams")
|
||||
|
||||
# Constraints
|
||||
__table_args__ = (
|
||||
UniqueConstraint(
|
||||
"service", "service_id", name="unique_service_stream"
|
||||
),
|
||||
)
|
||||
|
||||
def __repr__(self) -> str:
|
||||
"""String representation of the stream."""
|
||||
return f"<Stream(id={self.id}, service='{self.service}', service_id='{self.service_id}', sound_id={self.sound_id})>"
|
||||
@@ -66,6 +74,7 @@ class Stream(db.Model):
|
||||
"album": self.album,
|
||||
"genre": self.genre,
|
||||
"status": self.status,
|
||||
"error": self.error,
|
||||
"created_at": (
|
||||
self.created_at.isoformat() if self.created_at else None
|
||||
),
|
||||
|
||||
@@ -2,11 +2,7 @@
|
||||
|
||||
from flask import Blueprint, request
|
||||
|
||||
from app.services.decorators import (
|
||||
get_current_user,
|
||||
require_auth,
|
||||
require_role,
|
||||
)
|
||||
from app.services.decorators import get_current_user, require_auth, require_role
|
||||
from app.services.scheduler_service import scheduler_service
|
||||
from app.services.sound_normalizer_service import SoundNormalizerService
|
||||
from app.services.sound_scanner_service import SoundScannerService
|
||||
|
||||
@@ -2,16 +2,17 @@
|
||||
|
||||
from flask import Blueprint, jsonify, request
|
||||
|
||||
from app.services.decorators import require_admin
|
||||
from app.services.decorators import require_admin, require_auth, require_role
|
||||
from app.services.error_handling_service import ErrorHandlingService
|
||||
from app.services.sound_normalizer_service import SoundNormalizerService
|
||||
from app.services.sound_scanner_service import SoundScannerService
|
||||
|
||||
bp = Blueprint("admin_sounds", __name__, url_prefix="/api/admin/sounds")
|
||||
bp = Blueprint("admin_sounds", __name__)
|
||||
|
||||
|
||||
@bp.route("/scan", methods=["POST"])
|
||||
@require_admin
|
||||
@require_auth
|
||||
@require_role("admin")
|
||||
def scan_sounds():
|
||||
"""Manually trigger sound scanning."""
|
||||
return ErrorHandlingService.wrap_service_call(
|
||||
|
||||
@@ -11,7 +11,7 @@ from app.services.decorators import (
|
||||
)
|
||||
from app.services.vlc_service import vlc_service
|
||||
|
||||
bp = Blueprint("soundboard", __name__, url_prefix="/api/soundboard")
|
||||
bp = Blueprint("soundboard", __name__)
|
||||
|
||||
|
||||
@bp.route("/sounds", methods=["GET"])
|
||||
|
||||
210
app/routes/stream.py
Normal file
210
app/routes/stream.py
Normal file
@@ -0,0 +1,210 @@
|
||||
"""Stream routes for managing streaming service links."""
|
||||
|
||||
from flask import Blueprint, jsonify, request
|
||||
|
||||
from app.database import db
|
||||
from app.models.stream import Stream
|
||||
from app.services.decorators import require_auth
|
||||
|
||||
bp = Blueprint("stream", __name__)
|
||||
|
||||
|
||||
@bp.route("/add-url", methods=["POST"])
|
||||
@require_auth
|
||||
def add_url():
|
||||
"""Add a URL to the stream processing queue."""
|
||||
try:
|
||||
data = request.get_json()
|
||||
|
||||
if not data or "url" not in data:
|
||||
return jsonify({"error": "URL is required"}), 400
|
||||
|
||||
url = data["url"].strip()
|
||||
if not url:
|
||||
return jsonify({"error": "URL cannot be empty"}), 400
|
||||
|
||||
# Check if URL already exists
|
||||
existing_stream = Stream.query.filter_by(url=url).first()
|
||||
if existing_stream:
|
||||
return (
|
||||
jsonify(
|
||||
{
|
||||
"error": "URL already exists in stream",
|
||||
"stream": existing_stream.to_dict(),
|
||||
}
|
||||
),
|
||||
409,
|
||||
)
|
||||
|
||||
# Try to extract basic metadata to check for service/service_id duplicates
|
||||
from app.services.stream_processing_service import (
|
||||
StreamProcessingService,
|
||||
)
|
||||
|
||||
try:
|
||||
metadata, _ = StreamProcessingService._extract_metadata(url)
|
||||
if metadata:
|
||||
service = metadata.get("service")
|
||||
service_id = metadata.get("service_id")
|
||||
|
||||
if service and service_id:
|
||||
existing_service_stream = Stream.query.filter_by(
|
||||
service=service, service_id=service_id
|
||||
).first()
|
||||
|
||||
if existing_service_stream:
|
||||
return (
|
||||
jsonify(
|
||||
{
|
||||
"error": f"Stream already exists with {service} ID: {service_id}",
|
||||
"existing_stream": existing_service_stream.to_dict(),
|
||||
}
|
||||
),
|
||||
409,
|
||||
)
|
||||
except Exception as e:
|
||||
# If metadata extraction fails here, we'll let the background process handle it
|
||||
# This is just an early check to prevent obvious duplicates
|
||||
pass
|
||||
|
||||
# Create stream entry with pending status
|
||||
stream = Stream.create_stream(url=url, status="pending", commit=True)
|
||||
|
||||
# Add to processing queue (will be implemented next)
|
||||
from app.services.stream_processing_service import (
|
||||
StreamProcessingService,
|
||||
)
|
||||
|
||||
StreamProcessingService.add_to_queue(stream.id)
|
||||
|
||||
return (
|
||||
jsonify(
|
||||
{
|
||||
"message": "URL added to processing queue",
|
||||
"stream": stream.to_dict(),
|
||||
}
|
||||
),
|
||||
201,
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
db.session.rollback()
|
||||
return jsonify({"error": str(e)}), 500
|
||||
|
||||
|
||||
@bp.route("/", methods=["GET"])
|
||||
@require_auth
|
||||
def list_streams():
|
||||
"""List all streams with optional filtering."""
|
||||
try:
|
||||
status = request.args.get("status")
|
||||
service = request.args.get("service")
|
||||
|
||||
query = Stream.query
|
||||
|
||||
if status:
|
||||
query = query.filter_by(status=status)
|
||||
if service:
|
||||
query = query.filter_by(service=service)
|
||||
|
||||
streams = query.order_by(Stream.created_at.desc()).all()
|
||||
|
||||
return (
|
||||
jsonify({"streams": [stream.to_dict() for stream in streams]}),
|
||||
200,
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
return jsonify({"error": str(e)}), 500
|
||||
|
||||
|
||||
@bp.route("/<int:stream_id>", methods=["GET"])
|
||||
@require_auth
|
||||
def get_stream(stream_id):
|
||||
"""Get a specific stream by ID."""
|
||||
try:
|
||||
stream = Stream.query.get_or_404(stream_id)
|
||||
return jsonify({"stream": stream.to_dict()}), 200
|
||||
|
||||
except Exception as e:
|
||||
return jsonify({"error": str(e)}), 500
|
||||
|
||||
|
||||
@bp.route("/<int:stream_id>", methods=["PUT"])
|
||||
@require_auth
|
||||
def update_stream(stream_id):
|
||||
"""Update stream metadata."""
|
||||
try:
|
||||
stream = Stream.query.get_or_404(stream_id)
|
||||
data = request.get_json()
|
||||
|
||||
if not data:
|
||||
return jsonify({"error": "No data provided"}), 400
|
||||
|
||||
# Update allowed fields
|
||||
updatable_fields = [
|
||||
"title",
|
||||
"track",
|
||||
"artist",
|
||||
"album",
|
||||
"genre",
|
||||
"status",
|
||||
]
|
||||
for field in updatable_fields:
|
||||
if field in data:
|
||||
setattr(stream, field, data[field])
|
||||
|
||||
db.session.commit()
|
||||
|
||||
return (
|
||||
jsonify(
|
||||
{
|
||||
"message": "Stream updated successfully",
|
||||
"stream": stream.to_dict(),
|
||||
}
|
||||
),
|
||||
200,
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
db.session.rollback()
|
||||
return jsonify({"error": str(e)}), 500
|
||||
|
||||
|
||||
@bp.route("/<int:stream_id>", methods=["DELETE"])
|
||||
@require_auth
|
||||
def delete_stream(stream_id):
|
||||
"""Delete a stream."""
|
||||
try:
|
||||
stream = Stream.query.get_or_404(stream_id)
|
||||
|
||||
# If stream is being processed, mark for deletion instead
|
||||
if stream.status == "processing":
|
||||
stream.status = "cancelled"
|
||||
db.session.commit()
|
||||
return jsonify({"message": "Stream marked for cancellation"}), 200
|
||||
|
||||
db.session.delete(stream)
|
||||
db.session.commit()
|
||||
|
||||
return jsonify({"message": "Stream deleted successfully"}), 200
|
||||
|
||||
except Exception as e:
|
||||
db.session.rollback()
|
||||
return jsonify({"error": str(e)}), 500
|
||||
|
||||
|
||||
@bp.route("/queue/status", methods=["GET"])
|
||||
@require_auth
|
||||
def queue_status():
|
||||
"""Get the current processing queue status."""
|
||||
try:
|
||||
from app.services.stream_processing_service import (
|
||||
StreamProcessingService,
|
||||
)
|
||||
|
||||
status = StreamProcessingService.get_queue_status()
|
||||
return jsonify(status), 200
|
||||
|
||||
except Exception as e:
|
||||
return jsonify({"error": str(e)}), 500
|
||||
529
app/services/stream_processing_service.py
Normal file
529
app/services/stream_processing_service.py
Normal file
@@ -0,0 +1,529 @@
|
||||
"""Stream processing service with queue management and yt-dlp integration."""
|
||||
|
||||
import hashlib
|
||||
import os
|
||||
import re
|
||||
import shutil
|
||||
import threading
|
||||
import time
|
||||
from queue import Empty, Queue
|
||||
from typing import Dict, List, Optional
|
||||
from urllib.parse import parse_qs, urlparse
|
||||
|
||||
from app.database import db
|
||||
from app.models.sound import Sound
|
||||
from app.models.stream import Stream
|
||||
from app.services.logging_service import LoggingService
|
||||
|
||||
# Configure logging
|
||||
logger = LoggingService.get_logger(__name__)
|
||||
|
||||
|
||||
class StreamProcessingService:
|
||||
"""Service for processing streaming URLs with yt-dlp."""
|
||||
|
||||
# Class variables for queue management
|
||||
_processing_queue: Queue = Queue()
|
||||
_processing_threads: List[threading.Thread] = []
|
||||
_is_running: bool = False
|
||||
_max_concurrent_downloads: int = int(
|
||||
os.getenv("STREAM_MAX_CONCURRENT", "2")
|
||||
)
|
||||
_downloads_dir: str = "sounds/temp"
|
||||
|
||||
@classmethod
|
||||
def initialize(cls) -> None:
|
||||
"""Initialize the stream processing service."""
|
||||
if cls._is_running:
|
||||
return
|
||||
|
||||
# Create necessary directories
|
||||
os.makedirs(cls._downloads_dir, exist_ok=True)
|
||||
os.makedirs("sounds/stream", exist_ok=True)
|
||||
os.makedirs("sounds/stream/thumbnails", exist_ok=True)
|
||||
|
||||
# Start processing threads
|
||||
for i in range(cls._max_concurrent_downloads):
|
||||
thread = threading.Thread(
|
||||
target=cls._worker_thread,
|
||||
name=f"StreamProcessor-{i + 1}",
|
||||
daemon=True,
|
||||
)
|
||||
thread.start()
|
||||
cls._processing_threads.append(thread)
|
||||
|
||||
cls._is_running = True
|
||||
logger.info(
|
||||
f"StreamProcessingService initialized with {cls._max_concurrent_downloads} workers"
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def add_to_queue(cls, stream_id: int) -> None:
|
||||
"""Add a stream to the processing queue."""
|
||||
if not cls._is_running:
|
||||
cls.initialize()
|
||||
|
||||
cls._processing_queue.put(stream_id)
|
||||
logger.info(f"Added stream {stream_id} to processing queue")
|
||||
|
||||
@classmethod
|
||||
def get_queue_status(cls) -> Dict:
|
||||
"""Get the current queue status."""
|
||||
pending_count = Stream.query.filter_by(status="pending").count()
|
||||
processing_count = Stream.query.filter_by(status="processing").count()
|
||||
|
||||
return {
|
||||
"queue_size": cls._processing_queue.qsize(),
|
||||
"pending_streams": pending_count,
|
||||
"processing_streams": processing_count,
|
||||
"max_concurrent": cls._max_concurrent_downloads,
|
||||
"is_running": cls._is_running,
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def _worker_thread(cls) -> None:
|
||||
"""Worker thread for processing streams."""
|
||||
from app import create_app
|
||||
|
||||
# Create app context for database operations
|
||||
app = create_app()
|
||||
|
||||
while True:
|
||||
try:
|
||||
# Get stream ID from queue with timeout
|
||||
stream_id = cls._processing_queue.get(timeout=1)
|
||||
|
||||
with app.app_context():
|
||||
cls._process_stream(stream_id)
|
||||
|
||||
cls._processing_queue.task_done()
|
||||
|
||||
except Empty:
|
||||
# No items in queue, continue
|
||||
continue
|
||||
except Exception as e:
|
||||
logger.error(f"Error in worker thread: {e}")
|
||||
continue
|
||||
|
||||
@classmethod
|
||||
def _process_stream(cls, stream_id: int) -> None:
|
||||
"""Process a single stream."""
|
||||
try:
|
||||
stream = Stream.query.get(stream_id)
|
||||
if not stream:
|
||||
logger.error(f"Stream {stream_id} not found")
|
||||
return
|
||||
|
||||
if stream.status == "cancelled":
|
||||
logger.info(f"Stream {stream_id} was cancelled")
|
||||
return
|
||||
|
||||
# Update status to processing
|
||||
stream.status = "processing"
|
||||
db.session.commit()
|
||||
|
||||
logger.info(
|
||||
f"Starting processing of stream {stream_id}: {stream.url}"
|
||||
)
|
||||
|
||||
# Extract metadata and download audio
|
||||
metadata, error_msg = cls._extract_metadata(stream.url)
|
||||
if not metadata:
|
||||
if not error_msg:
|
||||
error_msg = "Failed to extract metadata from URL"
|
||||
stream.status = "failed"
|
||||
stream.error = error_msg
|
||||
db.session.commit()
|
||||
logger.error(
|
||||
f"Failed to extract metadata for stream {stream_id}: {error_msg}"
|
||||
)
|
||||
return
|
||||
|
||||
# Check for duplicate streams based on service and service_id
|
||||
service = metadata.get("service")
|
||||
service_id = metadata.get("service_id")
|
||||
|
||||
if service and service_id:
|
||||
existing_stream = (
|
||||
Stream.query.filter_by(
|
||||
service=service, service_id=service_id
|
||||
)
|
||||
.filter(Stream.id != stream.id)
|
||||
.first()
|
||||
)
|
||||
|
||||
if existing_stream:
|
||||
error_msg = f"Stream already exists with {service} ID: {service_id} (stream #{existing_stream.id})"
|
||||
stream.status = "failed"
|
||||
stream.error = error_msg
|
||||
db.session.commit()
|
||||
logger.error(
|
||||
f"Duplicate stream detected for {stream_id}: {error_msg}"
|
||||
)
|
||||
return
|
||||
|
||||
# Update stream with metadata
|
||||
cls._update_stream_metadata(stream, metadata)
|
||||
|
||||
# Download audio
|
||||
audio_path, error_msg = cls._download_audio(stream.url, metadata)
|
||||
if not audio_path:
|
||||
if not error_msg:
|
||||
error_msg = "Failed to download audio from URL"
|
||||
stream.status = "failed"
|
||||
stream.error = error_msg
|
||||
db.session.commit()
|
||||
logger.error(
|
||||
f"Failed to download audio for stream {stream_id}: {error_msg}"
|
||||
)
|
||||
return
|
||||
|
||||
# Move files to final locations
|
||||
final_audio_path, thumbnail_path, error_msg = (
|
||||
cls._move_files_to_final_location(audio_path, metadata)
|
||||
)
|
||||
if not final_audio_path:
|
||||
if not error_msg:
|
||||
error_msg = "Failed to move files to final location"
|
||||
stream.status = "failed"
|
||||
stream.error = error_msg
|
||||
db.session.commit()
|
||||
logger.error(
|
||||
f"Failed to move files for stream {stream_id}: {error_msg}"
|
||||
)
|
||||
return
|
||||
|
||||
# Create sound entry with final path
|
||||
sound, error_msg = cls._create_sound_entry(
|
||||
final_audio_path, metadata, thumbnail_path
|
||||
)
|
||||
if not sound:
|
||||
if not error_msg:
|
||||
error_msg = "Failed to create sound entry in database"
|
||||
stream.status = "failed"
|
||||
stream.error = error_msg
|
||||
db.session.commit()
|
||||
logger.error(
|
||||
f"Failed to create sound entry for stream {stream_id}: {error_msg}"
|
||||
)
|
||||
return
|
||||
|
||||
# Update stream with sound_id and mark as completed
|
||||
stream.sound_id = sound.id
|
||||
stream.status = "completed"
|
||||
stream.error = None # Clear any previous errors
|
||||
db.session.commit()
|
||||
|
||||
logger.info(
|
||||
f"Successfully processed stream {stream_id} -> sound {sound.id}"
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
error_msg = f"Unexpected error during processing: {str(e)}"
|
||||
logger.error(f"Error processing stream {stream_id}: {error_msg}")
|
||||
try:
|
||||
stream = Stream.query.get(stream_id)
|
||||
if stream:
|
||||
stream.status = "failed"
|
||||
stream.error = error_msg
|
||||
db.session.commit()
|
||||
except Exception as db_error:
|
||||
logger.error(
|
||||
f"Failed to update stream error in database: {db_error}"
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def _extract_metadata(
|
||||
cls, url: str
|
||||
) -> tuple[Optional[Dict], Optional[str]]:
|
||||
"""Extract metadata from URL using yt-dlp."""
|
||||
try:
|
||||
import yt_dlp
|
||||
|
||||
ydl_opts = {
|
||||
"quiet": True,
|
||||
"no_warnings": True,
|
||||
"extract_flat": False,
|
||||
}
|
||||
|
||||
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
|
||||
info = ydl.extract_info(url, download=False)
|
||||
|
||||
# Extract service information
|
||||
service = cls._detect_service(url)
|
||||
service_id = cls._extract_service_id(url, info)
|
||||
|
||||
metadata = {
|
||||
"service": info.get("extractor", service),
|
||||
"service_id": info.get("id", service_id),
|
||||
"title": info.get("title", ""),
|
||||
"track": info.get("track", ""),
|
||||
"artist": info.get("artist", "")
|
||||
or info.get("uploader", ""),
|
||||
"album": info.get("album", ""),
|
||||
"genre": info.get("genre", ""),
|
||||
"duration": info.get("duration", 0),
|
||||
"description": info.get("description", ""),
|
||||
}
|
||||
|
||||
return metadata, None
|
||||
|
||||
except Exception as e:
|
||||
error_msg = f"yt-dlp extraction failed: {str(e)}"
|
||||
logger.error(f"Error extracting metadata from {url}: {error_msg}")
|
||||
return None, error_msg
|
||||
|
||||
@classmethod
|
||||
def _download_audio(
|
||||
cls, url: str, metadata: Dict
|
||||
) -> tuple[Optional[str], Optional[str]]:
|
||||
"""Download audio from URL using yt-dlp."""
|
||||
try:
|
||||
import yt_dlp
|
||||
|
||||
# Generate filename
|
||||
title = metadata.get("title", "unknown")
|
||||
safe_title = re.sub(r"[^\w\s-]", "", title)[:50]
|
||||
filename = f"{safe_title}_{metadata.get('service_id', 'unknown')}"
|
||||
|
||||
output_path = os.path.join(
|
||||
cls._downloads_dir, f"{filename}.%(ext)s"
|
||||
)
|
||||
|
||||
ydl_opts = {
|
||||
"format": "bestaudio/best",
|
||||
"outtmpl": output_path,
|
||||
"extractaudio": True,
|
||||
"audioformat": "opus",
|
||||
"audioquality": "192",
|
||||
"postprocessors": [
|
||||
{
|
||||
"key": "FFmpegExtractAudio",
|
||||
"preferredcodec": "opus",
|
||||
"preferredquality": "192",
|
||||
}
|
||||
],
|
||||
"writethumbnail": True,
|
||||
"quiet": True,
|
||||
"no_warnings": True,
|
||||
}
|
||||
|
||||
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
|
||||
ydl.download([url])
|
||||
|
||||
# Find the downloaded file
|
||||
final_path = os.path.join(cls._downloads_dir, f"{filename}.opus")
|
||||
if os.path.exists(final_path):
|
||||
return final_path, None
|
||||
|
||||
# If opus doesn't exist, look for other formats
|
||||
for ext in ["mp3", "wav", "m4a", "webm", "ogg"]:
|
||||
alt_path = os.path.join(cls._downloads_dir, f"{filename}.{ext}")
|
||||
if os.path.exists(alt_path):
|
||||
return alt_path, None
|
||||
|
||||
error_msg = f"Downloaded file not found after yt-dlp processing"
|
||||
logger.error(f"Downloaded file not found for {url}")
|
||||
return None, error_msg
|
||||
|
||||
except Exception as e:
|
||||
error_msg = f"yt-dlp download failed: {str(e)}"
|
||||
logger.error(f"Error downloading audio from {url}: {error_msg}")
|
||||
return None, error_msg
|
||||
|
||||
@classmethod
|
||||
def _move_files_to_final_location(
|
||||
cls, audio_path: str, metadata: dict
|
||||
) -> tuple[Optional[str], Optional[str], Optional[str]]:
|
||||
"""Move downloaded files to their final locations.
|
||||
|
||||
Returns:
|
||||
tuple: (final_audio_path, thumbnail_path, error_message)
|
||||
"""
|
||||
try:
|
||||
# Create target directories
|
||||
stream_dir = "sounds/stream"
|
||||
thumbnail_dir = "sounds/stream/thumbnails"
|
||||
|
||||
os.makedirs(stream_dir, exist_ok=True)
|
||||
os.makedirs(thumbnail_dir, exist_ok=True)
|
||||
|
||||
# Generate safe filename
|
||||
title = metadata.get("title", "unknown")
|
||||
safe_title = re.sub(r"[^\w\s-]", "", title)[:50]
|
||||
service_id = metadata.get("service_id", "unknown")
|
||||
base_filename = f"{safe_title}_{service_id}"
|
||||
|
||||
# Move audio file
|
||||
audio_extension = os.path.splitext(audio_path)[1]
|
||||
final_audio_filename = f"{base_filename}{audio_extension}"
|
||||
final_audio_path = os.path.join(stream_dir, final_audio_filename)
|
||||
|
||||
# If file already exists, add a counter
|
||||
counter = 1
|
||||
while os.path.exists(final_audio_path):
|
||||
final_audio_filename = (
|
||||
f"{base_filename}_{counter}{audio_extension}"
|
||||
)
|
||||
final_audio_path = os.path.join(
|
||||
stream_dir, final_audio_filename
|
||||
)
|
||||
counter += 1
|
||||
|
||||
shutil.move(audio_path, final_audio_path)
|
||||
logger.info(f"Moved audio file to: {final_audio_path}")
|
||||
|
||||
# Look for and move thumbnail
|
||||
thumbnail_path = None
|
||||
temp_dir = os.path.dirname(audio_path)
|
||||
|
||||
# Common thumbnail extensions
|
||||
for thumb_ext in [".jpg", ".jpeg", ".png", ".webp", ".gif"]:
|
||||
temp_thumb_path = os.path.join(
|
||||
temp_dir,
|
||||
f"{os.path.splitext(os.path.basename(audio_path))[0]}{thumb_ext}",
|
||||
)
|
||||
if os.path.exists(temp_thumb_path):
|
||||
final_thumb_filename = f"{base_filename}{thumb_ext}"
|
||||
final_thumb_path = os.path.join(
|
||||
thumbnail_dir, final_thumb_filename
|
||||
)
|
||||
|
||||
# Handle duplicate thumbnail names
|
||||
thumb_counter = 1
|
||||
while os.path.exists(final_thumb_path):
|
||||
final_thumb_filename = (
|
||||
f"{base_filename}_{thumb_counter}{thumb_ext}"
|
||||
)
|
||||
final_thumb_path = os.path.join(
|
||||
thumbnail_dir, final_thumb_filename
|
||||
)
|
||||
thumb_counter += 1
|
||||
|
||||
shutil.move(temp_thumb_path, final_thumb_path)
|
||||
thumbnail_path = final_thumb_path
|
||||
logger.info(f"Moved thumbnail to: {final_thumb_path}")
|
||||
break
|
||||
|
||||
return final_audio_path, thumbnail_path, None
|
||||
|
||||
except Exception as e:
|
||||
error_msg = f"File move operation failed: {str(e)}"
|
||||
logger.error(f"Error moving files: {error_msg}")
|
||||
return None, None, error_msg
|
||||
|
||||
@classmethod
|
||||
def _create_sound_entry(
|
||||
cls,
|
||||
audio_path: str,
|
||||
metadata: dict,
|
||||
thumbnail_path: str | None = None,
|
||||
) -> tuple[Sound | None, str | None]:
|
||||
"""Create a sound entry from the downloaded audio."""
|
||||
try:
|
||||
# Get file info
|
||||
file_size = os.path.getsize(audio_path)
|
||||
|
||||
# Generate hash
|
||||
file_hash = cls._calculate_file_hash(audio_path)
|
||||
|
||||
# Get duration (use metadata duration or calculate from file)
|
||||
duration_ms = int((metadata.get("duration", 0) or 0) * 1000)
|
||||
|
||||
# Get thumbnail filename if available
|
||||
thumbnail_filename = None
|
||||
if thumbnail_path:
|
||||
thumbnail_filename = os.path.basename(thumbnail_path)
|
||||
|
||||
# Create sound entry
|
||||
sound = Sound(
|
||||
type="STR", # Stream type
|
||||
name=metadata.get("title", "Unknown Title"),
|
||||
filename=os.path.basename(audio_path),
|
||||
thumbnail=thumbnail_filename,
|
||||
duration=duration_ms,
|
||||
size=file_size,
|
||||
hash=file_hash,
|
||||
is_music=True, # Streams are typically music
|
||||
is_deletable=True,
|
||||
)
|
||||
|
||||
db.session.add(sound)
|
||||
db.session.commit()
|
||||
|
||||
return sound, None
|
||||
|
||||
except Exception as e:
|
||||
error_msg = f"Database error while creating sound entry: {str(e)}"
|
||||
logger.error(
|
||||
f"Error creating sound entry for {audio_path}: {error_msg}"
|
||||
)
|
||||
return None, error_msg
|
||||
|
||||
@classmethod
|
||||
def _update_stream_metadata(cls, stream: Stream, metadata: Dict) -> None:
|
||||
"""Update stream with extracted metadata."""
|
||||
stream.service = metadata.get("service")
|
||||
stream.service_id = metadata.get("service_id")
|
||||
stream.title = metadata.get("title")
|
||||
stream.track = metadata.get("track")
|
||||
stream.artist = metadata.get("artist")
|
||||
stream.album = metadata.get("album")
|
||||
stream.genre = metadata.get("genre")
|
||||
db.session.commit()
|
||||
|
||||
@classmethod
|
||||
def _detect_service(cls, url: str) -> str:
|
||||
"""Detect the streaming service from URL."""
|
||||
domain = urlparse(url).netloc.lower()
|
||||
|
||||
if "youtube.com" in domain or "youtu.be" in domain:
|
||||
return "youtube"
|
||||
elif "soundcloud.com" in domain:
|
||||
return "soundcloud"
|
||||
elif "dailymotion.com" in domain:
|
||||
return "dailymotion"
|
||||
elif "spotify.com" in domain:
|
||||
return "spotify"
|
||||
elif "vimeo.com" in domain:
|
||||
return "vimeo"
|
||||
elif "twitch.tv" in domain:
|
||||
return "twitch"
|
||||
else:
|
||||
return "unknown"
|
||||
|
||||
@classmethod
|
||||
def _extract_service_id(cls, url: str, info: Dict) -> str:
|
||||
"""Extract service-specific ID from URL or info."""
|
||||
service = cls._detect_service(url)
|
||||
|
||||
if service == "youtube":
|
||||
# Try to get from info first
|
||||
if "id" in info:
|
||||
return info["id"]
|
||||
# Parse from URL
|
||||
parsed = urlparse(url)
|
||||
if "youtu.be" in parsed.netloc:
|
||||
return parsed.path[1:] # Remove leading slash
|
||||
elif "youtube.com" in parsed.netloc:
|
||||
query_params = parse_qs(parsed.query)
|
||||
return query_params.get("v", [""])[0]
|
||||
|
||||
elif service == "soundcloud":
|
||||
if "id" in info:
|
||||
return str(info["id"])
|
||||
|
||||
# Fallback to using info ID or last part of URL
|
||||
if "id" in info:
|
||||
return str(info["id"])
|
||||
|
||||
return urlparse(url).path.split("/")[-1] or "unknown"
|
||||
|
||||
@classmethod
|
||||
def _calculate_file_hash(cls, file_path: str) -> str:
|
||||
"""Calculate SHA256 hash of file."""
|
||||
sha256_hash = hashlib.sha256()
|
||||
with open(file_path, "rb") as f:
|
||||
for chunk in iter(lambda: f.read(4096), b""):
|
||||
sha256_hash.update(chunk)
|
||||
return sha256_hash.hexdigest()
|
||||
@@ -19,6 +19,7 @@ dependencies = [
|
||||
"python-dotenv==1.1.1",
|
||||
"requests==2.32.4",
|
||||
"werkzeug==3.1.3",
|
||||
"yt-dlp>=2025.6.30",
|
||||
]
|
||||
|
||||
[dependency-groups]
|
||||
|
||||
11
uv.lock
generated
11
uv.lock
generated
@@ -640,6 +640,7 @@ dependencies = [
|
||||
{ name = "python-dotenv" },
|
||||
{ name = "requests" },
|
||||
{ name = "werkzeug" },
|
||||
{ name = "yt-dlp" },
|
||||
]
|
||||
|
||||
[package.dev-dependencies]
|
||||
@@ -664,6 +665,7 @@ requires-dist = [
|
||||
{ name = "python-dotenv", specifier = "==1.1.1" },
|
||||
{ name = "requests", specifier = "==2.32.4" },
|
||||
{ name = "werkzeug", specifier = "==3.1.3" },
|
||||
{ name = "yt-dlp", specifier = ">=2025.6.30" },
|
||||
]
|
||||
|
||||
[package.metadata.requires-dev]
|
||||
@@ -776,3 +778,12 @@ sdist = { url = "https://files.pythonhosted.org/packages/c9/4a/44d3c295350d77642
|
||||
wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/78/58/e860788190eba3bcce367f74d29c4675466ce8dddfba85f7827588416f01/wsproto-1.2.0-py3-none-any.whl", hash = "sha256:b9acddd652b585d75b20477888c56642fdade28bdfd3579aa24a4d2c037dd736", size = 24226 },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "yt-dlp"
|
||||
version = "2025.6.30"
|
||||
source = { registry = "https://pypi.org/simple" }
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/23/9c/ff64c2fed7909f43a9a0aedb7395c65404e71c2439198764685a6e3b3059/yt_dlp-2025.6.30.tar.gz", hash = "sha256:6d0ae855c0a55bfcc28dffba804ec8525b9b955d34a41191a1561a4cec03d8bd", size = 3034364 }
|
||||
wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/14/41/2f048ae3f6d0fa2e59223f08ba5049dbcdac628b0a9f9deac722dd9260a5/yt_dlp-2025.6.30-py3-none-any.whl", hash = "sha256:541becc29ed7b7b3a08751c0a66da4b7f8ee95cb81066221c78e83598bc3d1f3", size = 3279333 },
|
||||
]
|
||||
|
||||
Reference in New Issue
Block a user