- Created alembic.ini configuration file for Alembic migrations. - Added README file for Alembic with a brief description. - Implemented env.py for Alembic to manage database migrations. - Created script.py.mako template for migration scripts. - Added initial migration script to create database tables. - Created a migration script to add initial plan and playlist data. - Updated database initialization to run Alembic migrations. - Enhanced credit service to automatically recharge user credits based on their plan. - Implemented delete_task method in scheduler service to remove scheduled tasks. - Updated scheduler API to reflect task deletion instead of cancellation. - Added CLI tool for managing database migrations. - Updated tests to cover new functionality for task deletion and credit recharge. - Updated pyproject.toml and lock files to include Alembic as a dependency.
187 lines
6.9 KiB
Python
187 lines
6.9 KiB
Python
"""Task execution handlers for different task types."""
|
|
|
|
from collections.abc import Callable
|
|
|
|
from sqlmodel.ext.asyncio.session import AsyncSession
|
|
|
|
from app.core.logging import get_logger
|
|
from app.models.scheduled_task import ScheduledTask, TaskType
|
|
from app.repositories.playlist import PlaylistRepository
|
|
from app.repositories.sound import SoundRepository
|
|
from app.services.credit import CreditService
|
|
from app.services.player import PlayerService
|
|
from app.services.vlc_player import VLCPlayerService
|
|
|
|
# Module-level logger shared by every handler below; named after this module.
logger = get_logger(__name__)
|
|
|
|
|
|
class TaskExecutionError(Exception):
    """Signal that a scheduled task could not be executed.

    Raised for an unknown task type, for missing or malformed task
    parameters, and as a wrapper around any error raised while a handler
    is running.
    """
