Compare commits

...

6 Commits

13 changed files with 1044 additions and 78 deletions

View File

@@ -17,3 +17,6 @@ GOOGLE_CLIENT_SECRET=your_google_client_secret_here
# GitHub OAuth # GitHub OAuth
GITHUB_CLIENT_ID=your_github_client_id_here GITHUB_CLIENT_ID=your_github_client_id_here
GITHUB_CLIENT_SECRET=your_github_client_secret_here GITHUB_CLIENT_SECRET=your_github_client_secret_here
# Stream Processing Configuration
STREAM_MAX_CONCURRENT=2

View File

@@ -82,14 +82,20 @@ def create_app():
# Start scheduler for background tasks # Start scheduler for background tasks
scheduler_service.start() scheduler_service.start()
# Initialize stream processing service
from app.services.stream_processing_service import StreamProcessingService
StreamProcessingService.initialize()
# Register blueprints # Register blueprints
from app.routes import admin, admin_sounds, auth, main, soundboard from app.routes import admin, admin_sounds, auth, main, soundboard, stream
app.register_blueprint(main.bp, url_prefix="/api") app.register_blueprint(main.bp, url_prefix="/api")
app.register_blueprint(auth.bp, url_prefix="/api/auth") app.register_blueprint(auth.bp, url_prefix="/api/auth")
app.register_blueprint(admin.bp, url_prefix="/api/admin") app.register_blueprint(admin.bp, url_prefix="/api/admin")
app.register_blueprint(admin_sounds.bp) app.register_blueprint(admin_sounds.bp, url_prefix="/api/admin/sounds")
app.register_blueprint(soundboard.bp) app.register_blueprint(soundboard.bp, url_prefix="/api/soundboard")
app.register_blueprint(stream.bp, url_prefix="/api/stream")
# Shutdown scheduler when app is torn down # Shutdown scheduler when app is torn down
@app.teardown_appcontext @app.teardown_appcontext

View File

@@ -4,7 +4,8 @@ from .plan import Plan
from .playlist import Playlist from .playlist import Playlist
from .playlist_sound import PlaylistSound from .playlist_sound import PlaylistSound
from .sound import Sound from .sound import Sound
from .stream import Stream
from .user import User from .user import User
from .user_oauth import UserOAuth from .user_oauth import UserOAuth
__all__ = ["Plan", "Playlist", "PlaylistSound", "Sound", "User", "UserOAuth"] __all__ = ["Plan", "Playlist", "PlaylistSound", "Sound", "Stream", "User", "UserOAuth"]

View File

@@ -12,6 +12,7 @@ from app.database import db
if TYPE_CHECKING: if TYPE_CHECKING:
from app.models.playlist_sound import PlaylistSound from app.models.playlist_sound import PlaylistSound
from app.models.stream import Stream
class SoundType(Enum): class SoundType(Enum):
@@ -35,6 +36,9 @@ class Sound(db.Model):
# Basic sound information # Basic sound information
name: Mapped[str] = mapped_column(String(255), nullable=False) name: Mapped[str] = mapped_column(String(255), nullable=False)
filename: Mapped[str] = mapped_column(String(500), nullable=False) filename: Mapped[str] = mapped_column(String(500), nullable=False)
thumbnail: Mapped[str | None] = mapped_column(
String(500), nullable=True
) # Thumbnail filename
duration: Mapped[int] = mapped_column(Integer, nullable=False) duration: Mapped[int] = mapped_column(Integer, nullable=False)
size: Mapped[int] = mapped_column(Integer, nullable=False) # Size in bytes size: Mapped[int] = mapped_column(Integer, nullable=False) # Size in bytes
hash: Mapped[str] = mapped_column(String(64), nullable=False) # SHA256 hash hash: Mapped[str] = mapped_column(String(64), nullable=False) # SHA256 hash
@@ -96,6 +100,11 @@ class Sound(db.Model):
back_populates="sound", back_populates="sound",
cascade="all, delete-orphan", cascade="all, delete-orphan",
) )
streams: Mapped[list["Stream"]] = relationship(
"Stream",
back_populates="sound",
cascade="all, delete-orphan",
)
def __repr__(self) -> str: def __repr__(self) -> str:
"""String representation of Sound.""" """String representation of Sound."""
@@ -108,6 +117,7 @@ class Sound(db.Model):
"type": self.type, "type": self.type,
"name": self.name, "name": self.name,
"filename": self.filename, "filename": self.filename,
"thumbnail": self.thumbnail,
"duration": self.duration, "duration": self.duration,
"size": self.size, "size": self.size,
"hash": self.hash, "hash": self.hash,
@@ -211,6 +221,7 @@ class Sound(db.Model):
duration: float, duration: float,
size: int, size: int,
hash_value: str, hash_value: str,
thumbnail: Optional[str] = None,
is_music: bool = False, is_music: bool = False,
is_deletable: bool = True, is_deletable: bool = True,
commit: bool = True, commit: bool = True,
@@ -224,6 +235,7 @@ class Sound(db.Model):
type=sound_type, type=sound_type,
name=name, name=name,
filename=filename, filename=filename,
thumbnail=thumbnail,
duration=duration, duration=duration,
size=size, size=size,
hash=hash_value, hash=hash_value,

