Files
sdb2-backend/app/models/scheduled_task.py

126 lines
3.8 KiB
Python

"""Scheduled task model for flexible task scheduling with timezone support."""
from datetime import UTC, datetime
from enum import Enum
from typing import Any
from sqlmodel import JSON, Column, Field
from app.models.base import BaseModel
class TaskType(str, Enum):
"""Available task types."""
CREDIT_RECHARGE = "credit_recharge"
PLAY_SOUND = "play_sound"
PLAY_PLAYLIST = "play_playlist"
class TaskStatus(str, Enum):
"""Task execution status."""
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
CANCELLED = "cancelled"
class RecurrenceType(str, Enum):
"""Recurrence patterns."""
NONE = "none" # One-shot task
MINUTELY = "minutely"
HOURLY = "hourly"
DAILY = "daily"
WEEKLY = "weekly"
MONTHLY = "monthly"
YEARLY = "yearly"
CRON = "cron" # Custom cron expression
class ScheduledTask(BaseModel, table=True):
"""Model for scheduled tasks with timezone support."""
__tablename__ = "scheduled_task"
id: int | None = Field(primary_key=True, default=None)
name: str = Field(max_length=255, description="Human-readable task name")
task_type: TaskType = Field(description="Type of task to execute")
status: TaskStatus = Field(default=TaskStatus.PENDING)
# Scheduling fields with timezone support
scheduled_at: datetime = Field(description="When the task should be executed (UTC)")
timezone: str = Field(
default="UTC",
description="Timezone for scheduling (e.g., 'America/New_York')",
)
recurrence_type: RecurrenceType = Field(default=RecurrenceType.NONE)
cron_expression: str | None = Field(
default=None,
description="Cron expression for custom recurrence",
)
recurrence_count: int | None = Field(
default=None,
description="Number of times to repeat (None for infinite)",
)
executions_count: int = Field(default=0, description="Number of times executed")
# Task parameters
parameters: dict[str, Any] = Field(
default_factory=dict,
sa_column=Column(JSON),
description="Task-specific parameters",
)
# User association (None for system tasks)
user_id: int | None = Field(
default=None,
foreign_key="user.id",
description="User who created the task (None for system tasks)",
)
# Execution tracking
last_executed_at: datetime | None = Field(
default=None,
description="When the task was last executed (UTC)",
)
next_execution_at: datetime | None = Field(
default=None,
description="When the task should be executed next (UTC, for recurring tasks)",
)
error_message: str | None = Field(
default=None,
description="Error message if execution failed",
)
# Task lifecycle
is_active: bool = Field(default=True, description="Whether the task is active")
expires_at: datetime | None = Field(
default=None,
description="When the task expires (UTC, optional)",
)
def is_expired(self) -> bool:
"""Check if the task has expired."""
if self.expires_at is None:
return False
return datetime.now(tz=UTC).replace(tzinfo=None) > self.expires_at
def is_recurring(self) -> bool:
"""Check if the task is recurring."""
return self.recurrence_type != RecurrenceType.NONE
def should_repeat(self) -> bool:
"""Check if the task should be repeated."""
if not self.is_recurring():
return False
if self.recurrence_count is None:
return True
return self.executions_count < self.recurrence_count
def is_system_task(self) -> bool:
"""Check if this is a system task (no user association)."""
return self.user_id is None