feat: Add MINUTELY recurrence type and enhance scheduler handling
This commit is contained in:
@@ -31,6 +31,7 @@ class RecurrenceType(str, Enum):
|
||||
"""Recurrence patterns."""
|
||||
|
||||
NONE = "none" # One-shot task
|
||||
MINUTELY = "minutely"
|
||||
HOURLY = "hourly"
|
||||
DAILY = "daily"
|
||||
WEEKLY = "weekly"
|
||||
|
||||
@@ -267,6 +267,7 @@ class SchedulerService:
|
||||
|
||||
# Handle interval-based recurrence types
|
||||
interval_configs = {
|
||||
RecurrenceType.MINUTELY: {"minutes": 1},
|
||||
RecurrenceType.HOURLY: {"hours": 1},
|
||||
RecurrenceType.DAILY: {"days": 1},
|
||||
RecurrenceType.WEEKLY: {"weeks": 1},
|
||||
@@ -301,6 +302,8 @@ class SchedulerService:
|
||||
"""Execute a scheduled task."""
|
||||
task_id_str = str(task_id)
|
||||
|
||||
logger.info("APScheduler triggered task %s execution", task_id)
|
||||
|
||||
# Prevent concurrent execution of the same task
|
||||
if task_id_str in self._running_tasks:
|
||||
logger.warning("Task %s is already running, skipping execution", task_id)
|
||||
@@ -318,9 +321,17 @@ class SchedulerService:
|
||||
logger.warning("Task %s not found", task_id)
|
||||
return
|
||||
|
||||
logger.info(
|
||||
"Task %s current state - status: %s, is_active: %s, executions: %s",
|
||||
task_id, task.status, task.is_active, task.executions_count,
|
||||
)
|
||||
|
||||
# Check if task is still active and pending
|
||||
if not task.is_active or task.status != TaskStatus.PENDING:
|
||||
logger.info("Task %s not active or not pending, skipping", task_id)
|
||||
logger.warning(
|
||||
"Task %s execution skipped - is_active: %s, status: %s (should be %s)",
|
||||
task_id, task.is_active, task.status, TaskStatus.PENDING,
|
||||
)
|
||||
return
|
||||
|
||||
# Check if task has expired
|
||||
@@ -333,6 +344,9 @@ class SchedulerService:
|
||||
return
|
||||
|
||||
# Mark task as running
|
||||
logger.info(
|
||||
"Task %s starting execution (type: %s)", task_id, task.recurrence_type,
|
||||
)
|
||||
await repo.mark_as_running(task)
|
||||
|
||||
# Execute the task
|
||||
@@ -345,19 +359,37 @@ class SchedulerService:
|
||||
)
|
||||
await handler_registry.execute_task(task)
|
||||
|
||||
# Calculate next execution time for recurring tasks
|
||||
next_execution_at = None
|
||||
if task.should_repeat():
|
||||
next_execution_at = self._calculate_next_execution(task)
|
||||
# Handle completion based on task type
|
||||
if task.recurrence_type == RecurrenceType.CRON:
|
||||
# For CRON tasks, update execution metadata but keep PENDING
|
||||
# APScheduler handles the recurring schedule automatically
|
||||
logger.info(
|
||||
"Task %s (CRON) executed successfully, updating metadata", task_id,
|
||||
)
|
||||
task.last_executed_at = datetime.now(tz=UTC)
|
||||
task.executions_count += 1
|
||||
task.error_message = None
|
||||
task.status = TaskStatus.PENDING # Explicitly set to PENDING
|
||||
session.add(task)
|
||||
await session.commit()
|
||||
logger.info(
|
||||
"Task %s (CRON) metadata updated, status: %s, executions: %s",
|
||||
task_id, task.status, task.executions_count,
|
||||
)
|
||||
else:
|
||||
# For non-CRON recurring tasks, calculate next execution
|
||||
next_execution_at = None
|
||||
if task.should_repeat():
|
||||
next_execution_at = self._calculate_next_execution(task)
|
||||
|
||||
# Mark as completed
|
||||
await repo.mark_as_completed(task, next_execution_at)
|
||||
# Mark as completed
|
||||
await repo.mark_as_completed(task, next_execution_at)
|
||||
|
||||
# Reschedule if recurring
|
||||
if next_execution_at and task.should_repeat():
|
||||
# Refresh task to get updated data
|
||||
await session.refresh(task)
|
||||
await self._schedule_apscheduler_job(task)
|
||||
# Reschedule if recurring
|
||||
if next_execution_at and task.should_repeat():
|
||||
# Refresh task to get updated data
|
||||
await session.refresh(task)
|
||||
await self._schedule_apscheduler_job(task)
|
||||
|
||||
except Exception as e:
|
||||
await repo.mark_as_failed(task, str(e))
|
||||
@@ -370,17 +402,21 @@ class SchedulerService:
|
||||
"""Calculate the next execution time for a recurring task."""
|
||||
now = datetime.now(tz=UTC)
|
||||
|
||||
if task.recurrence_type == RecurrenceType.HOURLY:
|
||||
return now + timedelta(hours=1)
|
||||
if task.recurrence_type == RecurrenceType.DAILY:
|
||||
return now + timedelta(days=1)
|
||||
if task.recurrence_type == RecurrenceType.WEEKLY:
|
||||
return now + timedelta(weeks=1)
|
||||
if task.recurrence_type == RecurrenceType.MONTHLY:
|
||||
# Add approximately one month
|
||||
return now + timedelta(days=30)
|
||||
if task.recurrence_type == RecurrenceType.YEARLY:
|
||||
return now + timedelta(days=365)
|
||||
recurrence_deltas = {
|
||||
RecurrenceType.MINUTELY: timedelta(minutes=1),
|
||||
RecurrenceType.HOURLY: timedelta(hours=1),
|
||||
RecurrenceType.DAILY: timedelta(days=1),
|
||||
RecurrenceType.WEEKLY: timedelta(weeks=1),
|
||||
RecurrenceType.MONTHLY: timedelta(days=30), # Approximate
|
||||
RecurrenceType.YEARLY: timedelta(days=365), # Approximate
|
||||
}
|
||||
|
||||
if task.recurrence_type in recurrence_deltas:
|
||||
return now + recurrence_deltas[task.recurrence_type]
|
||||
|
||||
if task.recurrence_type == RecurrenceType.CRON and task.cron_expression:
|
||||
# For CRON tasks, let APScheduler handle the timing
|
||||
return now
|
||||
|
||||
return None
|
||||
|
||||
|
||||
Reference in New Issue
Block a user