186
app/models/stream.py Normal file
View File

@@ -0,0 +1,186 @@
"""Stream model for storing streaming service links to sounds."""
from datetime import datetime
from typing import TYPE_CHECKING, Optional
from zoneinfo import ZoneInfo
from sqlalchemy import DateTime, ForeignKey, Integer, String, Text, UniqueConstraint
from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.database import db
if TYPE_CHECKING:
from app.models.sound import Sound
class Stream(db.Model):
"""Model for storing streaming service information linked to sounds."""
__tablename__ = "stream"
id: Mapped[int] = mapped_column(Integer, primary_key=True)
service: Mapped[str] = mapped_column(String(50), nullable=True)
service_id: Mapped[str] = mapped_column(String(255), nullable=True)
sound_id: Mapped[int] = mapped_column(
Integer, ForeignKey("sound.id"), nullable=True
)
url: Mapped[str] = mapped_column(Text, nullable=False)
title: Mapped[Optional[str]] = mapped_column(String(500), nullable=True)
track: Mapped[Optional[str]] = mapped_column(String(500), nullable=True)
artist: Mapped[Optional[str]] = mapped_column(String(255), nullable=True)
album: Mapped[Optional[str]] = mapped_column(String(255), nullable=True)
genre: Mapped[Optional[str]] = mapped_column(String(100), nullable=True)
status: Mapped[str] = mapped_column(
String(50), nullable=False, default="pending"
)
error: Mapped[str | None] = mapped_column(Text, nullable=True)
created_at: Mapped[datetime] = mapped_column(
DateTime,
default=lambda: datetime.now(tz=ZoneInfo("UTC")),
nullable=False,
)
updated_at: Mapped[datetime] = mapped_column(
DateTime,
default=lambda: datetime.now(tz=ZoneInfo("UTC")),
onupdate=lambda: datetime.now(tz=ZoneInfo("UTC")),
nullable=False,
)
# Relationships
sound: Mapped["Sound"] = relationship("Sound", back_populates="streams")
# Constraints
__table_args__ = (
UniqueConstraint(
"service", "service_id", name="unique_service_stream"
),
)
def __repr__(self) -> str:
"""String representation of the stream."""
return f"<Stream(id={self.id}, service='{self.service}', service_id='{self.service_id}', sound_id={self.sound_id})>"
def to_dict(self) -> dict:
"""Convert stream to dictionary representation."""
return {
"id": self.id,
"service": self.service,
"service_id": self.service_id,
"sound_id": self.sound_id,
"url": self.url,
"title": self.title,
"track": self.track,
"artist": self.artist,
"album": self.album,
"genre": self.genre,
"status": self.status,
"error": self.error,
"created_at": (
self.created_at.isoformat() if self.created_at else None
),
"updated_at": (
self.updated_at.isoformat() if self.updated_at else None
),
}
@classmethod
def create_stream(
cls,
url: str,
service: Optional[str] = None,
service_id: Optional[str] = None,
sound_id: Optional[int] = None,
title: Optional[str] = None,
track: Optional[str] = None,
artist: Optional[str] = None,
album: Optional[str] = None,
genre: Optional[str] = None,
status: str = "active",
commit: bool = True,
) -> "Stream":
"""Create a new stream record."""
stream = cls(
service=service,
service_id=service_id,
sound_id=sound_id,
url=url,
title=title,
track=track,
artist=artist,
album=album,
genre=genre,
status=status,
)
db.session.add(stream)
if commit:
db.session.commit()
return stream
@classmethod
def find_by_service_and_id(
cls, service: str, service_id: str
) -> Optional["Stream"]:
"""Find stream by service and service_id."""
return cls.query.filter_by(
service=service, service_id=service_id
).first()
@classmethod
def find_by_sound(cls, sound_id: int) -> list["Stream"]:
"""Find all streams for a specific sound."""
return cls.query.filter_by(sound_id=sound_id).all()
@classmethod
def find_by_service(cls, service: str) -> list["Stream"]:
"""Find all streams for a specific service."""
return cls.query.filter_by(service=service).all()
@classmethod
def find_by_status(cls, status: str) -> list["Stream"]:
"""Find all streams with a specific status."""
return cls.query.filter_by(status=status).all()
@classmethod
def find_active_streams(cls) -> list["Stream"]:
"""Find all active streams."""
return cls.query.filter_by(status="active").all()
def update_metadata(
self,
title: Optional[str] = None,
track: Optional[str] = None,
artist: Optional[str] = None,
album: Optional[str] = None,
genre: Optional[str] = None,
commit: bool = True,
) -> None:
"""Update stream metadata."""
if title is not None:
self.title = title
if track is not None:
self.track = track
if artist is not None:
self.artist = artist
if album is not None:
self.album = album
if genre is not None:
self.genre = genre
if commit:
db.session.commit()
def set_status(self, status: str, commit: bool = True) -> None:
"""Update stream status."""
self.status = status
if commit:
db.session.commit()
def is_active(self) -> bool:
"""Check if stream is active."""
return self.status == "active"
def get_display_name(self) -> str:
"""Get a display name for the stream (title or track or service_id)."""
return self.title or self.track or self.service_id

