530 lines
19 KiB
Python
530 lines
19 KiB
Python
"""Stream processing service with queue management and yt-dlp integration."""
|
|
|
|
import hashlib
|
|
import os
|
|
import re
|
|
import shutil
|
|
import threading
|
|
import time
|
|
from queue import Empty, Queue
|
|
from typing import Dict, List, Optional
|
|
from urllib.parse import parse_qs, urlparse
|
|
|
|
from app.database import db
|
|
from app.models.sound import Sound
|
|
from app.models.stream import Stream
|
|
from app.services.logging_service import LoggingService
|
|
|
|
# Module-level logger, obtained from the project's LoggingService and named
# after this module; StreamProcessingService writes all of its messages here.
logger = LoggingService.get_logger(__name__)
|
|
|
|
|
|
class StreamProcessingService:
|
|
"""Service for processing streaming URLs with yt-dlp."""
|
|
|
|
# Class variables for queue management
|
|
_processing_queue: Queue = Queue()
|
|
_processing_threads: List[threading.Thread] = []
|
|
_is_running: bool = False
|
|
_max_concurrent_downloads: int = int(
|
|
os.getenv("STREAM_MAX_CONCURRENT", "2")
|
|
)
|
|
_downloads_dir: str = "sounds/temp"
|
|
|
|
@classmethod
|
|
def initialize(cls) -> None:
|
|
"""Initialize the stream processing service."""
|
|
if cls._is_running:
|
|
return
|
|
|
|
# Create necessary directories
|
|
os.makedirs(cls._downloads_dir, exist_ok=True)
|
|
os.makedirs("sounds/stream", exist_ok=True)
|
|
os.makedirs("sounds/stream/thumbnails", exist_ok=True)
|
|
|
|
# Start processing threads
|
|
for i in range(cls._max_concurrent_downloads):
|
|
thread = threading.Thread(
|
|
target=cls._worker_thread,
|
|
name=f"StreamProcessor-{i + 1}",
|
|
daemon=True,
|
|
)
|
|
thread.start()
|
|
cls._processing_threads.append(thread)
|
|
|
|
cls._is_running = True
|
|
logger.info(
|
|
f"StreamProcessingService initialized with {cls._max_concurrent_downloads} workers"
|
|
)
|
|
|
|
@classmethod
|
|
def add_to_queue(cls, stream_id: int) -> None:
|
|
"""Add a stream to the processing queue."""
|
|
if not cls._is_running:
|
|
cls.initialize()
|
|
|
|
cls._processing_queue.put(stream_id)
|
|
logger.info(f"Added stream {stream_id} to processing queue")
|
|
|
|
@classmethod
|
|
def get_queue_status(cls) -> Dict:
|
|
"""Get the current queue status."""
|
|
pending_count = Stream.query.filter_by(status="pending").count()
|
|
processing_count = Stream.query.filter_by(status="processing").count()
|
|
|
|
return {
|
|
"queue_size": cls._processing_queue.qsize(),
|
|
"pending_streams": pending_count,
|
|
"processing_streams": processing_count,
|
|
"max_concurrent": cls._max_concurrent_downloads,
|
|
"is_running": cls._is_running,
|
|
}
|
|
|
|
@classmethod
|
|
def _worker_thread(cls) -> None:
|
|
"""Worker thread for processing streams."""
|
|
from app import create_app
|
|
|
|
# Create app context for database operations
|
|
app = create_app()
|
|
|
|
while True:
|
|
try:
|
|
# Get stream ID from queue with timeout
|
|
stream_id = cls._processing_queue.get(timeout=1)
|
|
|
|
with app.app_context():
|
|
cls._process_stream(stream_id)
|
|
|
|
cls._processing_queue.task_done()
|
|
|
|
except Empty:
|
|
# No items in queue, continue
|
|
continue
|
|
except Exception as e:
|
|
logger.error(f"Error in worker thread: {e}")
|
|
continue
|
|
|
|
@classmethod
|
|
def _process_stream(cls, stream_id: int) -> None:
|
|
"""Process a single stream."""
|
|
try:
|
|
stream = Stream.query.get(stream_id)
|
|
if not stream:
|
|
logger.error(f"Stream {stream_id} not found")
|
|
return
|
|
|
|
if stream.status == "cancelled":
|
|
logger.info(f"Stream {stream_id} was cancelled")
|
|
return
|
|
|
|
# Update status to processing
|
|
stream.status = "processing"
|
|
db.session.commit()
|
|
|
|
logger.info(
|
|
f"Starting processing of stream {stream_id}: {stream.url}"
|
|
)
|
|
|
|
# Extract metadata and download audio
|
|
metadata, error_msg = cls._extract_metadata(stream.url)
|
|
if not metadata:
|
|
if not error_msg:
|
|
error_msg = "Failed to extract metadata from URL"
|
|
stream.status = "failed"
|
|
stream.error = error_msg
|
|
db.session.commit()
|
|
logger.error(
|
|
f"Failed to extract metadata for stream {stream_id}: {error_msg}"
|
|
)
|
|
return
|
|
|
|
# Check for duplicate streams based on service and service_id
|
|
service = metadata.get("service")
|
|
service_id = metadata.get("service_id")
|
|
|
|
if service and service_id:
|
|
existing_stream = (
|
|
Stream.query.filter_by(
|
|
service=service, service_id=service_id
|
|
)
|
|
.filter(Stream.id != stream.id)
|
|
.first()
|
|
)
|
|
|
|
if existing_stream:
|
|
error_msg = f"Stream already exists with {service} ID: {service_id} (stream #{existing_stream.id})"
|
|
stream.status = "failed"
|
|
stream.error = error_msg
|
|
db.session.commit()
|
|
logger.error(
|
|
f"Duplicate stream detected for {stream_id}: {error_msg}"
|
|
)
|
|
return
|
|
|
|
# Update stream with metadata
|
|
cls._update_stream_metadata(stream, metadata)
|
|
|
|
# Download audio
|
|
audio_path, error_msg = cls._download_audio(stream.url, metadata)
|
|
if not audio_path:
|
|
if not error_msg:
|
|
error_msg = "Failed to download audio from URL"
|
|
stream.status = "failed"
|
|
stream.error = error_msg
|
|
db.session.commit()
|
|
logger.error(
|
|
f"Failed to download audio for stream {stream_id}: {error_msg}"
|
|
)
|
|
return
|
|
|
|
# Move files to final locations
|
|
final_audio_path, thumbnail_path, error_msg = (
|
|
cls._move_files_to_final_location(audio_path, metadata)
|
|
)
|
|
if not final_audio_path:
|
|
if not error_msg:
|
|
error_msg = "Failed to move files to final location"
|
|
stream.status = "failed"
|
|
stream.error = error_msg
|
|
db.session.commit()
|
|
logger.error(
|
|
f"Failed to move files for stream {stream_id}: {error_msg}"
|
|
)
|
|
return
|
|
|
|
# Create sound entry with final path
|
|
sound, error_msg = cls._create_sound_entry(
|
|
final_audio_path, metadata, thumbnail_path
|
|
)
|
|
if not sound:
|
|
if not error_msg:
|
|
error_msg = "Failed to create sound entry in database"
|
|
stream.status = "failed"
|
|
stream.error = error_msg
|
|
db.session.commit()
|
|
logger.error(
|
|
f"Failed to create sound entry for stream {stream_id}: {error_msg}"
|
|
)
|
|
return
|
|
|
|
# Update stream with sound_id and mark as completed
|
|
stream.sound_id = sound.id
|
|
stream.status = "completed"
|
|
stream.error = None # Clear any previous errors
|
|
db.session.commit()
|
|
|
|
logger.info(
|
|
f"Successfully processed stream {stream_id} -> sound {sound.id}"
|
|
)
|
|
|
|
except Exception as e:
|
|
error_msg = f"Unexpected error during processing: {str(e)}"
|
|
logger.error(f"Error processing stream {stream_id}: {error_msg}")
|
|
try:
|
|
stream = Stream.query.get(stream_id)
|
|
if stream:
|
|
stream.status = "failed"
|
|
stream.error = error_msg
|
|
db.session.commit()
|
|
except Exception as db_error:
|
|
logger.error(
|
|
f"Failed to update stream error in database: {db_error}"
|
|
)
|
|
|
|
@classmethod
|
|
def _extract_metadata(
|
|
cls, url: str
|
|
) -> tuple[Optional[Dict], Optional[str]]:
|
|
"""Extract metadata from URL using yt-dlp."""
|
|
try:
|
|
import yt_dlp
|
|
|
|
ydl_opts = {
|
|
"quiet": True,
|
|
"no_warnings": True,
|
|
"extract_flat": False,
|
|
}
|
|
|
|
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
|
|
info = ydl.extract_info(url, download=False)
|
|
|
|
# Extract service information
|
|
service = cls._detect_service(url)
|
|
service_id = cls._extract_service_id(url, info)
|
|
|
|
metadata = {
|
|
"service": info.get("extractor", service),
|
|
"service_id": info.get("id", service_id),
|
|
"title": info.get("title", ""),
|
|
"track": info.get("track", ""),
|
|
"artist": info.get("artist", "")
|
|
or info.get("uploader", ""),
|
|
"album": info.get("album", ""),
|
|
"genre": info.get("genre", ""),
|
|
"duration": info.get("duration", 0),
|
|
"description": info.get("description", ""),
|
|
}
|
|
|
|
return metadata, None
|
|
|
|
except Exception as e:
|
|
error_msg = f"yt-dlp extraction failed: {str(e)}"
|
|
logger.error(f"Error extracting metadata from {url}: {error_msg}")
|
|
return None, error_msg
|
|
|
|
@classmethod
|
|
def _download_audio(
|
|
cls, url: str, metadata: Dict
|
|
) -> tuple[Optional[str], Optional[str]]:
|
|
"""Download audio from URL using yt-dlp."""
|
|
try:
|
|
import yt_dlp
|
|
|
|
# Generate filename
|
|
title = metadata.get("title", "unknown")
|
|
safe_title = re.sub(r"[^\w\s-]", "", title)[:50]
|
|
filename = f"{safe_title}_{metadata.get('service_id', 'unknown')}"
|
|
|
|
output_path = os.path.join(
|
|
cls._downloads_dir, f"{filename}.%(ext)s"
|
|
)
|
|
|
|
ydl_opts = {
|
|
"format": "bestaudio/best",
|
|
"outtmpl": output_path,
|
|
"extractaudio": True,
|
|
"audioformat": "opus",
|
|
"audioquality": "192",
|
|
"postprocessors": [
|
|
{
|
|
"key": "FFmpegExtractAudio",
|
|
"preferredcodec": "opus",
|
|
"preferredquality": "192",
|
|
}
|
|
],
|
|
"writethumbnail": True,
|
|
"quiet": True,
|
|
"no_warnings": True,
|
|
}
|
|
|
|
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
|
|
ydl.download([url])
|
|
|
|
# Find the downloaded file
|
|
final_path = os.path.join(cls._downloads_dir, f"{filename}.opus")
|
|
if os.path.exists(final_path):
|
|
return final_path, None
|
|
|
|
# If opus doesn't exist, look for other formats
|
|
for ext in ["mp3", "wav", "m4a", "webm", "ogg"]:
|
|
alt_path = os.path.join(cls._downloads_dir, f"{filename}.{ext}")
|
|
if os.path.exists(alt_path):
|
|
return alt_path, None
|
|
|
|
error_msg = f"Downloaded file not found after yt-dlp processing"
|
|
logger.error(f"Downloaded file not found for {url}")
|
|
return None, error_msg
|
|
|
|
except Exception as e:
|
|
error_msg = f"yt-dlp download failed: {str(e)}"
|
|
logger.error(f"Error downloading audio from {url}: {error_msg}")
|
|
return None, error_msg
|
|
|
|
@classmethod
|
|
def _move_files_to_final_location(
|
|
cls, audio_path: str, metadata: dict
|
|
) -> tuple[Optional[str], Optional[str], Optional[str]]:
|
|
"""Move downloaded files to their final locations.
|
|
|
|
Returns:
|
|
tuple: (final_audio_path, thumbnail_path, error_message)
|
|
"""
|
|
try:
|
|
# Create target directories
|
|
stream_dir = "sounds/stream"
|
|
thumbnail_dir = "sounds/stream/thumbnails"
|
|
|
|
os.makedirs(stream_dir, exist_ok=True)
|
|
os.makedirs(thumbnail_dir, exist_ok=True)
|
|
|
|
# Generate safe filename
|
|
title = metadata.get("title", "unknown")
|
|
safe_title = re.sub(r"[^\w\s-]", "", title)[:50]
|
|
service_id = metadata.get("service_id", "unknown")
|
|
base_filename = f"{safe_title}_{service_id}"
|
|
|
|
# Move audio file
|
|
audio_extension = os.path.splitext(audio_path)[1]
|
|
final_audio_filename = f"{base_filename}{audio_extension}"
|
|
final_audio_path = os.path.join(stream_dir, final_audio_filename)
|
|
|
|
# If file already exists, add a counter
|
|
counter = 1
|
|
while os.path.exists(final_audio_path):
|
|
final_audio_filename = (
|
|
f"{base_filename}_{counter}{audio_extension}"
|
|
)
|
|
final_audio_path = os.path.join(
|
|
stream_dir, final_audio_filename
|
|
)
|
|
counter += 1
|
|
|
|
shutil.move(audio_path, final_audio_path)
|
|
logger.info(f"Moved audio file to: {final_audio_path}")
|
|
|
|
# Look for and move thumbnail
|
|
thumbnail_path = None
|
|
temp_dir = os.path.dirname(audio_path)
|
|
|
|
# Common thumbnail extensions
|
|
for thumb_ext in [".jpg", ".jpeg", ".png", ".webp", ".gif"]:
|
|
temp_thumb_path = os.path.join(
|
|
temp_dir,
|
|
f"{os.path.splitext(os.path.basename(audio_path))[0]}{thumb_ext}",
|
|
)
|
|
if os.path.exists(temp_thumb_path):
|
|
final_thumb_filename = f"{base_filename}{thumb_ext}"
|
|
final_thumb_path = os.path.join(
|
|
thumbnail_dir, final_thumb_filename
|
|
)
|
|
|
|
# Handle duplicate thumbnail names
|
|
thumb_counter = 1
|
|
while os.path.exists(final_thumb_path):
|
|
final_thumb_filename = (
|
|
f"{base_filename}_{thumb_counter}{thumb_ext}"
|
|
)
|
|
final_thumb_path = os.path.join(
|
|
thumbnail_dir, final_thumb_filename
|
|
)
|
|
thumb_counter += 1
|
|
|
|
shutil.move(temp_thumb_path, final_thumb_path)
|
|
thumbnail_path = final_thumb_path
|
|
logger.info(f"Moved thumbnail to: {final_thumb_path}")
|
|
break
|
|
|
|
return final_audio_path, thumbnail_path, None
|
|
|
|
except Exception as e:
|
|
error_msg = f"File move operation failed: {str(e)}"
|
|
logger.error(f"Error moving files: {error_msg}")
|
|
return None, None, error_msg
|
|
|
|
@classmethod
|
|
def _create_sound_entry(
|
|
cls,
|
|
audio_path: str,
|
|
metadata: dict,
|
|
thumbnail_path: str | None = None,
|
|
) -> tuple[Sound | None, str | None]:
|
|
"""Create a sound entry from the downloaded audio."""
|
|
try:
|
|
# Get file info
|
|
file_size = os.path.getsize(audio_path)
|
|
|
|
# Generate hash
|
|
file_hash = cls._calculate_file_hash(audio_path)
|
|
|
|
# Get duration (use metadata duration or calculate from file)
|
|
duration_ms = int((metadata.get("duration", 0) or 0) * 1000)
|
|
|
|
# Get thumbnail filename if available
|
|
thumbnail_filename = None
|
|
if thumbnail_path:
|
|
thumbnail_filename = os.path.basename(thumbnail_path)
|
|
|
|
# Create sound entry
|
|
sound = Sound(
|
|
type="STR", # Stream type
|
|
name=metadata.get("title", "Unknown Title"),
|
|
filename=os.path.basename(audio_path),
|
|
thumbnail=thumbnail_filename,
|
|
duration=duration_ms,
|
|
size=file_size,
|
|
hash=file_hash,
|
|
is_music=True, # Streams are typically music
|
|
is_deletable=True,
|
|
)
|
|
|
|
db.session.add(sound)
|
|
db.session.commit()
|
|
|
|
return sound, None
|
|
|
|
except Exception as e:
|
|
error_msg = f"Database error while creating sound entry: {str(e)}"
|
|
logger.error(
|
|
f"Error creating sound entry for {audio_path}: {error_msg}"
|
|
)
|
|
return None, error_msg
|
|
|
|
@classmethod
|
|
def _update_stream_metadata(cls, stream: Stream, metadata: Dict) -> None:
|
|
"""Update stream with extracted metadata."""
|
|
stream.service = metadata.get("service")
|
|
stream.service_id = metadata.get("service_id")
|
|
stream.title = metadata.get("title")
|
|
stream.track = metadata.get("track")
|
|
stream.artist = metadata.get("artist")
|
|
stream.album = metadata.get("album")
|
|
stream.genre = metadata.get("genre")
|
|
db.session.commit()
|
|
|
|
@classmethod
|
|
def _detect_service(cls, url: str) -> str:
|
|
"""Detect the streaming service from URL."""
|
|
domain = urlparse(url).netloc.lower()
|
|
|
|
if "youtube.com" in domain or "youtu.be" in domain:
|
|
return "youtube"
|
|
elif "soundcloud.com" in domain:
|
|
return "soundcloud"
|
|
elif "dailymotion.com" in domain:
|
|
return "dailymotion"
|
|
elif "spotify.com" in domain:
|
|
return "spotify"
|
|
elif "vimeo.com" in domain:
|
|
return "vimeo"
|
|
elif "twitch.tv" in domain:
|
|
return "twitch"
|
|
else:
|
|
return "unknown"
|
|
|
|
@classmethod
|
|
def _extract_service_id(cls, url: str, info: Dict) -> str:
|
|
"""Extract service-specific ID from URL or info."""
|
|
service = cls._detect_service(url)
|
|
|
|
if service == "youtube":
|
|
# Try to get from info first
|
|
if "id" in info:
|
|
return info["id"]
|
|
# Parse from URL
|
|
parsed = urlparse(url)
|
|
if "youtu.be" in parsed.netloc:
|
|
return parsed.path[1:] # Remove leading slash
|
|
elif "youtube.com" in parsed.netloc:
|
|
query_params = parse_qs(parsed.query)
|
|
return query_params.get("v", [""])[0]
|
|
|
|
elif service == "soundcloud":
|
|
if "id" in info:
|
|
return str(info["id"])
|
|
|
|
# Fallback to using info ID or last part of URL
|
|
if "id" in info:
|
|
return str(info["id"])
|
|
|
|
return urlparse(url).path.split("/")[-1] or "unknown"
|
|
|
|
@classmethod
|
|
def _calculate_file_hash(cls, file_path: str) -> str:
|
|
"""Calculate SHA256 hash of file."""
|
|
sha256_hash = hashlib.sha256()
|
|
with open(file_path, "rb") as f:
|
|
for chunk in iter(lambda: f.read(4096), b""):
|
|
sha256_hash.update(chunk)
|
|
return sha256_hash.hexdigest()
|