Files
sdb2-backend/app/api/v1/scheduler.py
JSC 83239cb4fa Add Alembic for database migrations and initial migration scripts
- Created alembic.ini configuration file for Alembic migrations.
- Added README file for Alembic with a brief description.
- Implemented env.py for Alembic to manage database migrations.
- Created script.py.mako template for migration scripts.
- Added initial migration script to create database tables.
- Created a migration script to add initial plan and playlist data.
- Updated database initialization to run Alembic migrations.
- Enhanced credit service to automatically recharge user credits based on their plan.
- Implemented delete_task method in scheduler service to remove scheduled tasks.
- Updated scheduler API to reflect task deletion instead of cancellation.
- Added CLI tool for managing database migrations.
- Updated tests to cover new functionality for task deletion and credit recharge.
- Updated pyproject.toml and lock files to include Alembic as a dependency.
2025-09-16 13:45:14 +02:00

231 lines
7.6 KiB
Python

"""API endpoints for scheduled task management."""
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession
from app.core.database import get_db
from app.core.dependencies import (
get_admin_user,
get_current_active_user,
)
from app.core.services import get_global_scheduler_service
from app.models.scheduled_task import ScheduledTask, TaskStatus, TaskType
from app.models.user import User
from app.repositories.scheduled_task import ScheduledTaskRepository
from app.schemas.scheduler import (
ScheduledTaskCreate,
ScheduledTaskResponse,
ScheduledTaskUpdate,
TaskFilterParams,
)
from app.services.scheduler import SchedulerService
router = APIRouter(prefix="/scheduler")
def get_scheduler_service() -> SchedulerService:
    """Return the shared scheduler service for use as a FastAPI dependency.

    Returns:
        Whatever ``get_global_scheduler_service()`` yields; this function
        adds no state of its own.
    """
    service = get_global_scheduler_service()
    return service
def get_task_filters(
    status: Annotated[
        TaskStatus | None,
        Query(description="Filter by task status"),
    ] = None,
    task_type: Annotated[
        TaskType | None,
        Query(description="Filter by task type"),
    ] = None,
    limit: Annotated[
        int,
        Query(description="Maximum number of tasks to return"),
    ] = 50,
    offset: Annotated[
        int,
        Query(description="Number of tasks to skip"),
    ] = 0,
) -> TaskFilterParams:
    """Bundle the task-listing query parameters into one filter object.

    Args:
        status: Optional task status to filter on; ``None`` when omitted.
        task_type: Optional task type to filter on; ``None`` when omitted.
        limit: Maximum number of tasks to return (defaults to 50).
        offset: Number of tasks to skip (defaults to 0).

    Returns:
        A ``TaskFilterParams`` built from the four values, passed through
        unchanged.
    """
    filter_values = {
        "status": status,
        "task_type": task_type,
        "limit": limit,
        "offset": offset,
    }
    return TaskFilterParams(**filter_values)
@router.post("/tasks", response_model=ScheduledTaskResponse)
async def create_task(
    task_data: ScheduledTaskCreate,
    current_user: Annotated[User, Depends(get_current_active_user)],
    scheduler_service: Annotated[SchedulerService, Depends(get_scheduler_service)],
) -> ScheduledTask:
    """Create a new scheduled task owned by the authenticated user.

    Args:
        task_data: Request body describing the task to schedule.
        current_user: The authenticated, active user; their ``id`` is passed
            to the scheduler service as ``user_id``.
        scheduler_service: Scheduler service that performs the creation.

    Returns:
        The task returned by ``scheduler_service.create_task``.

    Raises:
        HTTPException: Propagated unchanged when the scheduler service itself
            raises one; otherwise status 400 carrying the error text of any
            other exception.
    """
    try:
        return await scheduler_service.create_task(
            task_data=task_data,
            user_id=current_user.id,
        )
    except HTTPException:
        # Keep the status code chosen deeper in the stack; the generic handler
        # below would otherwise rewrite it to a 400.
        raise
    except Exception as e:
        # NOTE(review): str(e) is returned to the client verbatim - confirm the
        # service's error messages never contain sensitive details.
        raise HTTPException(status_code=400, detail=str(e)) from e
