feat: Enhance stream processing and SocketIO services with app context management
This commit is contained in:
@@ -30,13 +30,18 @@ class StreamProcessingService:
|
||||
os.getenv("STREAM_MAX_CONCURRENT", "2")
|
||||
)
|
||||
_downloads_dir: str = "sounds/temp"
|
||||
_app_instance = None # Store the Flask app instance
|
||||
|
||||
@classmethod
|
||||
def initialize(cls) -> None:
|
||||
def initialize(cls, app=None) -> None:
|
||||
"""Initialize the stream processing service."""
|
||||
if cls._is_running:
|
||||
return
|
||||
|
||||
# Store the Flask app instance if provided
|
||||
if app:
|
||||
cls._app_instance = app
|
||||
|
||||
# Create necessary directories
|
||||
os.makedirs(cls._downloads_dir, exist_ok=True)
|
||||
os.makedirs("sounds/stream", exist_ok=True)
|
||||
@@ -83,18 +88,21 @@ class StreamProcessingService:
|
||||
@classmethod
|
||||
def _worker_thread(cls) -> None:
|
||||
"""Worker thread for processing streams."""
|
||||
from app import create_app
|
||||
|
||||
# Create app context for database operations
|
||||
app = create_app()
|
||||
|
||||
while True:
|
||||
try:
|
||||
# Get stream ID from queue with timeout
|
||||
stream_id = cls._processing_queue.get(timeout=1)
|
||||
|
||||
with app.app_context():
|
||||
cls._process_stream(stream_id)
|
||||
# Use the stored app instance for database operations
|
||||
if cls._app_instance:
|
||||
with cls._app_instance.app_context():
|
||||
cls._process_stream(stream_id)
|
||||
else:
|
||||
# Fallback: import create_app if no app instance stored
|
||||
from app import create_app
|
||||
app = create_app()
|
||||
with app.app_context():
|
||||
cls._process_stream(stream_id)
|
||||
|
||||
cls._processing_queue.task_done()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user