feat: Add scheduler for daily user credits recharge
This commit is contained in:
@@ -222,8 +222,8 @@ class CreditService:
|
||||
"Failed to emit user_credits_changed event for user %s",
|
||||
user_id,
|
||||
)
|
||||
else:
|
||||
return transaction
|
||||
|
||||
return transaction
|
||||
|
||||
except Exception:
|
||||
await session.rollback()
|
||||
@@ -315,8 +315,8 @@ class CreditService:
|
||||
"Failed to emit user_credits_changed event for user %s",
|
||||
user_id,
|
||||
)
|
||||
else:
|
||||
return transaction
|
||||
|
||||
return transaction
|
||||
|
||||
except Exception:
|
||||
await session.rollback()
|
||||
@@ -402,3 +402,180 @@ class CreditService:
|
||||
return user.credits
|
||||
finally:
|
||||
await session.close()
|
||||
|
||||
async def recharge_user_credits(
|
||||
self,
|
||||
user_id: int,
|
||||
plan_credits: int,
|
||||
max_credits: int,
|
||||
) -> CreditTransaction | None:
|
||||
"""Recharge credits for a user based on their plan.
|
||||
|
||||
Args:
|
||||
user_id: The user ID
|
||||
plan_credits: Number of credits from the plan
|
||||
max_credits: Maximum credits allowed for the plan
|
||||
|
||||
Returns:
|
||||
The created credit transaction if credits were added, None if no recharge
|
||||
needed
|
||||
|
||||
Raises:
|
||||
ValueError: If user not found
|
||||
|
||||
"""
|
||||
session = self.db_session_factory()
|
||||
try:
|
||||
user_repo = UserRepository(session)
|
||||
user = await user_repo.get_by_id(user_id)
|
||||
if not user:
|
||||
msg = f"User {user_id} not found"
|
||||
raise ValueError(msg)
|
||||
|
||||
# Calculate credits to add (can't exceed max_credits)
|
||||
current_credits = user.credits
|
||||
target_credits = min(current_credits + plan_credits, max_credits)
|
||||
credits_to_add = target_credits - current_credits
|
||||
|
||||
# If no credits to add, return None
|
||||
if credits_to_add <= 0:
|
||||
logger.info(
|
||||
"No credits to add for user %s: current=%s, "
|
||||
"plan_credits=%s, max=%s",
|
||||
user_id,
|
||||
current_credits,
|
||||
plan_credits,
|
||||
max_credits,
|
||||
)
|
||||
return None
|
||||
|
||||
# Record transaction
|
||||
transaction = CreditTransaction(
|
||||
user_id=user_id,
|
||||
action_type=CreditActionType.DAILY_RECHARGE.value,
|
||||
amount=credits_to_add,
|
||||
balance_before=current_credits,
|
||||
balance_after=target_credits,
|
||||
description="Daily credit recharge",
|
||||
success=True,
|
||||
metadata_json=json.dumps(
|
||||
{
|
||||
"plan_credits": plan_credits,
|
||||
"max_credits": max_credits,
|
||||
},
|
||||
),
|
||||
)
|
||||
|
||||
# Update user credits
|
||||
await user_repo.update(user, {"credits": target_credits})
|
||||
|
||||
# Save transaction
|
||||
session.add(transaction)
|
||||
await session.commit()
|
||||
|
||||
logger.info(
|
||||
"Credits recharged for user %s: %s credits added (balance: %s → %s)",
|
||||
user_id,
|
||||
credits_to_add,
|
||||
current_credits,
|
||||
target_credits,
|
||||
)
|
||||
|
||||
# Emit user_credits_changed event via WebSocket
|
||||
try:
|
||||
event_data = {
|
||||
"user_id": str(user_id),
|
||||
"credits_before": current_credits,
|
||||
"credits_after": target_credits,
|
||||
"credits_added": credits_to_add,
|
||||
"description": "Daily credit recharge",
|
||||
"success": True,
|
||||
}
|
||||
await socket_manager.send_to_user(
|
||||
str(user_id),
|
||||
"user_credits_changed",
|
||||
event_data,
|
||||
)
|
||||
logger.info("Emitted user_credits_changed event for user %s", user_id)
|
||||
except Exception:
|
||||
logger.exception(
|
||||
"Failed to emit user_credits_changed event for user %s",
|
||||
user_id,
|
||||
)
|
||||
|
||||
return transaction
|
||||
|
||||
except Exception:
|
||||
await session.rollback()
|
||||
raise
|
||||
finally:
|
||||
await session.close()
|
||||
|
||||
async def recharge_all_users_credits(self) -> dict[str, int]:
|
||||
"""Recharge credits for all users based on their plans.
|
||||
|
||||
Returns:
|
||||
Dictionary with statistics about the recharge operation
|
||||
|
||||
"""
|
||||
session = self.db_session_factory()
|
||||
stats = {
|
||||
"total_users": 0,
|
||||
"recharged_users": 0,
|
||||
"skipped_users": 0,
|
||||
"total_credits_added": 0,
|
||||
}
|
||||
|
||||
try:
|
||||
user_repo = UserRepository(session)
|
||||
|
||||
# Process users in batches to avoid memory issues
|
||||
offset = 0
|
||||
batch_size = 100
|
||||
|
||||
while True:
|
||||
users = await user_repo.get_all_with_plan(
|
||||
limit=batch_size,
|
||||
offset=offset,
|
||||
)
|
||||
if not users:
|
||||
break
|
||||
|
||||
for user in users:
|
||||
stats["total_users"] += 1
|
||||
|
||||
# Skip users without ID (shouldn't happen in practice)
|
||||
if user.id is None:
|
||||
continue
|
||||
|
||||
transaction = await self.recharge_user_credits(
|
||||
user.id,
|
||||
user.plan.credits,
|
||||
user.plan.max_credits,
|
||||
)
|
||||
|
||||
if transaction:
|
||||
stats["recharged_users"] += 1
|
||||
stats["total_credits_added"] += transaction.amount
|
||||
else:
|
||||
stats["skipped_users"] += 1
|
||||
|
||||
offset += batch_size
|
||||
|
||||
# Break if we got fewer users than batch_size (last batch)
|
||||
if len(users) < batch_size:
|
||||
break
|
||||
|
||||
logger.info(
|
||||
"Daily credit recharge completed: %s total users, "
|
||||
"%s recharged, %s skipped, %s total credits added",
|
||||
stats["total_users"],
|
||||
stats["recharged_users"],
|
||||
stats["skipped_users"],
|
||||
stats["total_credits_added"],
|
||||
)
|
||||
|
||||
return stats
|
||||
|
||||
finally:
|
||||
await session.close()
|
||||
|
||||
63
app/services/scheduler.py
Normal file
63
app/services/scheduler.py
Normal file
@@ -0,0 +1,63 @@
|
||||
"""Scheduler service for periodic tasks."""
|
||||
|
||||
from collections.abc import Callable
|
||||
|
||||
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
||||
from sqlmodel.ext.asyncio.session import AsyncSession
|
||||
|
||||
from app.core.logging import get_logger
|
||||
from app.services.credit import CreditService
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
|
||||
class SchedulerService:
|
||||
"""Service for managing scheduled tasks."""
|
||||
|
||||
def __init__(self, db_session_factory: Callable[[], AsyncSession]) -> None:
|
||||
"""Initialize the scheduler service.
|
||||
|
||||
Args:
|
||||
db_session_factory: Factory function to create database sessions
|
||||
|
||||
"""
|
||||
self.db_session_factory = db_session_factory
|
||||
self.scheduler = AsyncIOScheduler()
|
||||
self.credit_service = CreditService(db_session_factory)
|
||||
|
||||
async def start(self) -> None:
|
||||
"""Start the scheduler and register all tasks."""
|
||||
logger.info("Starting scheduler service...")
|
||||
|
||||
# Add daily credit recharge job (runs at midnight UTC)
|
||||
self.scheduler.add_job(
|
||||
self._daily_credit_recharge,
|
||||
"cron",
|
||||
hour=0,
|
||||
minute=0,
|
||||
id="daily_credit_recharge",
|
||||
name="Daily Credit Recharge",
|
||||
replace_existing=True,
|
||||
)
|
||||
|
||||
self.scheduler.start()
|
||||
logger.info("Scheduler service started successfully")
|
||||
|
||||
async def stop(self) -> None:
|
||||
"""Stop the scheduler."""
|
||||
logger.info("Stopping scheduler service...")
|
||||
self.scheduler.shutdown()
|
||||
logger.info("Scheduler service stopped")
|
||||
|
||||
async def _daily_credit_recharge(self) -> None:
|
||||
"""Execute daily credit recharge for all users."""
|
||||
logger.info("Starting daily credit recharge task...")
|
||||
|
||||
try:
|
||||
stats = await self.credit_service.recharge_all_users_credits()
|
||||
logger.info(
|
||||
"Daily credit recharge completed successfully: %s",
|
||||
stats,
|
||||
)
|
||||
except Exception:
|
||||
logger.exception("Daily credit recharge task failed")
|
||||
Reference in New Issue
Block a user