@router.get("/tasks", response_model=list[ScheduledTaskResponse])
async def get_user_tasks(
    filters: Annotated[TaskFilterParams, Depends(get_task_filters)],
    current_user: Annotated[User, Depends(get_current_active_user)],
    scheduler_service: Annotated[
        SchedulerService,
        Depends(get_scheduler_service),
    ],
) -> list[ScheduledTask]:
    """List the authenticated user's scheduled tasks.

    Args:
        filters: Status/type filters and pagination values from the query
            string.
        current_user: The authenticated, active user whose tasks are listed.
        scheduler_service: Scheduler service that performs the lookup.

    Returns:
        The result of ``scheduler_service.get_user_tasks`` for this user and
        these filters.
    """
    owner_id = current_user.id
    tasks = await scheduler_service.get_user_tasks(
        user_id=owner_id,
        status=filters.status,
        task_type=filters.task_type,
        limit=filters.limit,
        offset=filters.offset,
    )
    return tasks
@router.get("/tasks/{task_id}", response_model=ScheduledTaskResponse)
async def get_task(
    task_id: int,
    current_user: Annotated[User, Depends(get_current_active_user)] = ...,
    db_session: Annotated[AsyncSession, Depends(get_db)] = ...,
) -> ScheduledTask:
    """Fetch a single scheduled task by its id.

    Args:
        task_id: Identifier of the task, taken from the URL path.
        current_user: The authenticated, active user making the request.
        db_session: Database session used to build the task repository.

    Returns:
        The matching task.

    Raises:
        HTTPException: 404 when no task has this id; 403 when the caller is
            neither the task's owner nor an admin.
    """
    task = await ScheduledTaskRepository(db_session).get_by_id(task_id)
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")

    # Only the owning user or an administrator may read the task.
    is_owner = task.user_id == current_user.id
    if not (is_owner or current_user.is_admin):
        raise HTTPException(status_code=403, detail="Access denied")

    return task
@router.patch("/tasks/{task_id}", response_model=ScheduledTaskResponse)
async def update_task(
    task_id: int,
    task_update: ScheduledTaskUpdate,
    current_user: Annotated[User, Depends(get_current_active_user)] = ...,
    db_session: Annotated[AsyncSession, Depends(get_db)] = ...,
) -> ScheduledTask:
    """Apply a partial update to a scheduled task.

    Args:
        task_id: Identifier of the task, taken from the URL path.
        task_update: Request body holding the fields to change.
        current_user: The authenticated, active user making the request.
        db_session: Database session used to build the task repository.

    Returns:
        The task as returned by the repository's ``update`` call.

    Raises:
        HTTPException: 404 when no task has this id; 403 when the caller is
            neither the task's owner nor an admin.
    """
    repository = ScheduledTaskRepository(db_session)
    task = await repository.get_by_id(task_id)
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")

    # Only the owning user or an administrator may modify the task.
    is_owner = task.user_id == current_user.id
    if not (is_owner or current_user.is_admin):
        raise HTTPException(status_code=403, detail="Access denied")

    # exclude_unset=True limits the changes to fields present in the request.
    changes = task_update.model_dump(exclude_unset=True)
    for attr_name, new_value in changes.items():
        setattr(task, attr_name, new_value)

    updated_task = await repository.update(task)
    return updated_task
@router.delete("/tasks/{task_id}")
async def delete_task(
    task_id: int,
    current_user: Annotated[User, Depends(get_current_active_user)] = ...,
    scheduler_service: Annotated[
        SchedulerService,
        Depends(get_scheduler_service),
    ] = ...,
    db_session: Annotated[AsyncSession, Depends(get_db)] = ...,
) -> dict:
    """Delete a scheduled task through the scheduler service.

    Args:
        task_id: Identifier of the task, taken from the URL path.
        current_user: The authenticated, active user making the request.
        scheduler_service: Scheduler service that performs the deletion.
        db_session: Database session used to look the task up beforehand.

    Returns:
        A dict with a single ``"message"`` key confirming the deletion.

    Raises:
        HTTPException: 404 when no task has this id; 403 when the caller is
            neither the task's owner nor an admin; 400 when the scheduler
            service reports that the deletion did not succeed.
    """
    task = await ScheduledTaskRepository(db_session).get_by_id(task_id)
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")

    # Only the owning user or an administrator may delete the task.
    is_owner = task.user_id == current_user.id
    if not (is_owner or current_user.is_admin):
        raise HTTPException(status_code=403, detail="Access denied")

    deleted = await scheduler_service.delete_task(task_id)
    if deleted:
        return {"message": "Task deleted successfully"}
    raise HTTPException(status_code=400, detail="Failed to delete task")
