From e8f979c137a2925c24e5d72f49ff52142ccd1567 Mon Sep 17 00:00:00 2001 From: JSC Date: Sat, 13 Sep 2025 23:44:20 +0200 Subject: [PATCH] feat: Add MINUTELY recurrence type and enhance scheduler handling --- app/models/scheduled_task.py | 1 + app/services/scheduler.py | 82 ++++++++++++++++++++++++++---------- 2 files changed, 60 insertions(+), 23 deletions(-) diff --git a/app/models/scheduled_task.py b/app/models/scheduled_task.py index 9639eb7..4f04dcb 100644 --- a/app/models/scheduled_task.py +++ b/app/models/scheduled_task.py @@ -31,6 +31,7 @@ class RecurrenceType(str, Enum): """Recurrence patterns.""" NONE = "none" # One-shot task + MINUTELY = "minutely" HOURLY = "hourly" DAILY = "daily" WEEKLY = "weekly" diff --git a/app/services/scheduler.py b/app/services/scheduler.py index e88ced8..49ca1aa 100644 --- a/app/services/scheduler.py +++ b/app/services/scheduler.py @@ -267,6 +267,7 @@ class SchedulerService: # Handle interval-based recurrence types interval_configs = { + RecurrenceType.MINUTELY: {"minutes": 1}, RecurrenceType.HOURLY: {"hours": 1}, RecurrenceType.DAILY: {"days": 1}, RecurrenceType.WEEKLY: {"weeks": 1}, @@ -301,6 +302,8 @@ class SchedulerService: """Execute a scheduled task.""" task_id_str = str(task_id) + logger.info("APScheduler triggered task %s execution", task_id) + # Prevent concurrent execution of the same task if task_id_str in self._running_tasks: logger.warning("Task %s is already running, skipping execution", task_id) @@ -318,9 +321,17 @@ class SchedulerService: logger.warning("Task %s not found", task_id) return + logger.info( + "Task %s current state - status: %s, is_active: %s, executions: %s", + task_id, task.status, task.is_active, task.executions_count, + ) + # Check if task is still active and pending if not task.is_active or task.status != TaskStatus.PENDING: - logger.info("Task %s not active or not pending, skipping", task_id) + logger.warning( + "Task %s execution skipped - is_active: %s, status: %s (should be %s)", + task_id, task.is_active, task.status, TaskStatus.PENDING, + ) return # Check if task has expired @@ -333,6 +344,9 @@ class SchedulerService: return # Mark task as running + logger.info( + "Task %s starting execution (type: %s)", task_id, task.recurrence_type, + ) await repo.mark_as_running(task) # Execute the task @@ -345,19 +359,37 @@ class SchedulerService: ) await handler_registry.execute_task(task) - # Calculate next execution time for recurring tasks - next_execution_at = None - if task.should_repeat(): - next_execution_at = self._calculate_next_execution(task) + # Handle completion based on task type + if task.recurrence_type == RecurrenceType.CRON: + # For CRON tasks, update execution metadata but keep PENDING + # APScheduler handles the recurring schedule automatically + logger.info( + "Task %s (CRON) executed successfully, updating metadata", task_id, + ) + task.last_executed_at = datetime.now(tz=UTC) + task.executions_count += 1 + task.error_message = None + task.status = TaskStatus.PENDING # Explicitly set to PENDING + session.add(task) + await session.commit() + logger.info( + "Task %s (CRON) metadata updated, status: %s, executions: %s", + task_id, task.status, task.executions_count, + ) + else: + # For non-CRON recurring tasks, calculate next execution + next_execution_at = None + if task.should_repeat(): + next_execution_at = self._calculate_next_execution(task) - # Mark as completed - await repo.mark_as_completed(task, next_execution_at) + # Mark as completed + await repo.mark_as_completed(task, next_execution_at) - # Reschedule if recurring - if next_execution_at and task.should_repeat(): - # Refresh task to get updated data - await session.refresh(task) - await self._schedule_apscheduler_job(task) + # Reschedule if recurring + if next_execution_at and task.should_repeat(): + # Refresh task to get updated data + await session.refresh(task) + await self._schedule_apscheduler_job(task) except Exception as e: await repo.mark_as_failed(task, str(e)) @@ -370,17 +402,21 @@ class SchedulerService: """Calculate the next execution time for a recurring task.""" now = datetime.now(tz=UTC) - if task.recurrence_type == RecurrenceType.HOURLY: - return now + timedelta(hours=1) - if task.recurrence_type == RecurrenceType.DAILY: - return now + timedelta(days=1) - if task.recurrence_type == RecurrenceType.WEEKLY: - return now + timedelta(weeks=1) - if task.recurrence_type == RecurrenceType.MONTHLY: - # Add approximately one month - return now + timedelta(days=30) - if task.recurrence_type == RecurrenceType.YEARLY: - return now + timedelta(days=365) + recurrence_deltas = { + RecurrenceType.MINUTELY: timedelta(minutes=1), + RecurrenceType.HOURLY: timedelta(hours=1), + RecurrenceType.DAILY: timedelta(days=1), + RecurrenceType.WEEKLY: timedelta(weeks=1), + RecurrenceType.MONTHLY: timedelta(days=30), # Approximate + RecurrenceType.YEARLY: timedelta(days=365), # Approximate + } + + if task.recurrence_type in recurrence_deltas: + return now + recurrence_deltas[task.recurrence_type] + + if task.recurrence_type == RecurrenceType.CRON and task.cron_expression: + # For CRON tasks, let APScheduler handle the timing + return now return None