"""Background extraction processor for handling extraction queue."""

import asyncio
from typing import Set

from sqlmodel.ext.asyncio.session import AsyncSession

from app.core.config import settings
from app.core.database import engine
from app.core.logging import get_logger
from app.services.extraction import ExtractionService

logger = get_logger(__name__)


class ExtractionProcessor:
    """Background processor for handling extraction queue with concurrency control.

    A single polling task (``_process_queue``) periodically asks
    ``ExtractionService.get_pending_extractions`` for pending work and starts
    one asyncio task per extraction, never exceeding ``max_concurrent``
    extractions tracked in ``running_extractions`` at once.
    """

    def __init__(self) -> None:
        """Initialize the extraction processor."""
        self.max_concurrent = settings.EXTRACTION_MAX_CONCURRENT
        # IDs of extractions that currently occupy a concurrency slot.
        self.running_extractions: Set[int] = set()
        # Serializes slot accounting between the poll loop and queue_extraction.
        self.processing_lock = asyncio.Lock()
        self.shutdown_event = asyncio.Event()
        self.processor_task: asyncio.Task | None = None
        # Strong references to in-flight extraction tasks. The event loop only
        # keeps weak references to tasks, so without this set a running
        # extraction could be garbage-collected before it finishes.
        self._extraction_tasks: Set[asyncio.Task] = set()

        logger.info(
            "Initialized extraction processor with max concurrent: %d",
            self.max_concurrent,
        )

    async def start(self) -> None:
        """Start the background extraction processor.

        Does nothing (besides logging a warning) when the polling task is
        already running.
        """
        if self.processor_task and not self.processor_task.done():
            logger.warning("Extraction processor is already running")
            return

        self.shutdown_event.clear()
        self.processor_task = asyncio.create_task(self._process_queue())
        logger.info("Started extraction processor")

    async def stop(self) -> None:
        """Stop the background extraction processor.

        Signals shutdown and waits up to 30 seconds for the polling task to
        exit; if it does not, the polling task is cancelled.
        """
        logger.info("Stopping extraction processor...")
        self.shutdown_event.set()

        if self.processor_task and not self.processor_task.done():
            try:
                await asyncio.wait_for(self.processor_task, timeout=30.0)
            except asyncio.TimeoutError:
                logger.warning(
                    "Extraction processor did not stop gracefully, cancelling..."
                )
                self.processor_task.cancel()
                try:
                    await self.processor_task
                except asyncio.CancelledError:
                    pass

        logger.info("Extraction processor stopped")

    async def queue_extraction(self, extraction_id: int) -> None:
        """Queue an extraction for processing.

        This method only logs: nothing is stored here. The polling loop
        discovers pending extractions itself on its next cycle.

        Args:
            extraction_id: ID of the extraction to be processed.
        """
        async with self.processing_lock:
            if extraction_id not in self.running_extractions:
                logger.info("Queued extraction %d for processing", extraction_id)
                # The processor will pick it up on the next cycle
            else:
                logger.warning(
                    "Extraction %d is already being processed", extraction_id
                )

    async def _process_queue(self) -> None:
        """Main processing loop that handles the extraction queue.

        Polls for pending extractions every 5 seconds (10 seconds after an
        error) until ``shutdown_event`` is set.
        """
        logger.info("Starting extraction queue processor")

        while not self.shutdown_event.is_set():
            try:
                await self._process_pending_extractions()

                # Wait before checking for new extractions
                try:
                    await asyncio.wait_for(self.shutdown_event.wait(), timeout=5.0)
                    break  # Shutdown requested
                except asyncio.TimeoutError:
                    continue  # Continue processing

            except Exception as e:
                logger.exception("Error in extraction queue processor: %s", e)
                # Wait a bit before retrying to avoid tight error loops
                try:
                    await asyncio.wait_for(self.shutdown_event.wait(), timeout=10.0)
                    break  # Shutdown requested
                except asyncio.TimeoutError:
                    continue

        logger.info("Extraction queue processor stopped")

    async def _process_pending_extractions(self) -> None:
        """Process pending extractions up to the concurrency limit."""
        async with self.processing_lock:
            # Check how many slots are available
            available_slots = self.max_concurrent - len(self.running_extractions)
            if available_slots <= 0:
                return  # No available slots

            # Get pending extractions from database
            async with AsyncSession(engine) as session:
                extraction_service = ExtractionService(session)
                pending_extractions = await extraction_service.get_pending_extractions()

            # Filter out extractions that are already being processed
            available_extractions = [
                ext
                for ext in pending_extractions
                if ext["id"] not in self.running_extractions
            ]

            # Start processing up to available slots
            extractions_to_start = available_extractions[:available_slots]

            for extraction_info in extractions_to_start:
                extraction_id = extraction_info["id"]
                self.running_extractions.add(extraction_id)

                # Start processing this extraction in the background
                task = asyncio.create_task(
                    self._process_single_extraction(extraction_id)
                )
                # Keep a strong reference until the done-callback releases it.
                self._extraction_tasks.add(task)
                # Bind extraction_id as a default to avoid late-binding in the loop.
                task.add_done_callback(
                    lambda t, eid=extraction_id: self._on_extraction_completed(
                        eid,
                        t,
                    )
                )

                logger.info(
                    "Started processing extraction %d (%d/%d slots used)",
                    extraction_id,
                    len(self.running_extractions),
                    self.max_concurrent,
                )

    async def _process_single_extraction(self, extraction_id: int) -> None:
        """Process a single extraction.

        Exceptions raised by the service are logged and swallowed here so a
        failing extraction never propagates out of its task.

        Args:
            extraction_id: ID of the extraction to process.
        """
        try:
            logger.info("Processing extraction %d", extraction_id)

            async with AsyncSession(engine) as session:
                extraction_service = ExtractionService(session)
                result = await extraction_service.process_extraction(extraction_id)

            logger.info(
                "Completed extraction %d with status: %s",
                extraction_id,
                result["status"],
            )

        except Exception as e:
            logger.exception("Error processing extraction %d: %s", extraction_id, e)

    def _on_extraction_completed(self, extraction_id: int, task: asyncio.Task) -> None:
        """Callback when an extraction task is completed.

        Frees the concurrency slot and the task reference, then logs the
        outcome (cancelled, failed, or successful).

        Args:
            extraction_id: ID of the extraction whose task finished.
            task: The finished asyncio task.
        """
        # Remove from running set and drop the strong task reference
        self.running_extractions.discard(extraction_id)
        self._extraction_tasks.discard(task)

        # Task.exception() raises CancelledError on a cancelled task, so
        # cancellation has to be checked before asking for the exception.
        if task.cancelled():
            logger.warning("Extraction %d was cancelled", extraction_id)
            return

        error = task.exception()
        if error is not None:
            logger.error(
                "Extraction %d completed with exception: %s",
                extraction_id,
                error,
            )
        else:
            logger.info(
                "Extraction %d completed successfully (%d/%d slots used)",
                extraction_id,
                len(self.running_extractions),
                self.max_concurrent,
            )

    def get_status(self) -> dict:
        """Get the current status of the extraction processor.

        Returns:
            A dict with the keys ``running``, ``max_concurrent``,
            ``currently_processing``, ``processing_ids`` and
            ``available_slots``.
        """
        return {
            "running": self.processor_task is not None
            and not self.processor_task.done(),
            "max_concurrent": self.max_concurrent,
            "currently_processing": len(self.running_extractions),
            "processing_ids": list(self.running_extractions),
            "available_slots": self.max_concurrent - len(self.running_extractions),
        }


# Global extraction processor instance
extraction_processor = ExtractionProcessor()