# Admin-only endpoints
@router.get("/admin/tasks", response_model=list[ScheduledTaskResponse])
async def get_all_tasks(
    status: Annotated[
        TaskStatus | None, Query(description="Filter by task status"),
    ] = None,
    task_type: Annotated[
        TaskType | None, Query(description="Filter by task type"),
    ] = None,
    limit: Annotated[
        int | None, Query(description="Maximum number of tasks to return"),
    ] = 100,
    offset: Annotated[
        int | None, Query(description="Number of tasks to skip"),
    ] = 0,
    _: Annotated[User, Depends(get_admin_user)] = ...,
    db_session: Annotated[AsyncSession, Depends(get_db)] = ...,
) -> list[ScheduledTask]:
    """Get all scheduled tasks (admin only).

    Args:
        status: Optional task status to filter on; no status filter when
            ``None``.
        task_type: Optional task type to filter on; no type filter when
            ``None``.
        limit: Maximum number of tasks to return (defaults to 100). ``0``
            yields an empty list; ``None`` disables the limit.
        offset: Number of tasks to skip (defaults to 0).
        _: Admin-user dependency; its value is not used in the body.
        db_session: Database session the query is executed on.

    Returns:
        Matching tasks ordered by ``scheduled_at`` descending.
    """
    statement = select(ScheduledTask)

    # Compare against None rather than relying on truthiness, so a valid but
    # falsy filter value is never mistaken for "not provided".
    if status is not None:
        statement = statement.where(ScheduledTask.status == status)
    if task_type is not None:
        statement = statement.where(ScheduledTask.task_type == task_type)

    statement = statement.order_by(ScheduledTask.scheduled_at.desc())

    # Skipping the call for offset 0 is harmless: it equals "no offset".
    if offset:
        statement = statement.offset(offset)
    # limit=0 must produce LIMIT 0 (no rows); a truthiness test would drop the
    # clause entirely and return every task instead.
    if limit is not None:
        statement = statement.limit(limit)

    result = await db_session.exec(statement)
    return list(result.all())
@router.get("/admin/system-tasks", response_model=list[ScheduledTaskResponse])
async def get_system_tasks(
    status: Annotated[
        TaskStatus | None,
        Query(description="Filter by task status"),
    ] = None,
    task_type: Annotated[
        TaskType | None,
        Query(description="Filter by task type"),
    ] = None,
    _: Annotated[User, Depends(get_admin_user)] = ...,
    db_session: Annotated[AsyncSession, Depends(get_db)] = ...,
) -> list[ScheduledTask]:
    """List system tasks (admin only).

    Args:
        status: Optional task status forwarded to the repository.
        task_type: Optional task type forwarded to the repository.
        _: Admin-user dependency; its value is not used in the body.
        db_session: Database session used to build the task repository.

    Returns:
        The result of the repository's ``get_system_tasks`` call.
    """
    repository = ScheduledTaskRepository(db_session)
    system_tasks = await repository.get_system_tasks(
        status=status,
        task_type=task_type,
    )
    return system_tasks
@router.post("/admin/system-tasks", response_model=ScheduledTaskResponse)
async def create_system_task(
    task_data: ScheduledTaskCreate,
    _: Annotated[User, Depends(get_admin_user)] = ...,
    scheduler_service: Annotated[
        SchedulerService, Depends(get_scheduler_service),
    ] = ...,
) -> ScheduledTask:
    """Create a system task (admin only).

    Args:
        task_data: Request body describing the task to schedule.
        _: Admin-user dependency; its value is not used in the body.
        scheduler_service: Scheduler service that performs the creation.

    Returns:
        The task returned by ``scheduler_service.create_task``.

    Raises:
        HTTPException: Propagated unchanged when the scheduler service itself
            raises one; otherwise status 400 carrying the error text of any
            other exception.
    """
    try:
        return await scheduler_service.create_task(
            task_data=task_data,
            user_id=None,  # System task
        )
    except HTTPException:
        # Keep the status code chosen deeper in the stack; the generic handler
        # below would otherwise rewrite it to a 400.
        raise
    except Exception as e:
        # NOTE(review): str(e) is returned to the client verbatim - confirm the
        # service's error messages never contain sensitive details.
        raise HTTPException(status_code=400, detail=str(e)) from e