"""Tests for extraction background processor."""

from contextlib import contextmanager
from unittest.mock import AsyncMock, Mock, patch

import pytest

from app.services.extraction_processor import ExtractionProcessor

# Dotted path of the module whose names are patched by the tests below.
_PROCESSOR_MODULE = "app.services.extraction_processor"


@contextmanager
def _patched_dependencies(service):
    """Patch the session and service classes looked up by the processor module.

    While active, ``AsyncSession`` in the processor module is a mock whose
    instance yields an ``AsyncMock`` when entered with ``async with``, and
    ``ExtractionService`` is a mock whose call returns ``service``.
    """
    with (
        patch(f"{_PROCESSOR_MODULE}.AsyncSession") as session_cls,
        patch(f"{_PROCESSOR_MODULE}.ExtractionService", return_value=service),
    ):
        session_cls.return_value.__aenter__.return_value = AsyncMock()
        yield


def _service_with_pending(*extraction_ids):
    """Return a mock service whose pending list holds the given extraction ids."""
    service = Mock()
    service.get_pending_extractions = AsyncMock(
        return_value=[
            {"id": extraction_id, "status": "pending"}
            for extraction_id in extraction_ids
        ],
    )
    return service


class TestExtractionProcessor:
    """Test extraction background processor."""

    @pytest.fixture
    def processor(self):
        """Create an extraction processor instance."""
        # A fresh instance per test keeps the tests from sharing state.
        return ExtractionProcessor()

    def test_init(self, processor):
        """Test processor initialization."""
        assert processor.max_concurrent > 0
        assert len(processor.running_extractions) == 0
        assert processor.processing_lock is not None
        assert processor.shutdown_event is not None
        assert processor.processor_task is None

    @pytest.mark.asyncio
    async def test_start_and_stop(self, processor):
        """Test starting and stopping the processor."""
        # Replace the queue loop so no actual processing happens.
        with patch.object(processor, "_process_queue", new_callable=AsyncMock):
            await processor.start()
            try:
                assert processor.processor_task is not None
                assert not processor.processor_task.done()
            finally:
                # Stop even when an assertion above fails, so the background
                # task never outlives the test.
                await processor.stop()

            assert processor.processor_task.done()

    @pytest.mark.asyncio
    async def test_start_already_running(self, processor):
        """Test starting processor when already running."""
        with patch.object(processor, "_process_queue", new_callable=AsyncMock):
            await processor.start()
            try:
                first_task = processor.processor_task

                # A second start must not replace the existing task.
                await processor.start()
                assert processor.processor_task is first_task
            finally:
                # Clean up on both the passing and the failing path.
                await processor.stop()

    @pytest.mark.asyncio
    async def test_queue_extraction(self, processor):
        """Test queuing an extraction."""
        extraction_id = 123

        await processor.queue_extraction(extraction_id)

        # Queuing alone must not mark the extraction as running.
        assert extraction_id not in processor.running_extractions

    @pytest.mark.asyncio
    async def test_queue_extraction_already_running(self, processor):
        """Test queuing an extraction that's already running."""
        extraction_id = 123
        processor.running_extractions.add(extraction_id)

        await processor.queue_extraction(extraction_id)

        # Should still be in running extractions
        assert extraction_id in processor.running_extractions

    def test_get_status(self, processor):
        """Test getting processor status."""
        status = processor.get_status()

        expected_keys = (
            "running",
            "max_concurrent",
            "currently_processing",
            "processing_ids",
            "available_slots",
        )
        for key in expected_keys:
            assert key in status

        assert status["max_concurrent"] == processor.max_concurrent
        assert status["currently_processing"] == 0
        assert status["available_slots"] == processor.max_concurrent

    def test_get_status_with_running_extractions(self, processor):
        """Test getting processor status with running extractions."""
        processor.running_extractions.add(123)
        processor.running_extractions.add(456)

        status = processor.get_status()

        assert status["currently_processing"] == 2
        assert status["available_slots"] == processor.max_concurrent - 2
        assert 123 in status["processing_ids"]
        assert 456 in status["processing_ids"]

    def test_on_extraction_completed(self, processor):
        """Test extraction completion callback."""
        extraction_id = 123
        processor.running_extractions.add(extraction_id)

        # Stand-in for a finished task that reports no exception.
        mock_task = Mock()
        mock_task.exception.return_value = None

        processor._on_extraction_completed(extraction_id, mock_task)

        # Should be removed from running extractions
        assert extraction_id not in processor.running_extractions

    def test_on_extraction_completed_with_exception(self, processor):
        """Test extraction completion callback with exception."""
        extraction_id = 123
        processor.running_extractions.add(extraction_id)

        # Stand-in for a finished task that reports an exception.
        mock_task = Mock()
        mock_task.exception.return_value = Exception("Test error")

        processor._on_extraction_completed(extraction_id, mock_task)

        # Should still be removed from running extractions
        assert extraction_id not in processor.running_extractions

    @pytest.mark.asyncio
    async def test_process_single_extraction_success(self, processor):
        """Test processing a single extraction successfully."""
        extraction_id = 123

        mock_service = Mock()
        mock_service.process_extraction = AsyncMock(
            return_value={"status": "completed", "id": extraction_id},
        )

        with _patched_dependencies(mock_service):
            await processor._process_single_extraction(extraction_id)

            mock_service.process_extraction.assert_called_once_with(extraction_id)

    @pytest.mark.asyncio
    async def test_process_single_extraction_failure(self, processor):
        """Test processing a single extraction with failure."""
        extraction_id = 123

        # The service call fails; the processor call below must still return.
        mock_service = Mock()
        mock_service.process_extraction = AsyncMock(
            side_effect=Exception("Test error"),
        )

        with _patched_dependencies(mock_service):
            # Should not raise exception (errors are logged)
            await processor._process_single_extraction(extraction_id)

            mock_service.process_extraction.assert_called_once_with(extraction_id)

    @pytest.mark.asyncio
    async def test_process_pending_extractions_no_slots(self, processor):
        """Test processing when no slots are available."""
        # Fill all slots
        for slot_id in range(processor.max_concurrent):
            processor.running_extractions.add(slot_id)

        mock_service = _service_with_pending(100)

        with _patched_dependencies(mock_service):
            await processor._process_pending_extractions()

            # Should not have started any new extractions
            assert 100 not in processor.running_extractions

    @pytest.mark.asyncio
    async def test_process_pending_extractions_with_slots(self, processor):
        """Test processing when slots are available."""
        mock_service = _service_with_pending(100, 101)

        with (
            _patched_dependencies(mock_service),
            # Keep the real per-extraction coroutine from being created.
            patch.object(
                processor,
                "_process_single_extraction",
                new_callable=AsyncMock,
            ),
            patch("asyncio.create_task") as mock_create_task,
        ):
            mock_create_task.return_value = Mock()

            await processor._process_pending_extractions()

            # Should have added extractions to running set
            assert 100 in processor.running_extractions
            assert 101 in processor.running_extractions

            # Should have created tasks for both
            assert mock_create_task.call_count == 2

    @pytest.mark.asyncio
    async def test_process_pending_extractions_respect_limit(self, processor):
        """Test that processing respects concurrency limit."""
        # Set max concurrent to 1 for this test
        processor.max_concurrent = 1

        # More pending extractions than the limit allows.
        mock_service = _service_with_pending(100, 101, 102)

        with (
            _patched_dependencies(mock_service),
            # Keep the real per-extraction coroutine from being created.
            patch.object(
                processor,
                "_process_single_extraction",
                new_callable=AsyncMock,
            ),
            patch("asyncio.create_task") as mock_create_task,
        ):
            mock_create_task.return_value = Mock()

            await processor._process_pending_extractions()

            # Should only have started one extraction (due to limit)
            assert len(processor.running_extractions) == 1
            assert mock_create_task.call_count == 1