|
|
|
|
|
|
|
|
class TaskHandlerRegistry:
    """Registry that maps each ``TaskType`` to the coroutine executing it.

    The public entry point is :meth:`execute_task`; the ``_handle_*``
    coroutines are the per-type implementations it dispatches to.
    """

    # Play modes that _handle_play_playlist forwards to the player service.
    # Any other value is skipped (with a warning) and the player's current
    # mode is left untouched.
    _SUPPORTED_PLAY_MODES = ("continuous", "loop", "loop_one", "random", "single")

    def __init__(
        self,
        db_session: AsyncSession,
        db_session_factory: Callable[[], AsyncSession],
        credit_service: CreditService,
        player_service: PlayerService,
    ) -> None:
        """Store the collaborators and register one handler per task type.

        Args:
            db_session: Session used to build the sound and playlist
                repositories.
            db_session_factory: Callable returning a session; handed to
                ``VLCPlayerService`` when a sound is played.
            credit_service: Service used by credit recharge tasks.
            player_service: Player used by playlist tasks.
        """
        self.db_session = db_session
        self.db_session_factory = db_session_factory
        self.credit_service = credit_service
        self.player_service = player_service
        self.sound_repository = SoundRepository(db_session)
        self.playlist_repository = PlaylistRepository(db_session)

        # Dispatch table consulted by execute_task.
        self._handlers = {
            TaskType.CREDIT_RECHARGE: self._handle_credit_recharge,
            TaskType.PLAY_SOUND: self._handle_play_sound,
            TaskType.PLAY_PLAYLIST: self._handle_play_playlist,
        }

    async def execute_task(self, task: ScheduledTask) -> None:
        """Run the handler registered for ``task.task_type``.

        Args:
            task: The scheduled task to execute.

        Raises:
            TaskExecutionError: If no handler is registered for the task
                type, or if the handler raises. Every handler exception
                (including a ``TaskExecutionError`` raised by the handler
                itself) is logged and re-raised wrapped, with the original
                attached as ``__cause__``.
        """
        handler = self._handlers.get(task.task_type)
        if not handler:
            msg = f"No handler registered for task type: {task.task_type}"
            raise TaskExecutionError(msg)

        logger.info(
            "Executing task %s (%s): %s",
            task.id,
            task.task_type.value,
            task.name,
        )

        try:
            await handler(task)
            logger.info("Task %s executed successfully", task.id)
        except Exception as e:
            logger.exception("Task %s execution failed", task.id)
            msg = f"Task execution failed: {e!s}"
            raise TaskExecutionError(msg) from e

    async def _handle_credit_recharge(self, task: ScheduledTask) -> None:
        """Recharge credits for one user, or for all users.

        Reads the optional ``user_id`` entry of ``task.parameters``. When it
        is absent (or ``None``) every user is recharged; otherwise only the
        given user is.

        Raises:
            TaskExecutionError: If ``user_id`` is present but cannot be
                converted to ``int``.
        """
        user_id = task.parameters.get("user_id")

        # Only a genuinely missing user_id selects the "all users" path.
        # A plain truthiness test would also send 0 or "" down that path,
        # turning a malformed single-user task into a global recharge.
        if user_id is None:
            # Recharge all users (system task)
            stats = await self.credit_service.recharge_all_users_credits()
            logger.info("Recharged credits for all users: %s", stats)
            return

        # Recharge specific user
        try:
            user_id_int = int(user_id)
        except (ValueError, TypeError) as e:
            msg = f"Invalid user_id format: {user_id}"
            raise TaskExecutionError(msg) from e

        transaction = await self.credit_service.recharge_user_credits_auto(
            user_id_int,
        )
        if transaction:
            logger.info(
                "Recharged credits for user %s: %s credits added",
                user_id,
                transaction.amount,
            )
        else:
            logger.info(
                "No credits added for user %s (already at maximum)",
                user_id,
            )

    async def _handle_play_sound(self, task: ScheduledTask) -> None:
        """Play the sound named by the ``sound_id`` task parameter.

        When ``task.user_id`` is set, playback goes through
        ``VLCPlayerService.play_sound_with_credits``. Otherwise the sound is
        loaded from the sound repository and played with
        ``VLCPlayerService.play_sound``.

        Raises:
            TaskExecutionError: If ``sound_id`` is missing or not an integer,
                if the sound is not found (system task), or if playback
                fails.
        """
        sound_id = task.parameters.get("sound_id")

        if not sound_id:
            msg = "sound_id parameter is required for PLAY_SOUND tasks"
            raise TaskExecutionError(msg)

        try:
            # Handle both integer and string sound IDs
            sound_id_int = int(sound_id)
        except (ValueError, TypeError) as e:
            msg = f"Invalid sound_id format: {sound_id}"
            raise TaskExecutionError(msg) from e

        if task.user_id:
            # User task: use credit-aware playback
            vlc_service = VLCPlayerService(self.db_session_factory)
            try:
                result = await vlc_service.play_sound_with_credits(
                    sound_id_int,
                    task.user_id,
                )
                logger.info(
                    (
                        "Played sound %s via scheduled task for user %s "
                        "(credits deducted: %s)"
                    ),
                    result.get("sound_name", sound_id),
                    task.user_id,
                    result.get("credits_deducted", 0),
                )
            except Exception as e:
                # Convert HTTP exceptions or credit errors to task execution errors
                msg = f"Failed to play sound with credits: {e!s}"
                raise TaskExecutionError(msg) from e
            return

        # System task: play without credit deduction
        sound = await self.sound_repository.get_by_id(sound_id_int)
        if not sound:
            msg = f"Sound not found: {sound_id}"
            raise TaskExecutionError(msg)

        vlc_service = VLCPlayerService(self.db_session_factory)
        success = await vlc_service.play_sound(sound)
        if not success:
            msg = f"Failed to play sound {sound.filename}"
            raise TaskExecutionError(msg)

        logger.info("Played sound %s via scheduled system task", sound.filename)

    async def _handle_play_playlist(self, task: ScheduledTask) -> None:
        """Load the playlist named by ``playlist_id`` and start playback.

        Task parameters read:
            playlist_id: Required; converted to ``int``.
            play_mode: Optional, defaults to ``"continuous"``. Applied only
                when it is one of ``_SUPPORTED_PLAY_MODES``.
            shuffle: Optional, defaults to ``False``. Shuffle is enabled when
                the value is truthy.

        Raises:
            TaskExecutionError: If ``playlist_id`` is missing or not an
                integer, or if the playlist is not found.
        """
        parameters = task.parameters
        playlist_id = parameters.get("playlist_id")
        play_mode = parameters.get("play_mode", "continuous")
        shuffle = parameters.get("shuffle", False)

        if not playlist_id:
            msg = "playlist_id parameter is required for PLAY_PLAYLIST tasks"
            raise TaskExecutionError(msg)

        try:
            # Handle both integer and string playlist IDs
            playlist_id_int = int(playlist_id)
        except (ValueError, TypeError) as e:
            msg = f"Invalid playlist_id format: {playlist_id}"
            raise TaskExecutionError(msg) from e

        # Get the playlist from database
        playlist = await self.playlist_repository.get_by_id(playlist_id_int)
        if not playlist:
            msg = f"Playlist not found: {playlist_id}"
            raise TaskExecutionError(msg)

        # Load playlist in player
        await self.player_service.load_playlist(playlist_id_int)

        # Apply the play mode only when it is a recognised one; otherwise
        # keep the player's current mode but leave a trace in the logs.
        if play_mode in self._SUPPORTED_PLAY_MODES:
            await self.player_service.set_mode(play_mode)
        else:
            logger.warning(
                "Ignoring unsupported play_mode %r for playlist %s",
                play_mode,
                playlist_id,
            )

        # Enable shuffle if requested
        if shuffle:
            await self.player_service.set_shuffle(shuffle=True)

        # Start playing
        await self.player_service.play()

        logger.info("Started playing playlist %s via scheduled task", playlist.name)
|