View File

@@ -1,15 +1,9 @@
"""Admin routes for the application.""" """Admin routes for the application."""
from flask import Blueprint, request from flask import Blueprint
from app.services.decorators import ( from app.services.decorators import get_current_user, require_auth, require_role
get_current_user,
require_auth,
require_role,
)
from app.services.scheduler_service import scheduler_service from app.services.scheduler_service import scheduler_service
from app.services.sound_normalizer_service import SoundNormalizerService
from app.services.sound_scanner_service import SoundScannerService
bp = Blueprint("admin", __name__) bp = Blueprint("admin", __name__)
@@ -43,53 +37,3 @@ def manual_credit_refill() -> dict:
return scheduler_service.trigger_credit_refill_now() return scheduler_service.trigger_credit_refill_now()
@bp.route("/sounds/scan", methods=["POST"])
@require_auth
@require_role("admin")
def manual_sound_scan() -> dict:
"""Manually trigger sound directory scan (admin only)."""
return scheduler_service.trigger_sound_scan_now()
@bp.route("/sounds/stats")
@require_auth
@require_role("admin")
def sound_statistics() -> dict:
"""Get sound database statistics (admin only)."""
return SoundScannerService.get_scan_statistics()
@bp.route("/sounds/normalize/<int:sound_id>", methods=["POST"])
@require_auth
@require_role("admin")
def normalize_sound(sound_id: int) -> dict:
"""Normalize a specific sound file (admin only)."""
overwrite = request.args.get("overwrite", "false").lower() == "true"
return SoundNormalizerService.normalize_sound(sound_id, overwrite)
@bp.route("/sounds/normalize-all", methods=["POST"])
@require_auth
@require_role("admin")
def normalize_all_sounds() -> dict:
"""Normalize all soundboard files (admin only)."""
overwrite = request.args.get("overwrite", "false").lower() == "true"
limit_str = request.args.get("limit")
limit = int(limit_str) if limit_str else None
return SoundNormalizerService.normalize_all_sounds(overwrite, limit)
@bp.route("/sounds/normalization-status")
@require_auth
@require_role("admin")
def normalization_status() -> dict:
"""Get normalization status statistics (admin only)."""
return SoundNormalizerService.get_normalization_status()
@bp.route("/sounds/ffmpeg-check")
@require_auth
@require_role("admin")
def ffmpeg_check() -> dict:
"""Check ffmpeg availability and capabilities (admin only)."""
return SoundNormalizerService.check_ffmpeg_availability()

View File

@@ -2,21 +2,21 @@
from flask import Blueprint, jsonify, request from flask import Blueprint, jsonify, request
from app.services.decorators import require_admin from app.services.decorators import require_admin, require_auth, require_role
from app.services.error_handling_service import ErrorHandlingService from app.services.error_handling_service import ErrorHandlingService
from app.services.scheduler_service import scheduler_service
from app.services.sound_normalizer_service import SoundNormalizerService from app.services.sound_normalizer_service import SoundNormalizerService
from app.services.sound_scanner_service import SoundScannerService from app.services.sound_scanner_service import SoundScannerService
bp = Blueprint("admin_sounds", __name__, url_prefix="/api/admin/sounds") bp = Blueprint("admin_sounds", __name__)
@bp.route("/scan", methods=["POST"]) @bp.route("/scan", methods=["POST"])
@require_admin @require_admin
def scan_sounds(): def scan_sounds():
"""Manually trigger sound scanning.""" """Manually trigger sound scanning."""
return ErrorHandlingService.wrap_service_call( return ErrorHandlingService.handle_service_result(
SoundScannerService.scan_soundboard_directory, scheduler_service.trigger_sound_scan_now()
request.get_json().get("directory") if request.get_json() else None,
) )

View File

@@ -11,7 +11,7 @@ from app.services.decorators import (
) )
from app.services.vlc_service import vlc_service from app.services.vlc_service import vlc_service
bp = Blueprint("soundboard", __name__, url_prefix="/api/soundboard") bp = Blueprint("soundboard", __name__)
@bp.route("/sounds", methods=["GET"]) @bp.route("/sounds", methods=["GET"])

210
app/routes/stream.py Normal file
View File

