Files
sdb2-backend/app/services/extraction_processor.py
JSC 16eb789539
Some checks failed
Backend CI / lint (push) Failing after 4m53s
Backend CI / test (push) Failing after 4m31s
feat: Add method to get extractions by status and implement user info retrieval in extraction service
2025-08-24 13:24:48 +02:00

239 lines
9.2 KiB
Python

"""Background extraction processor for handling extraction queue."""
import asyncio
import contextlib
from sqlmodel.ext.asyncio.session import AsyncSession
from app.core.config import settings
from app.core.database import engine
from app.core.logging import get_logger
from app.services.extraction import ExtractionService
logger = get_logger(__name__)
class ExtractionProcessor:
    """Background processor for handling extraction queue with concurrency control.

    Polls the database for pending extractions and runs up to
    ``max_concurrent`` of them at once, each in its own asyncio task with
    its own database session.
    """

    def __init__(self) -> None:
        """Initialize the extraction processor."""
        # Maximum number of extractions processed simultaneously.
        self.max_concurrent = settings.EXTRACTION_MAX_CONCURRENT
        # IDs of extractions currently being processed (slot accounting).
        self.running_extractions: set[int] = set()
        # Strong references to in-flight tasks: asyncio.create_task only
        # keeps a weak reference, so without this a running extraction task
        # could be garbage-collected before it finishes.
        self.background_tasks: set[asyncio.Task] = set()
        # Guards running_extractions and slot accounting.
        self.processing_lock = asyncio.Lock()
        self.shutdown_event = asyncio.Event()
        self.processor_task: asyncio.Task | None = None
        logger.info(
            "Initialized extraction processor with max concurrent: %d",
            self.max_concurrent,
        )

    async def start(self) -> None:
        """Start the background extraction processor (no-op if already running)."""
        if self.processor_task and not self.processor_task.done():
            logger.warning("Extraction processor is already running")
            return
        # Reset any stuck extractions from previous runs
        await self._reset_stuck_extractions()
        self.shutdown_event.clear()
        self.processor_task = asyncio.create_task(self._process_queue())
        logger.info("Started extraction processor")

    async def stop(self) -> None:
        """Stop the processor, waiting up to 30s before cancelling the loop."""
        logger.info("Stopping extraction processor...")
        self.shutdown_event.set()
        if self.processor_task and not self.processor_task.done():
            try:
                await asyncio.wait_for(self.processor_task, timeout=30.0)
            except TimeoutError:
                logger.warning(
                    "Extraction processor did not stop gracefully, cancelling...",
                )
                self.processor_task.cancel()
                with contextlib.suppress(asyncio.CancelledError):
                    await self.processor_task
        logger.info("Extraction processor stopped")

    async def queue_extraction(self, extraction_id: int) -> None:
        """Record intent to process an extraction.

        NOTE: this only logs; the polling loop in ``_process_queue`` picks
        pending extractions up directly from the database on its next cycle.
        """
        async with self.processing_lock:
            if extraction_id not in self.running_extractions:
                logger.info("Queued extraction %d for processing", extraction_id)
                # The processor will pick it up on the next cycle
            else:
                logger.warning(
                    "Extraction %d is already being processed",
                    extraction_id,
                )

    async def _process_queue(self) -> None:
        """Main loop: poll for pending extractions every 5s until shutdown."""
        logger.info("Starting extraction queue processor")
        while not self.shutdown_event.is_set():
            try:
                await self._process_pending_extractions()
                # Wait before checking for new extractions; waking early
                # means the shutdown event was set.
                try:
                    await asyncio.wait_for(self.shutdown_event.wait(), timeout=5.0)
                    break  # Shutdown requested
                except TimeoutError:
                    continue  # Continue processing
            except Exception:
                logger.exception("Error in extraction queue processor")
                # Wait a bit before retrying to avoid tight error loops
                try:
                    await asyncio.wait_for(self.shutdown_event.wait(), timeout=10.0)
                    break  # Shutdown requested
                except TimeoutError:
                    continue
        logger.info("Extraction queue processor stopped")

    async def _process_pending_extractions(self) -> None:
        """Start pending extractions up to the concurrency limit."""
        async with self.processing_lock:
            # Check how many slots are available
            available_slots = self.max_concurrent - len(self.running_extractions)
            if available_slots <= 0:
                return  # No available slots
            # Get pending extractions from database
            async with AsyncSession(engine) as session:
                extraction_service = ExtractionService(session)
                pending_extractions = await extraction_service.get_pending_extractions()
            # Filter out extractions that are already being processed
            available_extractions = [
                ext
                for ext in pending_extractions
                if ext["id"] not in self.running_extractions
            ]
            # Start processing up to available slots
            for extraction_info in available_extractions[:available_slots]:
                extraction_id = extraction_info["id"]
                self.running_extractions.add(extraction_id)
                # Start processing this extraction in the background
                task = asyncio.create_task(
                    self._process_single_extraction(extraction_id),
                )
                # Hold a strong reference until the done callback fires so
                # the task cannot be garbage-collected mid-run.
                self.background_tasks.add(task)
                task.add_done_callback(
                    lambda t, eid=extraction_id: self._on_extraction_completed(
                        eid,
                        t,
                    ),
                )
                logger.info(
                    "Started processing extraction %d (%d/%d slots used)",
                    extraction_id,
                    len(self.running_extractions),
                    self.max_concurrent,
                )

    async def _process_single_extraction(self, extraction_id: int) -> None:
        """Process one extraction in its own session.

        Exceptions are logged rather than propagated so a single failed
        extraction cannot take down the processor.
        """
        try:
            logger.info("Processing extraction %d", extraction_id)
            async with AsyncSession(engine) as session:
                extraction_service = ExtractionService(session)
                result = await extraction_service.process_extraction(extraction_id)
                logger.info(
                    "Completed extraction %d with status: %s",
                    extraction_id,
                    result["status"],
                )
        except Exception:
            logger.exception("Error processing extraction %d", extraction_id)

    def _on_extraction_completed(self, extraction_id: int, task: asyncio.Task) -> None:
        """Done-callback for an extraction task: free its slot and log the outcome."""
        # Free the concurrency slot and drop the strong task reference.
        self.running_extractions.discard(extraction_id)
        self.background_tasks.discard(task)
        # Task.exception() raises CancelledError on a cancelled task, so
        # the cancellation check must come first.
        if task.cancelled():
            logger.warning("Extraction %d was cancelled", extraction_id)
        elif task.exception():
            logger.error(
                "Extraction %d completed with exception: %s",
                extraction_id,
                task.exception(),
            )
        else:
            logger.info(
                "Extraction %d completed successfully (%d/%d slots used)",
                extraction_id,
                len(self.running_extractions),
                self.max_concurrent,
            )

    async def _reset_stuck_extractions(self) -> None:
        """Reset any extractions stuck in 'processing' status back to 'pending'.

        Runs once at startup so work interrupted by a previous crash or
        restart gets retried. Best-effort: failures are logged, never raised.
        """
        try:
            async with AsyncSession(engine) as session:
                extraction_service = ExtractionService(session)
                # Get all extractions stuck in processing status
                stuck_extractions = (
                    await extraction_service.extraction_repo.get_by_status("processing")
                )
                if not stuck_extractions:
                    logger.info("No stuck extractions found to reset")
                    return
                reset_count = 0
                for extraction in stuck_extractions:
                    try:
                        await extraction_service.extraction_repo.update(
                            extraction, {"status": "pending", "error": None}
                        )
                        reset_count += 1
                        logger.info(
                            "Reset stuck extraction %d from processing to pending",
                            extraction.id,
                        )
                    except Exception:
                        # Keep going: one bad row should not block the rest.
                        logger.exception(
                            "Failed to reset extraction %d", extraction.id
                        )
                await session.commit()
                logger.info(
                    "Successfully reset %d stuck extractions from processing to pending",
                    reset_count,
                )
        except Exception:
            logger.exception("Failed to reset stuck extractions")

    def get_status(self) -> dict:
        """Return a snapshot of the processor's current state."""
        return {
            "running": self.processor_task is not None
            and not self.processor_task.done(),
            "max_concurrent": self.max_concurrent,
            "currently_processing": len(self.running_extractions),
            "processing_ids": list(self.running_extractions),
            "available_slots": self.max_concurrent - len(self.running_extractions),
        }
# Global extraction processor instance
# Module-level singleton created at import time so application
# startup/shutdown hooks elsewhere can call start()/stop() on a
# shared instance.
extraction_processor = ExtractionProcessor()