feat: Update Extraction model and service to support deferred service detection
This commit is contained in:
@@ -12,8 +12,8 @@ if TYPE_CHECKING:
|
||||
class Extraction(BaseModel, table=True):
|
||||
"""Database model for a stream."""
|
||||
|
||||
service: str = Field(nullable=False)
|
||||
service_id: str = Field(nullable=False)
|
||||
service: str | None = Field(default=None)
|
||||
service_id: str | None = Field(default=None)
|
||||
user_id: int = Field(foreign_key="user.id", nullable=False)
|
||||
sound_id: int | None = Field(foreign_key="sound.id", default=None)
|
||||
url: str = Field(nullable=False)
|
||||
@@ -25,14 +25,8 @@ class Extraction(BaseModel, table=True):
|
||||
status: str = Field(nullable=False, default="pending")
|
||||
error: str | None = Field(default=None)
|
||||
|
||||
# constraints
|
||||
__table_args__ = (
|
||||
UniqueConstraint(
|
||||
"service",
|
||||
"service_id",
|
||||
name="uq_extraction_service_service_id",
|
||||
),
|
||||
)
|
||||
# constraints - only enforce uniqueness when both service and service_id are not null
|
||||
__table_args__ = ()
|
||||
|
||||
# relationships
|
||||
sound: "Sound" = Relationship(back_populates="extractions")
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
"""Extraction service for audio extraction from external services using yt-dlp."""
|
||||
|
||||
import asyncio
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
from typing import TypedDict
|
||||
@@ -24,8 +25,8 @@ class ExtractionInfo(TypedDict):
|
||||
|
||||
id: int
|
||||
url: str
|
||||
service: str
|
||||
service_id: str
|
||||
service: str | None
|
||||
service_id: str | None
|
||||
title: str | None
|
||||
status: str
|
||||
error: str | None
|
||||
@@ -61,39 +62,13 @@ class ExtractionService:
|
||||
logger.info("Creating extraction for URL: %s (user: %d)", url, user_id)
|
||||
|
||||
try:
|
||||
# First, detect service and service_id using yt-dlp
|
||||
service_info = self._detect_service_info(url)
|
||||
|
||||
if not service_info:
|
||||
raise ValueError("Unable to detect service information from URL")
|
||||
|
||||
service = service_info["service"]
|
||||
service_id = service_info["service_id"]
|
||||
title = service_info.get("title")
|
||||
|
||||
logger.info(
|
||||
"Detected service: %s, service_id: %s, title: %s",
|
||||
service,
|
||||
service_id,
|
||||
title,
|
||||
)
|
||||
|
||||
# Check if extraction already exists
|
||||
existing = await self.extraction_repo.get_by_service_and_id(
|
||||
service, service_id
|
||||
)
|
||||
if existing:
|
||||
error_msg = f"Extraction already exists for {service}:{service_id}"
|
||||
logger.warning(error_msg)
|
||||
raise ValueError(error_msg)
|
||||
|
||||
# Create the extraction record
|
||||
# Create the extraction record without service detection for fast response
|
||||
extraction_data = {
|
||||
"url": url,
|
||||
"user_id": user_id,
|
||||
"service": service,
|
||||
"service_id": service_id,
|
||||
"title": title,
|
||||
"service": None, # Will be detected during processing
|
||||
"service_id": None, # Will be detected during processing
|
||||
"title": None, # Will be detected during processing
|
||||
"status": "pending",
|
||||
}
|
||||
|
||||
@@ -115,7 +90,7 @@ class ExtractionService:
|
||||
logger.exception("Failed to create extraction for URL: %s", url)
|
||||
raise
|
||||
|
||||
def _detect_service_info(self, url: str) -> dict | None:
|
||||
async def _detect_service_info(self, url: str) -> dict[str, str | None] | None:
|
||||
"""Detect service information from URL using yt-dlp."""
|
||||
try:
|
||||
# Configure yt-dlp for info extraction only
|
||||
@@ -125,35 +100,22 @@ class ExtractionService:
|
||||
"extract_flat": False,
|
||||
}
|
||||
|
||||
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
|
||||
# Extract info without downloading
|
||||
info = ydl.extract_info(url, download=False)
|
||||
def _extract_info() -> dict | None:
|
||||
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
|
||||
# Extract info without downloading
|
||||
return ydl.extract_info(url, download=False)
|
||||
|
||||
if not info:
|
||||
return None
|
||||
# Run the blocking operation in a thread pool
|
||||
info = await asyncio.to_thread(_extract_info)
|
||||
|
||||
# Map extractor names to our service names
|
||||
extractor_map = {
|
||||
"youtube": "youtube",
|
||||
"dailymotion": "dailymotion",
|
||||
"vimeo": "vimeo",
|
||||
"soundcloud": "soundcloud",
|
||||
"twitter": "twitter",
|
||||
"tiktok": "tiktok",
|
||||
"instagram": "instagram",
|
||||
}
|
||||
if not info:
|
||||
return None
|
||||
|
||||
extractor = info.get("extractor", "").lower()
|
||||
service = extractor_map.get(extractor, extractor)
|
||||
|
||||
return {
|
||||
"service": service,
|
||||
"service_id": str(info.get("id", "")),
|
||||
"title": info.get("title"),
|
||||
"duration": info.get("duration"),
|
||||
"uploader": info.get("uploader"),
|
||||
"description": info.get("description"),
|
||||
}
|
||||
return {
|
||||
"service": info.get("extractor", ""),
|
||||
"service_id": str(info.get("id", "")),
|
||||
"title": info.get("title"),
|
||||
}
|
||||
|
||||
except Exception:
|
||||
logger.exception("Failed to detect service info for URL: %s", url)
|
||||
@@ -171,9 +133,9 @@ class ExtractionService:
|
||||
# Store all needed values early to avoid session detachment issues
|
||||
user_id = extraction.user_id
|
||||
extraction_url = extraction.url
|
||||
extraction_title = extraction.title
|
||||
extraction_service = extraction.service
|
||||
extraction_service_id = extraction.service_id
|
||||
extraction_title = extraction.title
|
||||
|
||||
logger.info("Processing extraction %d: %s", extraction_id, extraction_url)
|
||||
|
||||
@@ -181,20 +143,51 @@ class ExtractionService:
|
||||
# Update status to processing
|
||||
await self.extraction_repo.update(extraction, {"status": "processing"})
|
||||
|
||||
# Detect service info if not already available
|
||||
if not extraction_service or not extraction_service_id:
|
||||
logger.info("Detecting service info for extraction %d", extraction_id)
|
||||
service_info = await self._detect_service_info(extraction_url)
|
||||
|
||||
if not service_info:
|
||||
raise ValueError("Unable to detect service information from URL")
|
||||
|
||||
# Check if extraction already exists for this service
|
||||
existing = await self.extraction_repo.get_by_service_and_id(
|
||||
service_info["service"], service_info["service_id"]
|
||||
)
|
||||
if existing and existing.id != extraction_id:
|
||||
error_msg = f"Extraction already exists for {service_info['service']}:{service_info['service_id']}"
|
||||
logger.warning(error_msg)
|
||||
raise ValueError(error_msg)
|
||||
|
||||
# Update extraction with service info
|
||||
update_data = {
|
||||
"service": service_info["service"],
|
||||
"service_id": service_info["service_id"],
|
||||
"title": service_info.get("title") or extraction_title,
|
||||
}
|
||||
await self.extraction_repo.update(extraction, update_data)
|
||||
|
||||
# Update values for processing
|
||||
extraction_service = service_info["service"]
|
||||
extraction_service_id = service_info["service_id"]
|
||||
extraction_title = service_info.get("title") or extraction_title
|
||||
|
||||
# Extract audio and thumbnail
|
||||
audio_file, thumbnail_file = await self._extract_media(
|
||||
extraction_id, extraction_url
|
||||
)
|
||||
|
||||
# Move files to final locations
|
||||
final_audio_path, final_thumbnail_path = (
|
||||
await self._move_files_to_final_location(
|
||||
audio_file,
|
||||
thumbnail_file,
|
||||
extraction_title,
|
||||
extraction_service,
|
||||
extraction_service_id,
|
||||
)
|
||||
(
|
||||
final_audio_path,
|
||||
final_thumbnail_path,
|
||||
) = await self._move_files_to_final_location(
|
||||
audio_file,
|
||||
thumbnail_file,
|
||||
extraction_title,
|
||||
extraction_service,
|
||||
extraction_service_id,
|
||||
)
|
||||
|
||||
# Create Sound record
|
||||
@@ -294,36 +287,40 @@ class ExtractionService:
|
||||
],
|
||||
}
|
||||
|
||||
try:
|
||||
def _download_media() -> None:
|
||||
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
|
||||
# Download and extract
|
||||
ydl.download([extraction_url])
|
||||
|
||||
# Find the extracted files
|
||||
audio_files = list(
|
||||
temp_dir.glob(
|
||||
f"extraction_{extraction_id}_*.{settings.EXTRACTION_AUDIO_FORMAT}"
|
||||
)
|
||||
)
|
||||
thumbnail_files = (
|
||||
list(temp_dir.glob(f"extraction_{extraction_id}_*.webp"))
|
||||
+ list(temp_dir.glob(f"extraction_{extraction_id}_*.jpg"))
|
||||
+ list(temp_dir.glob(f"extraction_{extraction_id}_*.png"))
|
||||
try:
|
||||
# Run the blocking download operation in a thread pool
|
||||
await asyncio.to_thread(_download_media)
|
||||
|
||||
# Find the extracted files
|
||||
audio_files = list(
|
||||
temp_dir.glob(
|
||||
f"extraction_{extraction_id}_*.{settings.EXTRACTION_AUDIO_FORMAT}"
|
||||
)
|
||||
)
|
||||
thumbnail_files = (
|
||||
list(temp_dir.glob(f"extraction_{extraction_id}_*.webp"))
|
||||
+ list(temp_dir.glob(f"extraction_{extraction_id}_*.jpg"))
|
||||
+ list(temp_dir.glob(f"extraction_{extraction_id}_*.png"))
|
||||
)
|
||||
|
||||
if not audio_files:
|
||||
raise RuntimeError("No audio file was created during extraction")
|
||||
if not audio_files:
|
||||
raise RuntimeError("No audio file was created during extraction")
|
||||
|
||||
audio_file = audio_files[0]
|
||||
thumbnail_file = thumbnail_files[0] if thumbnail_files else None
|
||||
audio_file = audio_files[0]
|
||||
thumbnail_file = thumbnail_files[0] if thumbnail_files else None
|
||||
|
||||
logger.info(
|
||||
"Extracted audio: %s, thumbnail: %s",
|
||||
audio_file,
|
||||
thumbnail_file or "None",
|
||||
)
|
||||
logger.info(
|
||||
"Extracted audio: %s, thumbnail: %s",
|
||||
audio_file,
|
||||
thumbnail_file or "None",
|
||||
)
|
||||
|
||||
return audio_file, thumbnail_file
|
||||
return audio_file, thumbnail_file
|
||||
|
||||
except Exception as e:
|
||||
logger.exception("yt-dlp extraction failed for %s", extraction_url)
|
||||
@@ -334,12 +331,14 @@ class ExtractionService:
|
||||
audio_file: Path,
|
||||
thumbnail_file: Path | None,
|
||||
title: str | None,
|
||||
service: str,
|
||||
service_id: str,
|
||||
service: str | None,
|
||||
service_id: str | None,
|
||||
) -> tuple[Path, Path | None]:
|
||||
"""Move extracted files to their final locations."""
|
||||
# Generate clean filename based on title and service
|
||||
safe_title = self._sanitize_filename(title or f"{service}_{service_id}")
|
||||
safe_title = self._sanitize_filename(
|
||||
title or f"{service or 'unknown'}_{service_id or 'unknown'}"
|
||||
)
|
||||
|
||||
# Move audio file
|
||||
final_audio_path = (
|
||||
@@ -395,7 +394,11 @@ class ExtractionService:
|
||||
counter += 1
|
||||
|
||||
async def _create_sound_record(
|
||||
self, audio_path: Path, title: str | None, service: str, service_id: str
|
||||
self,
|
||||
audio_path: Path,
|
||||
title: str | None,
|
||||
service: str | None,
|
||||
service_id: str | None,
|
||||
) -> Sound:
|
||||
"""Create a Sound record for the extracted audio."""
|
||||
# Get audio metadata
|
||||
@@ -406,7 +409,7 @@ class ExtractionService:
|
||||
# Create sound data
|
||||
sound_data = {
|
||||
"type": "EXT",
|
||||
"name": title or f"{service}_{service_id}",
|
||||
"name": title or f"{service or 'unknown'}_{service_id or 'unknown'}",
|
||||
"filename": audio_path.name,
|
||||
"duration": duration,
|
||||
"size": size,
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
"""Background extraction processor for handling extraction queue."""
|
||||
|
||||
import asyncio
|
||||
from typing import Set
|
||||
|
||||
from sqlmodel.ext.asyncio.session import AsyncSession
|
||||
|
||||
@@ -19,7 +18,7 @@ class ExtractionProcessor:
|
||||
def __init__(self) -> None:
|
||||
"""Initialize the extraction processor."""
|
||||
self.max_concurrent = settings.EXTRACTION_MAX_CONCURRENT
|
||||
self.running_extractions: Set[int] = set()
|
||||
self.running_extractions: set[int] = set()
|
||||
self.processing_lock = asyncio.Lock()
|
||||
self.shutdown_event = asyncio.Event()
|
||||
self.processor_task: asyncio.Task | None = None
|
||||
@@ -71,7 +70,7 @@ class ExtractionProcessor:
|
||||
)
|
||||
|
||||
async def _process_queue(self) -> None:
|
||||
"""Main processing loop that handles the extraction queue."""
|
||||
"""Process the extraction queue in the main processing loop."""
|
||||
logger.info("Starting extraction queue processor")
|
||||
|
||||
while not self.shutdown_event.is_set():
|
||||
@@ -161,7 +160,7 @@ class ExtractionProcessor:
|
||||
logger.exception("Error processing extraction %d: %s", extraction_id, e)
|
||||
|
||||
def _on_extraction_completed(self, extraction_id: int, task: asyncio.Task) -> None:
|
||||
"""Callback when an extraction task is completed."""
|
||||
"""Handle completion of an extraction task."""
|
||||
# Remove from running set
|
||||
self.running_extractions.discard(extraction_id)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user