"""Background extraction processor for handling extraction queue."""
|
|
|
|
import asyncio
|
|
import contextlib
|
|
|
|
from sqlmodel.ext.asyncio.session import AsyncSession
|
|
|
|
from app.core.config import settings
|
|
from app.core.database import engine
|
|
from app.core.logging import get_logger
|
|
from app.services.extraction import ExtractionService
|
|
|
|
logger = get_logger(__name__)
|
|
|
|
|
|
class ExtractionProcessor:
    """Background processor for handling extraction queue with concurrency control."""

    def __init__(self) -> None:
        """Initialize the extraction processor."""
        self.max_concurrent = settings.EXTRACTION_MAX_CONCURRENT
        self.running_extractions: set[int] = set()
        self.processing_lock = asyncio.Lock()
        self.shutdown_event = asyncio.Event()
        self.processor_task: asyncio.Task | None = None

        logger.info(
            "Initialized extraction processor with max concurrent: %d",
            self.max_concurrent,
        )

    async def start(self) -> None:
        """Start the background extraction processor."""
        if self.processor_task and not self.processor_task.done():
            logger.warning("Extraction processor is already running")
            return

        # Reset any stuck extractions from previous runs
        await self._reset_stuck_extractions()

        self.shutdown_event.clear()
        self.processor_task = asyncio.create_task(self._process_queue())
        logger.info("Started extraction processor")

    async def stop(self) -> None:
        """Stop the background extraction processor."""
        logger.info("Stopping extraction processor...")
        self.shutdown_event.set()

        if self.processor_task and not self.processor_task.done():
            try:
                await asyncio.wait_for(self.processor_task, timeout=30.0)
            except TimeoutError:
                logger.warning(
                    "Extraction processor did not stop gracefully, cancelling...",
                )
                self.processor_task.cancel()
                with contextlib.suppress(asyncio.CancelledError):
                    await self.processor_task

        logger.info("Extraction processor stopped")

    async def queue_extraction(self, extraction_id: int) -> None:
        """Queue an extraction for processing."""
        async with self.processing_lock:
            if extraction_id not in self.running_extractions:
                logger.info("Queued extraction %d for processing", extraction_id)
                # The processor will pick it up on the next cycle
            else:
                logger.warning(
                    "Extraction %d is already being processed",
                    extraction_id,
                )

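    # Queueing model in brief: queue_extraction() does not push to an in-memory
    # queue; extractions live as "pending" rows in the database and are picked up
    # by the polling loop in _process_queue(). A minimal caller-side sketch
    # (create_extraction is an assumed ExtractionService method; adapt to the real
    # service API):
    #
    #     extraction = await extraction_service.create_extraction(...)
    #     await extraction_processor.queue_extraction(extraction.id)
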
    async def _process_queue(self) -> None:
        """Process the extraction queue in the main processing loop."""
        logger.info("Starting extraction queue processor")

        while not self.shutdown_event.is_set():
            try:
                await self._process_pending_extractions()

                # Wait before checking for new extractions
                try:
                    await asyncio.wait_for(self.shutdown_event.wait(), timeout=5.0)
                    break  # Shutdown requested
                except TimeoutError:
                    continue  # Continue processing

            except Exception:
                logger.exception("Error in extraction queue processor")
                # Wait a bit before retrying to avoid tight error loops
                try:
                    await asyncio.wait_for(self.shutdown_event.wait(), timeout=10.0)
                    break  # Shutdown requested
                except TimeoutError:
                    continue

        logger.info("Extraction queue processor stopped")

    async def _process_pending_extractions(self) -> None:
        """Process pending extractions up to the concurrency limit."""
        async with self.processing_lock:
            # Check how many slots are available
            available_slots = self.max_concurrent - len(self.running_extractions)

            if available_slots <= 0:
                return  # No available slots

            # Get pending extractions from database
            async with AsyncSession(engine) as session:
                extraction_service = ExtractionService(session)
                pending_extractions = await extraction_service.get_pending_extractions()

            # Filter out extractions that are already being processed
            available_extractions = [
                ext
                for ext in pending_extractions
                if ext["id"] not in self.running_extractions
            ]

            # Start processing up to available slots
            extractions_to_start = available_extractions[:available_slots]

            for extraction_info in extractions_to_start:
                extraction_id = extraction_info["id"]
                self.running_extractions.add(extraction_id)

                # Start processing this extraction in the background
                task = asyncio.create_task(
                    self._process_single_extraction(extraction_id),
                )
                task.add_done_callback(
                    lambda t, eid=extraction_id: self._on_extraction_completed(
                        eid,
                        t,
                    ),
                )

                logger.info(
                    "Started processing extraction %d (%d/%d slots used)",
                    extraction_id,
                    len(self.running_extractions),
                    self.max_concurrent,
                )

    async def _process_single_extraction(self, extraction_id: int) -> None:
        """Process a single extraction."""
        try:
            logger.info("Processing extraction %d", extraction_id)

            async with AsyncSession(engine) as session:
                extraction_service = ExtractionService(session)
                result = await extraction_service.process_extraction(extraction_id)

            logger.info(
                "Completed extraction %d with status: %s",
                extraction_id,
                result["status"],
            )

        except Exception:
            logger.exception("Error processing extraction %d", extraction_id)

    def _on_extraction_completed(self, extraction_id: int, task: asyncio.Task) -> None:
        """Handle completion of an extraction task."""
        # Remove from running set
        self.running_extractions.discard(extraction_id)

        # Check if the task had an exception
        if task.exception():
            logger.error(
                "Extraction %d completed with exception: %s",
                extraction_id,
                task.exception(),
            )
        else:
            logger.info(
                "Extraction %d completed successfully (%d/%d slots used)",
                extraction_id,
                len(self.running_extractions),
                self.max_concurrent,
            )

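    # Note: asyncio.Task.exception() re-raises CancelledError when the task was
    # cancelled. Extraction tasks are never cancelled in this module (only
    # processor_task is), so the check above is safe; if cancellation is ever
    # added, checking task.cancelled() first would avoid that surprise.
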
    async def _reset_stuck_extractions(self) -> None:
        """Reset any extractions stuck in 'processing' status back to 'pending'."""
        try:
            async with AsyncSession(engine) as session:
                extraction_service = ExtractionService(session)

                # Get all extractions stuck in processing status
                stuck_extractions = (
                    await extraction_service.extraction_repo.get_by_status("processing")
                )

                if not stuck_extractions:
                    logger.info("No stuck extractions found to reset")
                    return

                reset_count = 0
                for extraction in stuck_extractions:
                    try:
                        await extraction_service.extraction_repo.update(
                            extraction, {"status": "pending", "error": None},
                        )
                        reset_count += 1
                        logger.info(
                            "Reset stuck extraction %d from processing to pending",
                            extraction.id,
                        )
                    except Exception:
                        logger.exception(
                            "Failed to reset extraction %d", extraction.id,
                        )

                await session.commit()
                logger.info(
                    "Successfully reset %d stuck extractions from processing to "
                    "pending",
                    reset_count,
                )

        except Exception:
            logger.exception("Failed to reset stuck extractions")

    def get_status(self) -> dict:
        """Get the current status of the extraction processor."""
        return {
            "running": self.processor_task is not None
            and not self.processor_task.done(),
            "max_concurrent": self.max_concurrent,
            "currently_processing": len(self.running_extractions),
            "processing_ids": list(self.running_extractions),
            "available_slots": self.max_concurrent - len(self.running_extractions),
        }


# Global extraction processor instance
extraction_processor = ExtractionProcessor()
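
# Minimal wiring sketch (assumes a FastAPI-style app with a lifespan context and an
# assumed module path for the import; the actual application may start and stop the
# processor elsewhere):
#
#     from contextlib import asynccontextmanager
#
#     from fastapi import FastAPI
#
#     from app.services.extraction_processor import extraction_processor  # assumed path
#
#     @asynccontextmanager
#     async def lifespan(app: FastAPI):
#         await extraction_processor.start()
#         try:
#             yield
#         finally:
#             await extraction_processor.stop()
#
#     app = FastAPI(lifespan=lifespan)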