@@ -0,0 +1,210 @@
"""Stream routes for managing streaming service links."""
from flask import Blueprint, jsonify, request
from app.database import db
from app.models.stream import Stream
from app.services.decorators import require_auth
bp = Blueprint("stream", __name__)
@bp.route("/add-url", methods=["POST"])
@require_auth
def add_url():
"""Add a URL to the stream processing queue."""
try:
data = request.get_json()
if not data or "url" not in data:
return jsonify({"error": "URL is required"}), 400
url = data["url"].strip()
if not url:
return jsonify({"error": "URL cannot be empty"}), 400
# Check if URL already exists
existing_stream = Stream.query.filter_by(url=url).first()
if existing_stream:
return (
jsonify(
{
"error": "URL already exists in stream",
"stream": existing_stream.to_dict(),
}
),
409,
)
# Try to extract basic metadata to check for service/service_id duplicates
from app.services.stream_processing_service import (
StreamProcessingService,
)
try:
metadata, _ = StreamProcessingService._extract_metadata(url)
if metadata:
service = metadata.get("service")
service_id = metadata.get("service_id")
if service and service_id:
existing_service_stream = Stream.query.filter_by(
service=service, service_id=service_id
).first()
if existing_service_stream:
return (
jsonify(
{
"error": f"Stream already exists with {service} ID: {service_id}",
"existing_stream": existing_service_stream.to_dict(),
}
),
409,
)
except Exception as e:
# If metadata extraction fails here, we'll let the background process handle it
# This is just an early check to prevent obvious duplicates
pass
# Create stream entry with pending status
stream = Stream.create_stream(url=url, status="pending", commit=True)
# Add to processing queue (will be implemented next)
from app.services.stream_processing_service import (
StreamProcessingService,
)
StreamProcessingService.add_to_queue(stream.id)
return (
jsonify(
{
"message": "URL added to processing queue",
"stream": stream.to_dict(),
}
),
201,
)
except Exception as e:
db.session.rollback()
return jsonify({"error": str(e)}), 500
@bp.route("/", methods=["GET"])
@require_auth
def list_streams():
"""List all streams with optional filtering."""
try:
status = request.args.get("status")
service = request.args.get("service")
query = Stream.query
if status:
query = query.filter_by(status=status)
if service:
query = query.filter_by(service=service)
streams = query.order_by(Stream.created_at.desc()).all()
return (
jsonify({"streams": [stream.to_dict() for stream in streams]}),
200,
)
except Exception as e:
return jsonify({"error": str(e)}), 500
@bp.route("/<int:stream_id>", methods=["GET"])
@require_auth
def get_stream(stream_id):
"""Get a specific stream by ID."""
try:
stream = Stream.query.get_or_404(stream_id)
return jsonify({"stream": stream.to_dict()}), 200
except Exception as e:
return jsonify({"error": str(e)}), 500
@bp.route("/<int:stream_id>", methods=["PUT"])
@require_auth
def update_stream(stream_id):
"""Update stream metadata."""
try:
stream = Stream.query.get_or_404(stream_id)
data = request.get_json()
if not data:
return jsonify({"error": "No data provided"}), 400
# Update allowed fields
updatable_fields = [
"title",
"track",
"artist",
"album",
"genre",
"status",
]
for field in updatable_fields:
if field in data:
setattr(stream, field, data[field])
db.session.commit()
return (
jsonify(
{
"message": "Stream updated successfully",
"stream": stream.to_dict(),
}
),
200,
)
except Exception as e:
db.session.rollback()
return jsonify({"error": str(e)}), 500
@bp.route("/<int:stream_id>", methods=["DELETE"])
@require_auth
def delete_stream(stream_id):
"""Delete a stream."""
try:
stream = Stream.query.get_or_404(stream_id)
# If stream is being processed, mark for deletion instead
if stream.status == "processing":
stream.status = "cancelled"
db.session.commit()
return jsonify({"message": "Stream marked for cancellation"}), 200
db.session.delete(stream)
db.session.commit()
return jsonify({"message": "Stream deleted successfully"}), 200
except Exception as e:
db.session.rollback()
return jsonify({"error": str(e)}), 500
@bp.route("/queue/status", methods=["GET"])
@require_auth
def queue_status():
"""Get the current processing queue status."""
try:
from app.services.stream_processing_service import (
StreamProcessingService,
)
status = StreamProcessingService.get_queue_status()
return jsonify(status), 200
except Exception as e:
return jsonify({"error": str(e)}), 500

View File

