- Implemented tests for ScheduledTaskRepository covering task creation, retrieval, filtering, and status updates.
- Developed tests for SchedulerService including task creation, cancellation, user task retrieval, and maintenance jobs.
- Created tests for TaskHandlerRegistry to validate task execution for various types, including credit recharge and sound playback.
- Ensured proper error handling and edge cases in task execution scenarios.
- Added fixtures and mocks to facilitate isolated testing of services and repositories.
40 lines
1.4 KiB
Python
40 lines
1.4 KiB
Python
#!/usr/bin/env python3
|
|
"""Test creating a task via the scheduler service to simulate API call."""
|
|
|
|
import asyncio
|
|
from datetime import datetime, timedelta
|
|
|
|
from app.core.database import get_session_factory
|
|
from app.main import get_global_scheduler_service
|
|
from app.models.scheduled_task import TaskType, RecurrenceType
|
|
|
|
async def test_api_task_creation():
    """Create a one-off PLAY_SOUND task through the scheduler service.

    Simulates what an API call would do: fetches the global scheduler
    service and asks it to create a non-recurring task scheduled two
    minutes from now, printing progress along the way.

    Raises:
        Exception: Any error raised while obtaining the service or creating
            the task is printed and then re-raised, so a failure is visible
            to the caller (non-zero exit when run as a script, a failing
            test under a test runner) instead of being swallowed.
    """
    try:
        scheduler_service = get_global_scheduler_service()

        # Sample the clock once so the printed "current" and "scheduled"
        # times are exactly two minutes apart.
        # NOTE(review): naive UTC datetimes are kept as in the original;
        # confirm whether the scheduler expects timezone-aware values.
        now = datetime.utcnow()
        future_time = now + timedelta(minutes=2)

        print(f"Creating task scheduled for: {future_time}")
        print(f"Current time: {now}")

        task = await scheduler_service.create_task(
            name=f"API Test Task {future_time.strftime('%H:%M:%S')}",
            task_type=TaskType.PLAY_SOUND,
            scheduled_at=future_time,
            parameters={"sound_id": 1},
            user_id=1,
            timezone="UTC",
            recurrence_type=RecurrenceType.NONE,
        )

        print(f"Created task: {task.name} (ID: {task.id})")
        # Re-read the clock here on purpose: this shows the time remaining
        # after the (possibly slow) create_task call has completed.
        print(f"Task will execute in: {future_time - datetime.utcnow()}")
        print("Task should be automatically scheduled in APScheduler!")

    except Exception as e:
        # Keep the original human-readable message, but propagate the error
        # so the run is not reported as successful when creation failed.
        print(f"Error: {e}")
        raise
|
|
|
|
if __name__ == "__main__":
    # Script entry point: build the coroutine, then let asyncio.run drive it
    # to completion.
    main_coro = test_api_task_creation()
    asyncio.run(main_coro)