From 1b597f40478f4a28d6d635cf7cdff5f8f5bc23ee Mon Sep 17 00:00:00 2001 From: JSC Date: Wed, 2 Jul 2025 13:39:17 +0200 Subject: [PATCH] feat(scheduler): implement scheduler service for background tasks and credit refills; add endpoints for admin control --- app/__init__.py | 11 +++ app/routes/main.py | 17 ++++ app/services/credit_service.py | 133 ++++++++++++++++++++++++++++++ app/services/scheduler_service.py | 107 ++++++++++++++++++++++++ pyproject.toml | 1 + uv.lock | 35 ++++++++ 6 files changed, 304 insertions(+) create mode 100644 app/services/credit_service.py create mode 100644 app/services/scheduler_service.py diff --git a/app/__init__.py b/app/__init__.py index 18bf292..d103bda 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -7,6 +7,7 @@ from flask_jwt_extended import JWTManager from app.database import init_db from app.services.auth_service import AuthService +from app.services.scheduler_service import scheduler_service # Global auth service instance auth_service = AuthService() @@ -60,10 +61,20 @@ def create_app(): # Initialize authentication service with app auth_service.init_app(app) + # Start scheduler for background tasks + scheduler_service.start() + # Register blueprints from app.routes import auth, main app.register_blueprint(main.bp, url_prefix="/api") app.register_blueprint(auth.bp, url_prefix="/api/auth") + # Shutdown scheduler when app is torn down + @app.teardown_appcontext + def shutdown_scheduler(exception): + """Stop scheduler when app context is torn down.""" + if exception: + scheduler_service.stop() + return app diff --git a/app/routes/main.py b/app/routes/main.py index c5f8aab..d525053 100644 --- a/app/routes/main.py +++ b/app/routes/main.py @@ -8,6 +8,7 @@ from app.services.decorators import ( require_credits, require_role, ) +from app.services.scheduler_service import scheduler_service bp = Blueprint("main", __name__) @@ -84,3 +85,19 @@ def expensive_operation() -> dict[str, str]: "user": user["email"], "operation_cost": 10, } + + +@bp.route("/admin/scheduler/status") +@require_auth +@require_role("admin") +def scheduler_status() -> dict: + """Get scheduler status (admin only).""" + return scheduler_service.get_scheduler_status() + + +@bp.route("/admin/credits/refill", methods=["POST"]) +@require_auth +@require_role("admin") +def manual_credit_refill() -> dict: + """Manually trigger credit refill for all users (admin only).""" + return scheduler_service.trigger_credit_refill_now() diff --git a/app/services/credit_service.py b/app/services/credit_service.py new file mode 100644 index 0000000..ee6bd69 --- /dev/null +++ b/app/services/credit_service.py @@ -0,0 +1,133 @@ +"""Credit management service for handling daily credit refills.""" + +import logging +from datetime import datetime + +from app.database import db +from app.models.user import User + +logger = logging.getLogger(__name__) + + +class CreditService: + """Service for managing user credits and daily refills.""" + + @staticmethod + def refill_all_users_credits() -> dict: + """ + Refill credits for all active users based on their plan. + + This function: + 1. Gets all active users + 2. For each user, adds their plan's daily credit amount + 3. Ensures credits never exceed the plan's max_credits limit + 4. Updates all users in a single database transaction + + Returns: + dict: Summary of the refill operation + """ + try: + # Get all active users with their plans + users = User.query.filter_by(is_active=True).all() + + if not users: + logger.info("No active users found for credit refill") + return { + "success": True, + "users_processed": 0, + "credits_added": 0, + "message": "No active users found" + } + + users_processed = 0 + total_credits_added = 0 + + for user in users: + if not user.plan: + logger.warning(f"User {user.email} has no plan assigned, skipping") + continue + + # Calculate new credit amount, capped at plan max + current_credits = user.credits or 0 + plan_daily_credits = user.plan.credits + max_credits = user.plan.max_credits + + # Add daily credits but don't exceed maximum + new_credits = min(current_credits + plan_daily_credits, max_credits) + credits_added = new_credits - current_credits + + if credits_added > 0: + user.credits = new_credits + user.updated_at = datetime.utcnow() + total_credits_added += credits_added + + logger.debug( + f"User {user.email}: {current_credits} -> {new_credits} " + f"(+{credits_added} credits, plan: {user.plan.code})" + ) + else: + logger.debug( + f"User {user.email}: Already at max credits " + f"({current_credits}/{max_credits})" + ) + + users_processed += 1 + + # Commit all changes in a single transaction + db.session.commit() + + logger.info( + f"Daily credit refill completed: {users_processed} users processed, " + f"{total_credits_added} total credits added" + ) + + return { + "success": True, + "users_processed": users_processed, + "credits_added": total_credits_added, + "message": f"Successfully refilled credits for {users_processed} users" + } + + except Exception as e: + # Rollback transaction on error + db.session.rollback() + logger.error(f"Error during daily credit refill: {str(e)}") + + return { + "success": False, + "users_processed": 0, + "credits_added": 0, + "error": str(e), + "message": "Credit refill failed" + } + + @staticmethod + def get_user_credit_info(user_id: int) -> dict: + """ + Get detailed credit information for a specific user. + + Args: + user_id: The user's ID + + Returns: + dict: User's credit information + """ + user = User.query.get(user_id) + if not user: + return {"error": "User not found"} + + if not user.plan: + return {"error": "User has no plan assigned"} + + return { + "user_id": user.id, + "email": user.email, + "current_credits": user.credits, + "plan": { + "code": user.plan.code, + "name": user.plan.name, + "daily_credits": user.plan.credits, + "max_credits": user.plan.max_credits + }, + "is_active": user.is_active + } \ No newline at end of file diff --git a/app/services/scheduler_service.py b/app/services/scheduler_service.py new file mode 100644 index 0000000..4f05e9c --- /dev/null +++ b/app/services/scheduler_service.py @@ -0,0 +1,107 @@ +"""Scheduler service for managing background tasks with APScheduler.""" + +import logging +from typing import Optional + +from apscheduler.schedulers.background import BackgroundScheduler +from apscheduler.triggers.cron import CronTrigger + +from app.services.credit_service import CreditService + +logger = logging.getLogger(__name__) + + +class SchedulerService: + """Service for managing scheduled background tasks.""" + + def __init__(self) -> None: + """Initialize the scheduler service.""" + self.scheduler: Optional[BackgroundScheduler] = None + + def start(self) -> None: + """Start the scheduler and add all scheduled jobs.""" + if self.scheduler is not None: + logger.warning("Scheduler is already running") + return + + self.scheduler = BackgroundScheduler() + + # Add daily credit refill job + self._add_daily_credit_refill_job() + + # Start the scheduler + self.scheduler.start() + logger.info("Scheduler started successfully") + + def stop(self) -> None: + """Stop the scheduler.""" + if self.scheduler is not None: + self.scheduler.shutdown() + self.scheduler = None + logger.info("Scheduler stopped") + + def _add_daily_credit_refill_job(self) -> None: + """Add the daily credit refill job.""" + if self.scheduler is None: + raise RuntimeError("Scheduler not initialized") + + # Schedule daily at 00:00 UTC + trigger = CronTrigger(hour=0, minute=0) + + self.scheduler.add_job( + func=self._run_daily_credit_refill, + trigger=trigger, + id="daily_credit_refill", + name="Daily Credit Refill", + replace_existing=True, + ) + + logger.info("Daily credit refill job scheduled for 00:00 UTC") + + def _run_daily_credit_refill(self) -> None: + """Execute the daily credit refill task.""" + logger.info("Starting daily credit refill task") + + try: + result = CreditService.refill_all_users_credits() + + if result["success"]: + logger.info( + f"Daily credit refill completed successfully: " + f"{result['users_processed']} users processed, " + f"{result['credits_added']} credits added" + ) + else: + logger.error(f"Daily credit refill failed: {result['message']}") + + except Exception as e: + logger.exception(f"Error during daily credit refill: {e}") + + def trigger_credit_refill_now(self) -> dict: + """Manually trigger credit refill for testing purposes.""" + logger.info("Manually triggering credit refill") + return CreditService.refill_all_users_credits() + + def get_scheduler_status(self) -> dict: + """Get the current status of the scheduler.""" + if self.scheduler is None: + return {"running": False, "jobs": []} + + jobs = [] + for job in self.scheduler.get_jobs(): + jobs.append({ + "id": job.id, + "name": job.name, + "next_run": job.next_run_time.isoformat() + if job.next_run_time else None, + "trigger": str(job.trigger), + }) + + return { + "running": self.scheduler.running, + "jobs": jobs, + } + + +# Global scheduler instance +scheduler_service = SchedulerService() \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index d044ec7..ba960ec 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,6 +6,7 @@ authors = [{ name = "quaik8", email = "quaik8@gmail.com" }] readme = "README.md" requires-python = ">=3.12" dependencies = [ + "apscheduler==3.11.0", "authlib==1.6.0", "flask==3.1.1", "flask-cors==6.0.1", diff --git a/uv.lock b/uv.lock index 5711550..33b65f2 100644 --- a/uv.lock +++ b/uv.lock @@ -16,6 +16,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/dd/e2/88e425adac5ad887a087c38d04fe2030010572a3e0e627f8a6e8c33eeda8/alembic-1.16.2-py3-none-any.whl", hash = "sha256:5f42e9bd0afdbd1d5e3ad856c01754530367debdebf21ed6894e34af52b3bb03", size = 242717 }, ] +[[package]] +name = "apscheduler" +version = "3.11.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "tzlocal" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/4e/00/6d6814ddc19be2df62c8c898c4df6b5b1914f3bd024b780028caa392d186/apscheduler-3.11.0.tar.gz", hash = "sha256:4c622d250b0955a65d5d0eb91c33e6d43fd879834bf541e0a18661ae60460133", size = 107347 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/d0/ae/9a053dd9229c0fde6b1f1f33f609ccff1ee79ddda364c756a924c6d8563b/APScheduler-3.11.0-py3-none-any.whl", hash = "sha256:fc134ca32e50f5eadcc4938e3a4545ab19131435e851abb40b34d63d5141c6da", size = 64004 }, +] + [[package]] name = "authlib" version = "1.6.0" @@ -529,6 +541,7 @@ name = "sdb-backend" version = "2.0.0" source = { virtual = "." } dependencies = [ + { name = "apscheduler" }, { name = "authlib" }, { name = "flask" }, { name = "flask-cors" }, @@ -549,6 +562,7 @@ dev = [ [package.metadata] requires-dist = [ + { name = "apscheduler", specifier = "==3.11.0" }, { name = "authlib", specifier = "==1.6.0" }, { name = "flask", specifier = "==3.1.1" }, { name = "flask-cors", specifier = "==6.0.1" }, @@ -605,6 +619,27 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/69/e0/552843e0d356fbb5256d21449fa957fa4eff3bbc135a74a691ee70c7c5da/typing_extensions-4.14.0-py3-none-any.whl", hash = "sha256:a1514509136dd0b477638fc68d6a91497af5076466ad0fa6c338e44e359944af", size = 43839 }, ] +[[package]] +name = "tzdata" +version = "2025.2" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/95/32/1a225d6164441be760d75c2c42e2780dc0873fe382da3e98a2e1e48361e5/tzdata-2025.2.tar.gz", hash = "sha256:b60a638fcc0daffadf82fe0f57e53d06bdec2f36c4df66280ae79bce6bd6f2b9", size = 196380 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/5c/23/c7abc0ca0a1526a0774eca151daeb8de62ec457e77262b66b359c3c7679e/tzdata-2025.2-py2.py3-none-any.whl", hash = "sha256:1a403fada01ff9221ca8044d701868fa132215d84beb92242d9acd2147f667a8", size = 347839 }, +] + +[[package]] +name = "tzlocal" +version = "5.3.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "tzdata", marker = "sys_platform == 'win32'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/8b/2e/c14812d3d4d9cd1773c6be938f89e5735a1f11a9f184ac3639b93cef35d5/tzlocal-5.3.1.tar.gz", hash = "sha256:cceffc7edecefea1f595541dbd6e990cb1ea3d19bf01b2809f362a03dd7921fd", size = 30761 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/c2/14/e2a54fabd4f08cd7af1c07030603c3356b74da07f7cc056e600436edfa17/tzlocal-5.3.1-py3-none-any.whl", hash = "sha256:eb1a66c3ef5847adf7a834f1be0800581b683b5608e74f86ecbcef8ab91bb85d", size = 18026 }, +] + [[package]] name = "urllib3" version = "2.5.0"