Add comprehensive tests for scheduled task repository, scheduler service, and task handlers
- Implemented tests for ScheduledTaskRepository covering task creation, retrieval, filtering, and status updates. - Developed tests for SchedulerService including task creation, cancellation, user task retrieval, and maintenance jobs. - Created tests for TaskHandlerRegistry to validate task execution for various types, including credit recharge and sound playback. - Ensured proper error handling and edge cases in task execution scenarios. - Added fixtures and mocks to facilitate isolated testing of services and repositories.
This commit is contained in:
40
test_api_task.py
Normal file
40
test_api_task.py
Normal file
@@ -0,0 +1,40 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Test creating a task via the scheduler service to simulate API call."""
|
||||
|
||||
import asyncio
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
from app.core.database import get_session_factory
|
||||
from app.main import get_global_scheduler_service
|
||||
from app.models.scheduled_task import TaskType, RecurrenceType
|
||||
|
||||
async def test_api_task_creation():
|
||||
"""Test creating a task through the scheduler service (simulates API call)."""
|
||||
try:
|
||||
scheduler_service = get_global_scheduler_service()
|
||||
|
||||
# Create a task for 2 minutes from now
|
||||
future_time = datetime.utcnow() + timedelta(minutes=2)
|
||||
|
||||
print(f"Creating task scheduled for: {future_time}")
|
||||
print(f"Current time: {datetime.utcnow()}")
|
||||
|
||||
task = await scheduler_service.create_task(
|
||||
name=f"API Test Task {future_time.strftime('%H:%M:%S')}",
|
||||
task_type=TaskType.PLAY_SOUND,
|
||||
scheduled_at=future_time,
|
||||
parameters={"sound_id": 1},
|
||||
user_id=1,
|
||||
timezone="UTC",
|
||||
recurrence_type=RecurrenceType.NONE,
|
||||
)
|
||||
|
||||
print(f"Created task: {task.name} (ID: {task.id})")
|
||||
print(f"Task will execute in: {future_time - datetime.utcnow()}")
|
||||
print("Task should be automatically scheduled in APScheduler!")
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error: {e}")
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(test_api_task_creation())
|
||||
Reference in New Issue
Block a user