@@ -27,8 +27,17 @@ class SoundNormalizerService:
".aac", ".aac",
".opus", ".opus",
} }
SOUNDS_DIR = "sounds/soundboard" # Sound directories by type
NORMALIZED_DIR = "sounds/normalized/soundboard" SOUND_DIRS = {
"SDB": "sounds/soundboard",
"SAY": "sounds/say",
"STR": "sounds/stream"
}
NORMALIZED_DIRS = {
"SDB": "sounds/normalized/soundboard",
"SAY": "sounds/normalized/say",
"STR": "sounds/normalized/stream"
}
LOUDNORM_PARAMS = { LOUDNORM_PARAMS = {
"integrated": -16, "integrated": -16,
@@ -62,9 +71,17 @@ class SoundNormalizerService:
"error": f"Sound with ID {sound_id} not found", "error": f"Sound with ID {sound_id} not found",
} }
source_path = ( # Get directories based on sound type
Path(SoundNormalizerService.SOUNDS_DIR) / sound.filename sound_dir = SoundNormalizerService.SOUND_DIRS.get(sound.type)
) normalized_dir = SoundNormalizerService.NORMALIZED_DIRS.get(sound.type)
if not sound_dir or not normalized_dir:
return {
"success": False,
"error": f"Unsupported sound type: {sound.type}",
}
source_path = Path(sound_dir) / sound.filename
if not source_path.exists(): if not source_path.exists():
return { return {
"success": False, "success": False,
@@ -74,10 +91,7 @@ class SoundNormalizerService:
# Always output as WAV regardless of input format # Always output as WAV regardless of input format
filename_without_ext = Path(sound.filename).stem filename_without_ext = Path(sound.filename).stem
normalized_filename = f"{filename_without_ext}.wav" normalized_filename = f"{filename_without_ext}.wav"
normalized_path = ( normalized_path = Path(normalized_dir) / normalized_filename
Path(SoundNormalizerService.NORMALIZED_DIR)
/ normalized_filename
)
normalized_path.parent.mkdir(parents=True, exist_ok=True) normalized_path.parent.mkdir(parents=True, exist_ok=True)

View File

@@ -0,0 +1,578 @@
"""Stream processing service with queue management and yt-dlp integration."""
import hashlib
import os
import re
import shutil
import threading
import time
from queue import Empty, Queue
from typing import Dict, List, Optional
from urllib.parse import parse_qs, urlparse
from app.database import db
from app.models.sound import Sound
from app.models.stream import Stream
from app.services.logging_service import LoggingService
# Configure logging
logger = LoggingService.get_logger(__name__)
class StreamProcessingService:
"""Service for processing streaming URLs with yt-dlp."""
# Class variables for queue management
_processing_queue: Queue = Queue()
_processing_threads: List[threading.Thread] = []
_is_running: bool = False
_max_concurrent_downloads: int = int(
os.getenv("STREAM_MAX_CONCURRENT", "2")
)
_downloads_dir: str = "sounds/temp"
@classmethod
def initialize(cls) -> None:
"""Initialize the stream processing service."""
if cls._is_running:
return
# Create necessary directories
os.makedirs(cls._downloads_dir, exist_ok=True)
os.makedirs("sounds/stream", exist_ok=True)
os.makedirs("sounds/stream/thumbnails", exist_ok=True)
# Start processing threads
for i in range(cls._max_concurrent_downloads):
thread = threading.Thread(
target=cls._worker_thread,
name=f"StreamProcessor-{i + 1}",
daemon=True,
)
thread.start()
cls._processing_threads.append(thread)
cls._is_running = True
logger.info(
f"StreamProcessingService initialized with {cls._max_concurrent_downloads} workers"
)
@classmethod
def add_to_queue(cls, stream_id: int) -> None:
"""Add a stream to the processing queue."""
if not cls._is_running:
cls.initialize()
cls._processing_queue.put(stream_id)
logger.info(f"Added stream {stream_id} to processing queue")
@classmethod
def get_queue_status(cls) -> Dict:
"""Get the current queue status."""
pending_count = Stream.query.filter_by(status="pending").count()
processing_count = Stream.query.filter_by(status="processing").count()
return {
"queue_size": cls._processing_queue.qsize(),
"pending_streams": pending_count,
"processing_streams": processing_count,
"max_concurrent": cls._max_concurrent_downloads,
"is_running": cls._is_running,
}
@classmethod
def _worker_thread(cls) -> None:
"""Worker thread for processing streams."""
from app import create_app
# Create app context for database operations
app = create_app()
while True:
try:
# Get stream ID from queue with timeout
stream_id = cls._processing_queue.get(timeout=1)
with app.app_context():
cls._process_stream(stream_id)
cls._processing_queue.task_done()
except Empty:
# No items in queue, continue
continue
except Exception as e:
logger.error(f"Error in worker thread: {e}")
continue
@classmethod
def _process_stream(cls, stream_id: int) -> None:
"""Process a single stream."""
try:
stream = Stream.query.get(stream_id)
if not stream:
logger.error(f"Stream {stream_id} not found")
return
if stream.status == "cancelled":
logger.info(f"Stream {stream_id} was cancelled")
return
# Update status to processing
stream.status = "processing"
db.session.commit()
logger.info(
f"Starting processing of stream {stream_id}: {stream.url}"
)
# Extract metadata and download audio
metadata, error_msg = cls._extract_metadata(stream.url)
if not metadata:
if not error_msg:
error_msg = "Failed to extract metadata from URL"
stream.status = "failed"
stream.error = error_msg
db.session.commit()
logger.error(
f"Failed to extract metadata for stream {stream_id}: {error_msg}"
)
return
# Check for duplicate streams based on service and service_id
service = metadata.get("service")
service_id = metadata.get("service_id")
if service and service_id:
existing_stream = (
Stream.query.filter_by(
service=service, service_id=service_id
)
.filter(Stream.id != stream.id)
.first()
)
if existing_stream:
error_msg = f"Stream already exists with {service} ID: {service_id} (stream #{existing_stream.id})"
stream.status = "failed"
stream.error = error_msg
db.session.commit()
logger.error(
f"Duplicate stream detected for {stream_id}: {error_msg}"
)
return
# Update stream with metadata
cls._update_stream_metadata(stream, metadata)
# Download audio
audio_path, error_msg = cls._download_audio(stream.url, metadata)
if not audio_path:
if not error_msg:
error_msg = "Failed to download audio from URL"
stream.status = "failed"
stream.error = error_msg
db.session.commit()
logger.error(
f"Failed to download audio for stream {stream_id}: {error_msg}"
)
return
# Move files to final locations
final_audio_path, thumbnail_path, error_msg = (
cls._move_files_to_final_location(audio_path, metadata)
)
if not final_audio_path:
if not error_msg:
error_msg = "Failed to move files to final location"
stream.status = "failed"
stream.error = error_msg
db.session.commit()
logger.error(
f"Failed to move files for stream {stream_id}: {error_msg}"
)
return
# Create sound entry with final path
sound, error_msg = cls._create_sound_entry(
final_audio_path, metadata, thumbnail_path
)
if not sound:
if not error_msg:
error_msg = "Failed to create sound entry in database"
stream.status = "failed"
stream.error = error_msg
db.session.commit()
logger.error(
f"Failed to create sound entry for stream {stream_id}: {error_msg}"
)
return
# Update stream with sound_id and mark as completed
stream.sound_id = sound.id
stream.status = "completed"
stream.error = None # Clear any previous errors
db.session.commit()
logger.info(
f"Successfully processed stream {stream_id} -> sound {sound.id}"
)
except Exception as e:
error_msg = f"Unexpected error during processing: {str(e)}"
logger.error(f"Error processing stream {stream_id}: {error_msg}")
try:
stream = Stream.query.get(stream_id)
if stream:
stream.status = "failed"
stream.error = error_msg
db.session.commit()
except Exception as db_error:
logger.error(
f"Failed to update stream error in database: {db_error}"
)
@classmethod
def _extract_metadata(
cls, url: str
) -> tuple[Optional[Dict], Optional[str]]:
"""Extract metadata from URL using yt-dlp."""
try:
import yt_dlp
ydl_opts = {
"quiet": True,
"no_warnings": True,
"extract_flat": False,
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
info = ydl.extract_info(url, download=False)
# Extract service information
service = cls._detect_service(url)
service_id = cls._extract_service_id(url, info)
metadata = {
"service": info.get("extractor", service),
"service_id": info.get("id", service_id),
"title": info.get("title", ""),
"track": info.get("track", ""),
"artist": info.get("artist", "")
or info.get("uploader", ""),
"album": info.get("album", ""),
"genre": info.get("genre", ""),
"duration": info.get("duration", 0),
"description": info.get("description", ""),
}
return metadata, None
except Exception as e:
error_msg = f"yt-dlp extraction failed: {str(e)}"
logger.error(f"Error extracting metadata from {url}: {error_msg}")
return None, error_msg
@classmethod
def _download_audio(
cls, url: str, metadata: Dict
) -> tuple[Optional[str], Optional[str]]:
"""Download audio from URL using yt-dlp."""
try:
import yt_dlp
# Generate filename
title = metadata.get("title", "unknown")
safe_title = re.sub(r"[^\w\s-]", "", title)[:50]
filename = f"{safe_title}_{metadata.get('service_id', 'unknown')}"
output_path = os.path.join(
cls._downloads_dir, f"{filename}.%(ext)s"
)
ydl_opts = {
"format": "bestaudio/best",
"outtmpl": output_path,
"extractaudio": True,
"audioformat": "opus",
"audioquality": "192",
"postprocessors": [
{
"key": "FFmpegExtractAudio",
"preferredcodec": "opus",
"preferredquality": "192",
}
],
"writethumbnail": True,
"quiet": True,
"no_warnings": True,
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
ydl.download([url])
# Find the downloaded file
final_path = os.path.join(cls._downloads_dir, f"{filename}.opus")
if os.path.exists(final_path):
return final_path, None
# If opus doesn't exist, look for other formats
for ext in ["mp3", "wav", "m4a", "webm", "ogg"]:
alt_path = os.path.join(cls._downloads_dir, f"{filename}.{ext}")
if os.path.exists(alt_path):
return alt_path, None
error_msg = f"Downloaded file not found after yt-dlp processing"
logger.error(f"Downloaded file not found for {url}")
return None, error_msg
except Exception as e:
error_msg = f"yt-dlp download failed: {str(e)}"
logger.error(f"Error downloading audio from {url}: {error_msg}")
return None, error_msg
@classmethod
def _move_files_to_final_location(
cls, audio_path: str, metadata: dict
) -> tuple[Optional[str], Optional[str], Optional[str]]:
"""Move downloaded files to their final locations.
Returns:
tuple: (final_audio_path, thumbnail_path, error_message)
"""
try:
# Create target directories
stream_dir = "sounds/stream"
thumbnail_dir = "sounds/stream/thumbnails"
os.makedirs(stream_dir, exist_ok=True)
os.makedirs(thumbnail_dir, exist_ok=True)
# Generate safe filename
title = metadata.get("title", "unknown")
safe_title = re.sub(r"[^\w\s-]", "", title)[:50]
service_id = metadata.get("service_id", "unknown")
base_filename = f"{safe_title}_{service_id}"
# Move audio file
audio_extension = os.path.splitext(audio_path)[1]
final_audio_filename = f"{base_filename}{audio_extension}"
final_audio_path = os.path.join(stream_dir, final_audio_filename)
# If file already exists, add a counter
counter = 1
while os.path.exists(final_audio_path):
final_audio_filename = (
f"{base_filename}_{counter}{audio_extension}"
)
final_audio_path = os.path.join(
stream_dir, final_audio_filename
)
counter += 1
shutil.move(audio_path, final_audio_path)
logger.info(f"Moved audio file to: {final_audio_path}")
# Look for and move thumbnail
thumbnail_path = None
temp_dir = os.path.dirname(audio_path)
# Common thumbnail extensions
for thumb_ext in [".jpg", ".jpeg", ".png", ".webp", ".gif"]:
temp_thumb_path = os.path.join(
temp_dir,
f"{os.path.splitext(os.path.basename(audio_path))[0]}{thumb_ext}",
)
if os.path.exists(temp_thumb_path):
final_thumb_filename = f"{base_filename}{thumb_ext}"
final_thumb_path = os.path.join(
thumbnail_dir, final_thumb_filename
)
# Handle duplicate thumbnail names
thumb_counter = 1
while os.path.exists(final_thumb_path):
final_thumb_filename = (
f"{base_filename}_{thumb_counter}{thumb_ext}"
)
final_thumb_path = os.path.join(
thumbnail_dir, final_thumb_filename
)
thumb_counter += 1
shutil.move(temp_thumb_path, final_thumb_path)
thumbnail_path = final_thumb_path
logger.info(f"Moved thumbnail to: {final_thumb_path}")
break
return final_audio_path, thumbnail_path, None
except Exception as e:
error_msg = f"File move operation failed: {str(e)}"
logger.error(f"Error moving files: {error_msg}")
return None, None, error_msg
@classmethod
def _create_sound_entry(
cls,
audio_path: str,
metadata: dict,
thumbnail_path: str | None = None,
) -> tuple[Sound | None, str | None]:
"""Create a sound entry from the downloaded audio."""
try:
# Get file info
file_size = os.path.getsize(audio_path)
# Generate hash
file_hash = cls._calculate_file_hash(audio_path)
# Get duration (use metadata duration or calculate from file)
duration_ms = int((metadata.get("duration", 0) or 0) * 1000)
# Get thumbnail filename if available
thumbnail_filename = None
if thumbnail_path:
thumbnail_filename = os.path.basename(thumbnail_path)
# Create sound entry
sound = Sound(
type="STR", # Stream type
name=metadata.get("title", "Unknown Title"),
filename=os.path.basename(audio_path),
thumbnail=thumbnail_filename,
duration=duration_ms,
size=file_size,
hash=file_hash,
is_music=True, # Streams are typically music
is_deletable=True,
)
db.session.add(sound)
db.session.commit()
# Add sound to main playlist
cls._add_sound_to_main_playlist(sound)
# Normalize the sound
cls._normalize_sound(sound)
return sound, None
except Exception as e:
error_msg = f"Database error while creating sound entry: {str(e)}"
logger.error(
f"Error creating sound entry for {audio_path}: {error_msg}"
)
return None, error_msg
@classmethod
def _update_stream_metadata(cls, stream: Stream, metadata: Dict) -> None:
"""Update stream with extracted metadata."""
stream.service = metadata.get("service")
stream.service_id = metadata.get("service_id")
stream.title = metadata.get("title")
stream.track = metadata.get("track")
stream.artist = metadata.get("artist")
stream.album = metadata.get("album")
stream.genre = metadata.get("genre")
db.session.commit()
@classmethod
def _detect_service(cls, url: str) -> str:
"""Detect the streaming service from URL."""
domain = urlparse(url).netloc.lower()
if "youtube.com" in domain or "youtu.be" in domain:
return "youtube"
elif "soundcloud.com" in domain:
return "soundcloud"
elif "dailymotion.com" in domain:
return "dailymotion"
elif "spotify.com" in domain:
return "spotify"
elif "vimeo.com" in domain:
return "vimeo"
elif "twitch.tv" in domain:
return "twitch"
else:
return "unknown"
@classmethod
def _extract_service_id(cls, url: str, info: Dict) -> str:
"""Extract service-specific ID from URL or info."""
service = cls._detect_service(url)
if service == "youtube":
# Try to get from info first
if "id" in info:
return info["id"]
# Parse from URL
parsed = urlparse(url)
if "youtu.be" in parsed.netloc:
return parsed.path[1:] # Remove leading slash
elif "youtube.com" in parsed.netloc:
query_params = parse_qs(parsed.query)
return query_params.get("v", [""])[0]
elif service == "soundcloud":
if "id" in info:
return str(info["id"])
# Fallback to using info ID or last part of URL
if "id" in info:
return str(info["id"])
return urlparse(url).path.split("/")[-1] or "unknown"
@classmethod
def _add_sound_to_main_playlist(cls, sound: Sound) -> None:
"""Add a sound to the main playlist."""
try:
from app.models.playlist import Playlist
# Find the main playlist
main_playlist = Playlist.find_main_playlist()
if main_playlist:
# Add sound to the main playlist
main_playlist.add_sound(sound.id, commit=True)
logger.info(f"Added sound {sound.id} to main playlist")
else:
logger.warning("Main playlist not found - sound not added to any playlist")
except Exception as e:
logger.error(f"Failed to add sound {sound.id} to main playlist: {e}")
@classmethod
def _normalize_sound(cls, sound: Sound) -> None:
"""Normalize a stream sound using the sound normalizer service."""
try:
from app.services.sound_normalizer_service import SoundNormalizerService
logger.info(f"Starting normalization of stream sound {sound.id}: {sound.name}")
# Normalize the sound (overwrite=True since it's a new sound)
result = SoundNormalizerService.normalize_sound(
sound.id,
overwrite=True,
two_pass=True
)
if result.get("success"):
logger.info(f"Successfully normalized stream sound {sound.id}")
else:
error_msg = result.get("error", "Unknown normalization error")
logger.warning(f"Failed to normalize stream sound {sound.id}: {error_msg}")
except Exception as e:
logger.error(f"Error normalizing stream sound {sound.id}: {e}")
@classmethod
def _calculate_file_hash(cls, file_path: str) -> str:
"""Calculate SHA256 hash of file."""
sha256_hash = hashlib.sha256()
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
sha256_hash.update(chunk)
return sha256_hash.hexdigest()

View File

@@ -19,6 +19,7 @@ dependencies = [
"python-dotenv==1.1.1", "python-dotenv==1.1.1",
"requests==2.32.4", "requests==2.32.4",
"werkzeug==3.1.3", "werkzeug==3.1.3",
"yt-dlp>=2025.6.30",
] ]
[dependency-groups] [dependency-groups]

11
uv.lock generated
View File

@@ -640,6 +640,7 @@ dependencies = [
{ name = "python-dotenv" }, { name = "python-dotenv" },
{ name = "requests" }, { name = "requests" },
{ name = "werkzeug" }, { name = "werkzeug" },
{ name = "yt-dlp" },
] ]
[package.dev-dependencies] [package.dev-dependencies]
@@ -664,6 +665,7 @@ requires-dist = [
{ name = "python-dotenv", specifier = "==1.1.1" }, { name = "python-dotenv", specifier = "==1.1.1" },
{ name = "requests", specifier = "==2.32.4" }, { name = "requests", specifier = "==2.32.4" },
{ name = "werkzeug", specifier = "==3.1.3" }, { name = "werkzeug", specifier = "==3.1.3" },
{ name = "yt-dlp", specifier = ">=2025.6.30" },
] ]
[package.metadata.requires-dev] [package.metadata.requires-dev]
@@ -776,3 +778,12 @@ sdist = { url = "https://files.pythonhosted.org/packages/c9/4a/44d3c295350d77642
wheels = [ wheels = [
{ url = "https://files.pythonhosted.org/packages/78/58/e860788190eba3bcce367f74d29c4675466ce8dddfba85f7827588416f01/wsproto-1.2.0-py3-none-any.whl", hash = "sha256:b9acddd652b585d75b20477888c56642fdade28bdfd3579aa24a4d2c037dd736", size = 24226 }, { url = "https://files.pythonhosted.org/packages/78/58/e860788190eba3bcce367f74d29c4675466ce8dddfba85f7827588416f01/wsproto-1.2.0-py3-none-any.whl", hash = "sha256:b9acddd652b585d75b20477888c56642fdade28bdfd3579aa24a4d2c037dd736", size = 24226 },
] ]
[[package]]
name = "yt-dlp"
version = "2025.6.30"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/23/9c/ff64c2fed7909f43a9a0aedb7395c65404e71c2439198764685a6e3b3059/yt_dlp-2025.6.30.tar.gz", hash = "sha256:6d0ae855c0a55bfcc28dffba804ec8525b9b955d34a41191a1561a4cec03d8bd", size = 3034364 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/14/41/2f048ae3f6d0fa2e59223f08ba5049dbcdac628b0a9f9deac722dd9260a5/yt_dlp-2025.6.30-py3-none-any.whl", hash = "sha256:541becc29ed7b7b3a08751c0a66da4b7f8ee95cb81066221c78e83598bc3d1f3", size = 3279333 },
]