- Added `ExtractionProcessor` class to handle extraction queue processing in the background.
- Implemented methods for starting, stopping, and queuing extractions with concurrency limits.
- Integrated logging for monitoring the processor's status and actions.
- Created tests for the extraction processor to ensure functionality and error handling.

test: Add unit tests for extraction API endpoints
- Created tests for successful extraction creation, authentication checks, and processor status retrieval.
- Ensured proper responses for authenticated and unauthenticated requests.

test: Implement unit tests for extraction repository
- Added tests for creating, retrieving, and updating extractions in the repository.
- Mocked database interactions to validate repository behavior without actual database access.

test: Add comprehensive tests for extraction service
- Developed tests for extraction creation, service detection, and sound record creation.
- Included tests for handling duplicate extractions and invalid URLs.

test: Add unit tests for extraction background processor
- Created tests for the `ExtractionProcessor` class to validate its behavior under various conditions.
- Ensured proper handling of extraction queuing, processing, and completion callbacks.

fix: Update OAuth service tests to use AsyncMock
- Modified OAuth provider tests to use `AsyncMock` for mocking asynchronous HTTP requests.
299 lines
11 KiB
Python
299 lines
11 KiB
Python
"""Tests for extraction background processor."""
|
|
|
|
import asyncio
|
|
from unittest.mock import AsyncMock, Mock, patch
|
|
|
|
import pytest
|
|
|
|
from app.services.extraction_processor import ExtractionProcessor
|
|
|
|
|
|
class TestExtractionProcessor:
    """Unit tests for the background ``ExtractionProcessor``.

    Each test receives a fresh processor from the ``processor`` fixture.
    Tests that would otherwise reach the database request the
    ``mock_service`` fixture, which patches ``AsyncSession`` and
    ``ExtractionService`` inside ``app.services.extraction_processor``.
    """

    @pytest.fixture
    def processor(self):
        """Create an extraction processor instance."""
        # Use a custom processor instance to avoid affecting the global one
        return ExtractionProcessor()

    @pytest.fixture
    def mock_service(self):
        """Patch the processor's session/service classes and yield the service mock.

        ``ExtractionService(...)`` returns the yielded ``Mock`` for the whole
        test, so a test only has to attach the ``AsyncMock`` methods it needs
        (``process_extraction``, ``get_pending_extractions``).
        """
        service = Mock()
        with (
            patch("app.services.extraction_processor.AsyncSession") as mock_session_class,
            patch(
                "app.services.extraction_processor.ExtractionService",
                return_value=service,
            ),
        ):
            # ``async with AsyncSession(...) as session`` yields an AsyncMock session.
            mock_session_class.return_value.__aenter__.return_value = AsyncMock()
            yield service

    @staticmethod
    def _fake_create_task(task):
        """Return a stand-in for ``asyncio.create_task`` that schedules nothing.

        The stand-in closes the coroutine it is handed, so Python does not
        emit a "coroutine ... was never awaited" ``RuntimeWarning``, and then
        returns ``task``.
        """

        def _create_task(coro, *args, **kwargs):
            if asyncio.iscoroutine(coro):
                coro.close()
            return task

        return _create_task

    def test_init(self, processor):
        """A new processor has a positive limit, nothing running and no task."""
        assert processor.max_concurrent > 0
        assert len(processor.running_extractions) == 0
        assert processor.processing_lock is not None
        assert processor.shutdown_event is not None
        assert processor.processor_task is None

    @pytest.mark.asyncio
    async def test_start_and_stop(self, processor):
        """``start()`` creates the processor task and ``stop()`` finishes it."""
        # Mock the _process_queue method to avoid actual processing
        with patch.object(processor, "_process_queue", new_callable=AsyncMock):
            await processor.start()
            try:
                assert processor.processor_task is not None
                # NOTE(review): the mocked ``_process_queue`` returns as soon as
                # the task gets to run, so this presumably relies on ``start()``
                # not yielding to the event loop after creating the task —
                # confirm against the implementation.
                assert not processor.processor_task.done()
            finally:
                # Always stop, so a failed assertion cannot leak a running task.
                await processor.stop()

            assert processor.processor_task.done()

    @pytest.mark.asyncio
    async def test_start_already_running(self, processor):
        """A second ``start()`` keeps the existing task instead of replacing it."""
        with patch.object(processor, "_process_queue", new_callable=AsyncMock):
            await processor.start()
            try:
                first_task = processor.processor_task

                # Start second time (should not create new task)
                await processor.start()
                assert processor.processor_task is first_task
            finally:
                # Clean up even when the assertion above fails.
                await processor.stop()

    @pytest.mark.asyncio
    async def test_queue_extraction(self, processor):
        """Queuing an id does not by itself mark it as running."""
        extraction_id = 123

        await processor.queue_extraction(extraction_id)

        # The extraction should not be in running_extractions yet
        # (it gets added when actually started by the processor)
        assert extraction_id not in processor.running_extractions

    @pytest.mark.asyncio
    async def test_queue_extraction_already_running(self, processor):
        """Queuing an id that is already running leaves it in the running set."""
        extraction_id = 123
        processor.running_extractions.add(extraction_id)

        await processor.queue_extraction(extraction_id)

        assert extraction_id in processor.running_extractions

    def test_get_status(self, processor):
        """``get_status()`` on an idle processor exposes all keys with idle values."""
        status = processor.get_status()

        expected_keys = (
            "running",
            "max_concurrent",
            "currently_processing",
            "processing_ids",
            "available_slots",
        )
        for key in expected_keys:
            assert key in status

        assert status["max_concurrent"] == processor.max_concurrent
        assert status["currently_processing"] == 0
        assert status["available_slots"] == processor.max_concurrent

    def test_get_status_with_running_extractions(self, processor):
        """``get_status()`` counts running ids and subtracts them from the slots."""
        processor.running_extractions.add(123)
        processor.running_extractions.add(456)

        status = processor.get_status()

        assert status["currently_processing"] == 2
        assert status["available_slots"] == processor.max_concurrent - 2
        assert 123 in status["processing_ids"]
        assert 456 in status["processing_ids"]

    def test_on_extraction_completed(self, processor):
        """The completion callback drops the id when the task has no exception."""
        extraction_id = 123
        processor.running_extractions.add(extraction_id)

        # Stand-in for a finished task whose ``exception()`` reports success.
        mock_task = Mock()
        mock_task.exception.return_value = None

        processor._on_extraction_completed(extraction_id, mock_task)

        assert extraction_id not in processor.running_extractions

    def test_on_extraction_completed_with_exception(self, processor):
        """The completion callback drops the id even when the task failed."""
        extraction_id = 123
        processor.running_extractions.add(extraction_id)

        # Stand-in for a finished task whose ``exception()`` returns an error.
        mock_task = Mock()
        mock_task.exception.return_value = Exception("Test error")

        processor._on_extraction_completed(extraction_id, mock_task)

        assert extraction_id not in processor.running_extractions

    @pytest.mark.asyncio
    async def test_process_single_extraction_success(self, processor, mock_service):
        """A single extraction is delegated to the service exactly once."""
        extraction_id = 123
        mock_service.process_extraction = AsyncMock(
            return_value={"status": "completed", "id": extraction_id}
        )

        await processor._process_single_extraction(extraction_id)

        mock_service.process_extraction.assert_called_once_with(extraction_id)

    @pytest.mark.asyncio
    async def test_process_single_extraction_failure(self, processor, mock_service):
        """A service error during a single extraction does not propagate."""
        extraction_id = 123
        mock_service.process_extraction = AsyncMock(side_effect=Exception("Test error"))

        # Should not raise exception (errors are logged)
        await processor._process_single_extraction(extraction_id)

        mock_service.process_extraction.assert_called_once_with(extraction_id)

    @pytest.mark.asyncio
    async def test_process_pending_extractions_no_slots(self, processor, mock_service):
        """No pending extraction is started while every slot is occupied."""
        # Fill all slots
        for slot in range(processor.max_concurrent):
            processor.running_extractions.add(slot)

        mock_service.get_pending_extractions = AsyncMock(
            return_value=[{"id": 100, "status": "pending"}]
        )

        await processor._process_pending_extractions()

        # Should not have started any new extractions
        assert 100 not in processor.running_extractions

    @pytest.mark.asyncio
    async def test_process_pending_extractions_with_slots(self, processor, mock_service):
        """Pending extractions are marked running and given a task each."""
        # Two extractions must fit whatever the configured default is; the
        # limit is set as an attribute, as in the concurrency-limit test.
        processor.max_concurrent = max(processor.max_concurrent, 2)

        mock_service.get_pending_extractions = AsyncMock(
            return_value=[
                {"id": 100, "status": "pending"},
                {"id": 101, "status": "pending"},
            ]
        )
        mock_task = Mock()

        with (
            patch.object(processor, "_process_single_extraction", new_callable=AsyncMock),
            patch(
                "asyncio.create_task",
                side_effect=self._fake_create_task(mock_task),
            ) as mock_create_task,
        ):
            await processor._process_pending_extractions()

            # Should have added extractions to running set
            assert 100 in processor.running_extractions
            assert 101 in processor.running_extractions

            # Should have created tasks for both
            assert mock_create_task.call_count == 2

    @pytest.mark.asyncio
    async def test_process_pending_extractions_respect_limit(self, processor, mock_service):
        """With a limit of one, only one of three pending extractions starts."""
        # Set max concurrent to 1 for this test
        processor.max_concurrent = 1

        mock_service.get_pending_extractions = AsyncMock(
            return_value=[
                {"id": 100, "status": "pending"},
                {"id": 101, "status": "pending"},
                {"id": 102, "status": "pending"},
            ]
        )
        mock_task = Mock()

        with (
            patch.object(processor, "_process_single_extraction", new_callable=AsyncMock),
            patch(
                "asyncio.create_task",
                side_effect=self._fake_create_task(mock_task),
            ) as mock_create_task,
        ):
            await processor._process_pending_extractions()

            # Should only have started one extraction (due to limit)
            assert len(processor.running_extractions) == 1
            assert mock_create_task.call_count == 1
|