Add Alembic for database migrations and initial migration scripts
- Created alembic.ini configuration file for Alembic migrations. - Added README file for Alembic with a brief description. - Implemented env.py for Alembic to manage database migrations. - Created script.py.mako template for migration scripts. - Added initial migration script to create database tables. - Created a migration script to add initial plan and playlist data. - Updated database initialization to run Alembic migrations. - Enhanced credit service to automatically recharge user credits based on their plan. - Implemented delete_task method in scheduler service to remove scheduled tasks. - Updated scheduler API to reflect task deletion instead of cancellation. - Added CLI tool for managing database migrations. - Updated tests to cover new functionality for task deletion and credit recharge. - Updated pyproject.toml and lock files to include Alembic as a dependency.
This commit is contained in:
@@ -129,7 +129,7 @@ async def update_task(
|
||||
|
||||
|
||||
@router.delete("/tasks/{task_id}")
|
||||
async def cancel_task(
|
||||
async def delete_task(
|
||||
task_id: int,
|
||||
current_user: Annotated[User, Depends(get_current_active_user)] = ...,
|
||||
scheduler_service: Annotated[
|
||||
@@ -137,7 +137,7 @@ async def cancel_task(
|
||||
] = ...,
|
||||
db_session: Annotated[AsyncSession, Depends(get_db)] = ...,
|
||||
) -> dict:
|
||||
"""Cancel a scheduled task."""
|
||||
"""Delete a scheduled task completely."""
|
||||
repo = ScheduledTaskRepository(db_session)
|
||||
task = await repo.get_by_id(task_id)
|
||||
|
||||
@@ -148,11 +148,11 @@ async def cancel_task(
|
||||
if task.user_id != current_user.id and not current_user.is_admin:
|
||||
raise HTTPException(status_code=403, detail="Access denied")
|
||||
|
||||
success = await scheduler_service.cancel_task(task_id)
|
||||
success = await scheduler_service.delete_task(task_id)
|
||||
if not success:
|
||||
raise HTTPException(status_code=400, detail="Failed to cancel task")
|
||||
raise HTTPException(status_code=400, detail="Failed to delete task")
|
||||
|
||||
return {"message": "Task cancelled successfully"}
|
||||
return {"message": "Task deleted successfully"}
|
||||
|
||||
|
||||
# Admin-only endpoints
|
||||
|
||||
@@ -8,7 +8,6 @@ from sqlmodel.ext.asyncio.session import AsyncSession
|
||||
import app.models # noqa: F401
|
||||
from app.core.config import settings
|
||||
from app.core.logging import get_logger
|
||||
from app.core.seeds import seed_all_data
|
||||
|
||||
engine: AsyncEngine = create_async_engine(
|
||||
settings.DATABASE_URL,
|
||||
@@ -40,26 +39,23 @@ def get_session_factory() -> Callable[[], AsyncSession]:
|
||||
|
||||
|
||||
async def init_db() -> None:
|
||||
"""Initialize the database and create tables if they do not exist."""
|
||||
"""Initialize the database using Alembic migrations."""
|
||||
logger = get_logger(__name__)
|
||||
try:
|
||||
logger.info("Initializing database tables")
|
||||
async with engine.begin() as conn:
|
||||
await conn.run_sync(SQLModel.metadata.create_all)
|
||||
logger.info("Database tables created successfully")
|
||||
logger.info("Running database migrations")
|
||||
# Run Alembic migrations programmatically
|
||||
from alembic import command
|
||||
from alembic.config import Config
|
||||
|
||||
# Seed initial data
|
||||
await seed_initial_data()
|
||||
# Get the alembic config
|
||||
alembic_cfg = Config("alembic.ini")
|
||||
|
||||
# Run migrations to the latest revision
|
||||
command.upgrade(alembic_cfg, "head")
|
||||
logger.info("Database migrations completed successfully")
|
||||
|
||||
except Exception:
|
||||
logger.exception("Failed to initialize database")
|
||||
raise
|
||||
|
||||
|
||||
async def seed_initial_data() -> None:
|
||||
"""Seed initial data into the database."""
|
||||
logger = get_logger(__name__)
|
||||
logger.info("Starting initial data seeding")
|
||||
|
||||
async with AsyncSession(engine) as session:
|
||||
await seed_all_data(session)
|
||||
|
||||
@@ -403,6 +403,44 @@ class CreditService:
|
||||
finally:
|
||||
await session.close()
|
||||
|
||||
async def recharge_user_credits_auto(
|
||||
self,
|
||||
user_id: int,
|
||||
) -> CreditTransaction | None:
|
||||
"""Recharge credits for a user automatically based on their plan.
|
||||
|
||||
Args:
|
||||
user_id: The user ID
|
||||
|
||||
Returns:
|
||||
The created credit transaction if credits were added, None if no recharge
|
||||
needed
|
||||
|
||||
Raises:
|
||||
ValueError: If user not found or has no plan
|
||||
|
||||
"""
|
||||
session = self.db_session_factory()
|
||||
try:
|
||||
user_repo = UserRepository(session)
|
||||
user = await user_repo.get_by_id_with_plan(user_id)
|
||||
if not user:
|
||||
msg = f"User {user_id} not found"
|
||||
raise ValueError(msg)
|
||||
|
||||
if not user.plan:
|
||||
msg = f"User {user_id} has no plan assigned"
|
||||
raise ValueError(msg)
|
||||
|
||||
# Call the main method with plan details
|
||||
return await self.recharge_user_credits(
|
||||
user_id,
|
||||
user.plan.credits,
|
||||
user.plan.max_credits,
|
||||
)
|
||||
finally:
|
||||
await session.close()
|
||||
|
||||
async def recharge_user_credits(
|
||||
self,
|
||||
user_id: int,
|
||||
@@ -556,7 +594,13 @@ class CreditService:
|
||||
|
||||
if transaction:
|
||||
stats["recharged_users"] += 1
|
||||
stats["total_credits_added"] += transaction.amount
|
||||
# Calculate the amount from plan data to avoid session issues
|
||||
current_credits = user.credits
|
||||
plan_credits = user.plan.credits
|
||||
max_credits = user.plan.max_credits
|
||||
target_credits = min(current_credits + plan_credits, max_credits)
|
||||
credits_added = target_credits - current_credits
|
||||
stats["total_credits_added"] += credits_added
|
||||
else:
|
||||
stats["skipped_users"] += 1
|
||||
|
||||
|
||||
@@ -144,6 +144,25 @@ class SchedulerService:
|
||||
logger.info("Cancelled task: %s (%s)", task.name, task_id)
|
||||
return True
|
||||
|
||||
async def delete_task(self, task_id: int) -> bool:
|
||||
"""Delete a scheduled task completely."""
|
||||
async with self.db_session_factory() as session:
|
||||
repo = ScheduledTaskRepository(session)
|
||||
|
||||
task = await repo.get_by_id(task_id)
|
||||
if not task:
|
||||
return False
|
||||
|
||||
# Remove from APScheduler first (job might not exist in scheduler)
|
||||
with suppress(Exception):
|
||||
self.scheduler.remove_job(str(task_id))
|
||||
|
||||
# Delete from database
|
||||
await repo.delete(task)
|
||||
|
||||
logger.info("Deleted task: %s (%s)", task.name, task_id)
|
||||
return True
|
||||
|
||||
async def get_user_tasks(
|
||||
self,
|
||||
user_id: int,
|
||||
|
||||
@@ -80,8 +80,11 @@ class TaskHandlerRegistry:
|
||||
msg = f"Invalid user_id format: {user_id}"
|
||||
raise TaskExecutionError(msg) from e
|
||||
|
||||
stats = await self.credit_service.recharge_user_credits(user_id_int)
|
||||
logger.info("Recharged credits for user %s: %s", user_id, stats)
|
||||
transaction = await self.credit_service.recharge_user_credits_auto(user_id_int)
|
||||
if transaction:
|
||||
logger.info("Recharged credits for user %s: %s credits added", user_id, transaction.amount)
|
||||
else:
|
||||
logger.info("No credits added for user %s (already at maximum)", user_id)
|
||||
else:
|
||||
# Recharge all users (system task)
|
||||
stats = await self.credit_service.recharge_all_users_credits()
|
||||
|
||||
Reference in New Issue
Block a user