- Updated type hints from `List[X]`/`Optional[X]` to `list[X]`/`X | None` for better readability and consistency across the codebase.
- Refactored import statements for better organization and clarity.
- Enhanced the ScheduledTaskBase schema to use modern type hints.
- Cleaned up unnecessary comments and whitespace in various files.
- Improved error handling and logging in task execution handlers.
- Updated test cases to reflect the type-hint changes and ensure compatibility with the new structure.
41 lines
1.3 KiB
Python
41 lines
1.3 KiB
Python
#!/usr/bin/env python3
|
|
"""Test creating a task via the scheduler service to simulate API call."""
|
|
|
|
import asyncio
|
|
from datetime import datetime, timedelta
|
|
|
|
from app.main import get_global_scheduler_service
|
|
from app.models.scheduled_task import RecurrenceType, TaskType
|
|
|
|
|
|
async def test_api_task_creation() -> None:
    """Create a one-off PLAY_SOUND task through the scheduler service.

    Simulates an API call: obtains the global scheduler service and asks it
    to create a non-recurring task scheduled two minutes from now, printing
    progress along the way.

    Raises:
        Exception: Any error raised while obtaining the service or creating
            the task is printed and then re-raised, so the caller (a test
            runner or the script entry point) sees the failure instead of a
            normal return.
    """
    try:
        scheduler_service = get_global_scheduler_service()

        # Read the clock once so the printed "current time" is exactly the
        # instant the schedule was derived from.
        # NOTE(review): datetime.utcnow() returns a naive datetime and is
        # deprecated since Python 3.12. It is kept because it is not visible
        # from here whether create_task accepts timezone-aware values --
        # confirm before switching to datetime.now(timezone.utc).
        now = datetime.utcnow()
        future_time = now + timedelta(minutes=2)

        print(f"Creating task scheduled for: {future_time}")
        print(f"Current time: {now}")

        task = await scheduler_service.create_task(
            name=f"API Test Task {future_time.strftime('%H:%M:%S')}",
            task_type=TaskType.PLAY_SOUND,
            scheduled_at=future_time,
            parameters={"sound_id": 1},
            user_id=1,
            timezone="UTC",
            recurrence_type=RecurrenceType.NONE,
        )

        print(f"Created task: {task.name} (ID: {task.id})")
        # Fresh clock reading on purpose: reports the delay still remaining
        # after the create call has returned.
        print(f"Task will execute in: {future_time - datetime.utcnow()}")
        print("Task should be automatically scheduled in APScheduler!")
    except Exception as e:
        print(f"Error: {e}")
        # Re-raise so a failed creation is not reported as a success.
        raise
|
|
|
|
if __name__ == "__main__":
    # Executed directly: build the coroutine, then drive it to completion.
    coroutine = test_api_task_creation()
    asyncio.run(coroutine)
|