Files
sdb2-backend/app/services/player.py
JSC 5e6cc04ad2
All checks were successful
Backend CI / lint (push) Successful in 9m23s
Backend CI / test (push) Successful in 3m47s
fix: Increase broadcast interval to 1 second while playing
2025-08-16 12:24:41 +02:00

797 lines
29 KiB
Python

"""Player service for audio playbook management."""
import asyncio
import threading
import time
from collections.abc import Callable, Coroutine
from enum import Enum
from typing import Any
import vlc # type: ignore[import-untyped]
from app.core.logging import get_logger
from app.models.playlist import Playlist
from app.models.sound import Sound
from app.models.sound_played import SoundPlayed
from app.repositories.playlist import PlaylistRepository
from app.repositories.sound import SoundRepository
from app.services.socket import socket_manager
from app.utils.audio import get_sound_file_path
logger = get_logger(__name__)
class PlayerStatus(str, Enum):
"""Player status enumeration."""
STOPPED = "stopped"
PLAYING = "playing"
PAUSED = "paused"
class PlayerMode(str, Enum):
"""Player mode enumeration."""
CONTINUOUS = "continuous"
LOOP = "loop"
LOOP_ONE = "loop_one"
RANDOM = "random"
SINGLE = "single"
class PlayerState:
"""Player state data structure."""
def __init__(self) -> None:
"""Initialize player state."""
self.status: PlayerStatus = PlayerStatus.STOPPED
self.mode: PlayerMode = PlayerMode.CONTINUOUS
self.volume: int = 80
self.previous_volume: int = 80
self.current_sound_id: int | None = None
self.current_sound_index: int | None = None
self.current_sound_position: int = 0
self.current_sound_duration: int = 0
self.current_sound: Sound | None = None
self.playlist_id: int | None = None
self.playlist_name: str = ""
self.playlist_length: int = 0
self.playlist_duration: int = 0
self.playlist_sounds: list[Sound] = []
def to_dict(self) -> dict[str, Any]:
"""Convert player state to dictionary for serialization."""
return {
"status": self.status.value,
"mode": self.mode.value,
"volume": self.volume,
"previous_volume": self.previous_volume,
"position": self.current_sound_position or 0,
"duration": self.current_sound_duration,
"index": self.current_sound_index,
"current_sound": self._serialize_sound(self.current_sound),
"playlist": (
{
"id": self.playlist_id,
"name": self.playlist_name,
"length": self.playlist_length,
"duration": self.playlist_duration,
"sounds": [
self._serialize_sound(sound) for sound in self.playlist_sounds
],
}
if self.playlist_id
else None
),
}
def _serialize_sound(self, sound: Sound | None) -> dict[str, Any] | None:
"""Serialize a sound object for JSON serialization."""
if not sound:
return None
# Get extraction URL if sound is linked to an extraction
extract_url = None
if hasattr(sound, "extractions") and sound.extractions:
# Get the first extraction (there should only be one per sound)
extraction = sound.extractions[0]
extract_url = extraction.url
return {
"id": sound.id,
"name": sound.name,
"filename": sound.filename,
"duration": sound.duration,
"size": sound.size,
"type": sound.type,
"thumbnail": sound.thumbnail,
"play_count": sound.play_count,
"extract_url": extract_url,
}
class PlayerService:
"""Service for audio playback management."""
def __init__(self, db_session_factory: Callable) -> None:
"""Initialize the player service."""
self.db_session_factory = db_session_factory
self.state = PlayerState()
self._vlc_instance = vlc.Instance()
if self._vlc_instance is None:
msg = (
"VLC instance could not be created. "
"Ensure VLC is installed and accessible."
)
raise RuntimeError(msg)
self._player = self._vlc_instance.media_player_new()
self._is_running = False
self._position_thread: threading.Thread | None = None
self._play_time_tracking: dict[int, dict[str, Any]] = {}
self._lock = threading.Lock()
self._background_tasks: set[asyncio.Task] = set()
self._loop: asyncio.AbstractEventLoop | None = None
self._last_position_broadcast: float = 0
async def start(self) -> None:
"""Start the player service."""
logger.info("Starting player service")
self._is_running = True
# Store the event loop for thread-safe task scheduling
self._loop = asyncio.get_running_loop()
# Load initial playlist
await self.reload_playlist()
# Start position tracking thread
self._position_thread = threading.Thread(
target=self._position_tracker,
daemon=True,
)
self._position_thread.start()
# Set initial volume
self._player.audio_set_volume(self.state.volume)
logger.info("Player service started")
async def stop(self) -> None:
"""Stop the player service."""
logger.info("Stopping player service")
self._is_running = False
# Stop playback
await self._stop_playback()
# Wait for position thread to finish
if self._position_thread and self._position_thread.is_alive():
self._position_thread.join(timeout=2.0)
# Release VLC player
self._player.release()
logger.info("Player service stopped")
async def play(self, index: int | None = None) -> None:
"""Play audio at specified index or current position."""
if self._should_resume_playback(index):
await self._resume_playback()
return
await self._start_new_track(index)
def _should_resume_playback(self, index: int | None) -> bool:
"""Check if we should resume paused playback."""
return (
index is None
and self.state.status == PlayerStatus.PAUSED
and self.state.current_sound is not None
)
async def _resume_playback(self) -> None:
"""Resume paused playback."""
result = self._player.play()
if result == 0: # VLC returns 0 on success
self.state.status = PlayerStatus.PLAYING
self._ensure_play_time_tracking_for_resume()
await self._broadcast_state()
sound_name = (
self.state.current_sound.name if self.state.current_sound else "Unknown"
)
logger.info("Resumed playing sound: %s", sound_name)
else:
logger.error("Failed to resume playback: VLC error code %s", result)
def _ensure_play_time_tracking_for_resume(self) -> None:
"""Ensure play time tracking is initialized for resumed track."""
if (
self.state.current_sound_id
and self.state.current_sound_id not in self._play_time_tracking
):
self._play_time_tracking[self.state.current_sound_id] = {
"total_time": 0,
"last_position": self.state.current_sound_position,
"last_update": time.time(),
"threshold_reached": False,
}
async def _start_new_track(self, index: int | None) -> None:
"""Start playing a new track."""
if not self._prepare_sound_for_playback(index):
return
if not self._load_and_play_media():
return
await self._handle_successful_playback()
def _prepare_sound_for_playback(self, index: int | None) -> bool:
"""Prepare sound for playback, return True if ready."""
if index is not None and not self._set_sound_by_index(index):
return False
if not self.state.current_sound:
logger.warning("No sound to play")
return False
return self._validate_sound_file()
def _set_sound_by_index(self, index: int) -> bool:
"""Set current sound by index, return True if valid."""
if index < 0 or index >= len(self.state.playlist_sounds):
msg = "Invalid sound index"
raise ValueError(msg)
self.state.current_sound_index = index
self.state.current_sound = self.state.playlist_sounds[index]
self.state.current_sound_id = self.state.current_sound.id
return True
def _validate_sound_file(self) -> bool:
"""Validate sound file exists, return True if valid."""
if not self.state.current_sound:
return False
sound_path = get_sound_file_path(self.state.current_sound)
if not sound_path.exists():
logger.error("Sound file not found: %s", sound_path)
return False
return True
def _load_and_play_media(self) -> bool:
"""Load media and start playback, return True if successful."""
if self._vlc_instance is None:
logger.error("VLC instance is not initialized. Cannot play media.")
return False
if not self.state.current_sound:
logger.error("No current sound to play")
return False
sound_path = get_sound_file_path(self.state.current_sound)
media = self._vlc_instance.media_new(str(sound_path))
self._player.set_media(media)
result = self._player.play()
if result != 0: # VLC returns 0 on success
logger.error("Failed to start playback: VLC error code %s", result)
return False
return True
async def _handle_successful_playback(self) -> None:
"""Handle successful playback start."""
if not self.state.current_sound:
logger.error("No current sound for successful playback")
return
self.state.status = PlayerStatus.PLAYING
self.state.current_sound_duration = self.state.current_sound.duration or 0
self._initialize_play_time_tracking()
await self._broadcast_state()
logger.info("Started playing sound: %s", self.state.current_sound.name)
def _initialize_play_time_tracking(self) -> None:
"""Initialize play time tracking for new track."""
if self.state.current_sound_id:
self._play_time_tracking[self.state.current_sound_id] = {
"total_time": 0,
"last_position": 0,
"last_update": time.time(),
"threshold_reached": False,
}
logger.info(
"Initialized play time tracking for sound %s (duration: %s ms)",
self.state.current_sound_id,
self.state.current_sound_duration,
)
async def pause(self) -> None:
"""Pause playback."""
if self.state.status == PlayerStatus.PLAYING:
self._player.pause()
self.state.status = PlayerStatus.PAUSED
await self._broadcast_state()
logger.info("Playback paused")
async def stop_playback(self) -> None:
"""Stop playback."""
await self._stop_playback()
await self._broadcast_state()
async def _stop_playback(self) -> None:
"""Stop playback internal method."""
if self.state.status != PlayerStatus.STOPPED:
self._player.stop()
self.state.status = PlayerStatus.STOPPED
self.state.current_sound_position = 0
# Process any pending play counts
await self._process_play_count()
logger.info("Playback stopped")
async def next(self) -> None:
"""Skip to next track."""
if not self.state.playlist_sounds:
return
current_index = self.state.current_sound_index or 0
next_index = self._get_next_index(current_index)
if next_index is not None:
await self.play(next_index)
else:
await self._stop_playback()
await self._broadcast_state()
async def previous(self) -> None:
"""Go to previous track."""
if not self.state.playlist_sounds:
return
current_index = self.state.current_sound_index or 0
prev_index = self._get_previous_index(current_index)
if prev_index is not None:
await self.play(prev_index)
async def seek(self, position_ms: int) -> None:
"""Seek to specific position in current track."""
if self.state.status == PlayerStatus.STOPPED:
return
# Convert milliseconds to VLC position (0.0 to 1.0)
if self.state.current_sound_duration > 0:
position = position_ms / self.state.current_sound_duration
position = max(0.0, min(1.0, position)) # Clamp to valid range
self._player.set_position(position)
self.state.current_sound_position = position_ms
await self._broadcast_state()
logger.debug("Seeked to position: %sms", position_ms)
async def set_volume(self, volume: int) -> None:
"""Set playback volume (0-100)."""
volume = max(0, min(100, volume)) # Clamp to valid range
# Store previous volume when muting (going from >0 to 0)
if self.state.volume > 0 and volume == 0:
self.state.previous_volume = self.state.volume
self.state.volume = volume
self._player.audio_set_volume(volume)
await self._broadcast_state()
logger.debug("Volume set to: %s", volume)
async def mute(self) -> None:
"""Mute the player (stores current volume as previous_volume)."""
if self.state.volume > 0:
await self.set_volume(0)
async def unmute(self) -> None:
"""Unmute the player (restores previous_volume)."""
if self.state.volume == 0 and self.state.previous_volume > 0:
await self.set_volume(self.state.previous_volume)
async def set_mode(self, mode: PlayerMode) -> None:
"""Set playback mode."""
self.state.mode = mode
await self._broadcast_state()
logger.info("Playback mode set to: %s", mode.value)
async def reload_playlist(self) -> None:
"""Reload current playlist from database."""
session = self.db_session_factory()
try:
playlist_repo = PlaylistRepository(session)
current_playlist = await playlist_repo.get_current_playlist()
if current_playlist and current_playlist.id:
sounds = await playlist_repo.get_playlist_sounds(current_playlist.id)
await self._handle_playlist_reload(current_playlist, sounds)
logger.info(
"Loaded playlist: %s (%s sounds)",
current_playlist.name,
len(sounds),
)
else:
logger.warning("No playlist found to load")
finally:
await session.close()
await self._broadcast_state()
async def _handle_playlist_reload(
self,
current_playlist: Playlist,
sounds: list[Sound],
) -> None:
"""Handle playlist reload logic with ID comparison."""
# Store previous state for comparison
previous_playlist_id = self.state.playlist_id
previous_current_sound_id = self.state.current_sound_id
previous_current_sound_index = self.state.current_sound_index
# Update basic playlist state
self._update_playlist_state(current_playlist, sounds)
# Handle playlist changes based on ID comparison
if (
current_playlist.id is not None
and previous_playlist_id != current_playlist.id
):
await self._handle_playlist_id_changed(
previous_playlist_id,
current_playlist.id,
sounds,
)
elif previous_current_sound_id:
await self._handle_same_playlist_track_check(
previous_current_sound_id,
previous_current_sound_index,
sounds,
)
elif sounds:
self._set_first_track_as_current(sounds)
async def _handle_playlist_id_changed(
self,
previous_id: int | None,
current_id: int,
sounds: list[Sound],
) -> None:
"""Handle when playlist ID changes - stop player and reset to first track."""
logger.info(
"Playlist changed from %s to %s - stopping player and resetting",
previous_id,
current_id,
)
if self.state.status != PlayerStatus.STOPPED:
await self._stop_playback()
if sounds:
self._set_first_track_as_current(sounds)
else:
self._clear_current_track()
async def _handle_same_playlist_track_check(
self,
previous_sound_id: int,
previous_index: int | None,
sounds: list[Sound],
) -> None:
"""Handle track checking when playlist ID is the same."""
# Find the current track in the new playlist
new_index = self._find_sound_index(previous_sound_id, sounds)
if new_index is not None:
# Track still exists - update index if it changed
if new_index != previous_index:
logger.info(
"Current track %s moved from index %s to %s",
previous_sound_id,
previous_index,
new_index,
)
# Always set the index and sound reference
self.state.current_sound_index = new_index
self.state.current_sound = sounds[new_index]
else:
# Current track no longer exists in playlist
await self._handle_track_removed(previous_sound_id, sounds)
async def _handle_track_removed(
self,
previous_sound_id: int,
sounds: list[Sound],
) -> None:
"""Handle when current track no longer exists in playlist."""
logger.info(
"Current track %s no longer exists in playlist - stopping and resetting",
previous_sound_id,
)
if self.state.status != PlayerStatus.STOPPED:
await self._stop_playback()
if sounds:
self._set_first_track_as_current(sounds)
else:
self._clear_current_track()
def _update_playlist_state(
self,
current_playlist: Playlist,
sounds: list[Sound],
) -> None:
"""Update basic playlist state information."""
self.state.playlist_id = current_playlist.id
self.state.playlist_name = current_playlist.name
self.state.playlist_sounds = sounds
self.state.playlist_length = len(sounds)
self.state.playlist_duration = sum(sound.duration or 0 for sound in sounds)
def _find_sound_index(self, sound_id: int, sounds: list[Sound]) -> int | None:
"""Find the index of a sound in the sounds list."""
for i, sound in enumerate(sounds):
if sound.id == sound_id:
return i
return None
def _set_first_track_as_current(self, sounds: list[Sound]) -> None:
"""Set the first track as the current track."""
self.state.current_sound_index = 0
self.state.current_sound = sounds[0]
self.state.current_sound_id = sounds[0].id
def _clear_current_track(self) -> None:
"""Clear the current track state."""
self.state.current_sound_index = None
self.state.current_sound = None
self.state.current_sound_id = None
def get_state(self) -> dict[str, Any]:
"""Get current player state."""
return self.state.to_dict()
def _get_next_index(self, current_index: int) -> int | None:
"""Get next track index based on current mode."""
if not self.state.playlist_sounds:
return None
playlist_length = len(self.state.playlist_sounds)
if self.state.mode == PlayerMode.SINGLE:
return None
if self.state.mode == PlayerMode.LOOP_ONE:
return current_index
if self.state.mode == PlayerMode.RANDOM:
import random # noqa: PLC0415
indices = list(range(playlist_length))
indices.remove(current_index)
return random.choice(indices) if indices else None # noqa: S311
# CONTINUOUS or LOOP
next_index = current_index + 1
if next_index >= playlist_length:
return 0 if self.state.mode == PlayerMode.LOOP else None
return next_index
def _get_previous_index(self, current_index: int) -> int | None:
"""Get previous track index."""
if not self.state.playlist_sounds:
return None
playlist_length = len(self.state.playlist_sounds)
prev_index = current_index - 1
if prev_index < 0:
return playlist_length - 1 if self.state.mode == PlayerMode.LOOP else None
return prev_index
def _position_tracker(self) -> None:
"""Background thread to track playback position and handle auto-advance."""
while self._is_running:
if self.state.status == PlayerStatus.PLAYING:
# Update position
vlc_position = self._player.get_position()
if vlc_position >= 0: # Valid position
self.state.current_sound_position = int(
vlc_position * self.state.current_sound_duration,
)
# Check if track finished
player_state = self._player.get_state()
vlc_state_ended = 6 # vlc.State.Ended value
if player_state == vlc_state_ended:
# Track finished, handle auto-advance
self._schedule_async_task(self._handle_track_finished())
# Update play time tracking
self._update_play_time()
# Broadcast state every second while playing
broadcast_interval = 1
current_time = time.time()
if current_time - self._last_position_broadcast >= broadcast_interval:
self._last_position_broadcast = current_time
self._schedule_async_task(self._broadcast_state())
time.sleep(0.1) # 100ms update interval
def _update_play_time(self) -> None:
"""Update play time tracking for current sound."""
if not self.state.current_sound_id or self.state.status != PlayerStatus.PLAYING:
return
sound_id = self.state.current_sound_id
current_time = time.time()
current_position = self.state.current_sound_position
with self._lock:
if sound_id in self._play_time_tracking:
tracking = self._play_time_tracking[sound_id]
# Calculate time elapsed (only if position advanced reasonably)
time_elapsed = current_time - tracking["last_update"]
position_diff = abs(current_position - tracking["last_position"])
# Only count if position advanced naturally (not seeking)
max_position_jump = 5000 # 5 seconds in milliseconds
if time_elapsed > 0 and position_diff < max_position_jump:
# Add real time elapsed (converted to ms)
tracking["total_time"] += time_elapsed * 1000
tracking["last_position"] = current_position
tracking["last_update"] = current_time
# Check if 20% threshold reached
play_threshold = 0.2 # 20% of track duration
threshold_time = self.state.current_sound_duration * play_threshold
if (
not tracking["threshold_reached"]
and self.state.current_sound_duration > 0
and tracking["total_time"] >= threshold_time
):
tracking["threshold_reached"] = True
logger.info(
"Play count threshold reached for sound %s: %s/%s ms (%.1f%%)",
sound_id,
tracking["total_time"],
self.state.current_sound_duration,
(tracking["total_time"] / self.state.current_sound_duration)
* 100,
)
self._schedule_async_task(self._record_play_count(sound_id))
async def _record_play_count(self, sound_id: int) -> None:
"""Record a play count for a sound."""
logger.info("Recording play count for sound %s", sound_id)
session = self.db_session_factory()
try:
sound_repo = SoundRepository(session)
# Update sound play count
sound = await sound_repo.get_by_id(sound_id)
if sound:
old_count = sound.play_count
await sound_repo.update(
sound,
{"play_count": sound.play_count + 1},
)
logger.info(
"Updated sound %s play_count: %s -> %s",
sound_id,
old_count,
old_count + 1,
)
else:
logger.warning("Sound %s not found for play count update", sound_id)
# Record play history without user_id for player-based plays
# Always create a new SoundPlayed record for each play event
sound_played = SoundPlayed(
user_id=None, # No user_id for player-based plays
sound_id=sound_id,
)
session.add(sound_played)
logger.info(
"Created SoundPlayed record for player play, sound %s",
sound_id,
)
await session.commit()
logger.info("Successfully recorded play count for sound %s", sound_id)
except Exception:
logger.exception("Error recording play count for sound %s", sound_id)
await session.rollback()
finally:
await session.close()
async def _process_play_count(self) -> None:
"""Process any pending play counts when stopping."""
if not self.state.current_sound_id:
return
sound_id = self.state.current_sound_id
with self._lock:
if (
sound_id in self._play_time_tracking
and self._play_time_tracking[sound_id]["threshold_reached"]
):
# Already processed
del self._play_time_tracking[sound_id]
async def _handle_track_finished(self) -> None:
"""Handle when a track finishes playing."""
await self._process_play_count()
# Auto-advance to next track
if self.state.current_sound_index is not None:
next_index = self._get_next_index(self.state.current_sound_index)
if next_index is not None:
await self.play(next_index)
else:
await self._stop_playback()
await self._broadcast_state()
async def _broadcast_state(self) -> None:
"""Broadcast current player state via WebSocket."""
try:
state_data = self.get_state()
await socket_manager.broadcast_to_all("player_state", state_data)
except Exception:
logger.exception("Error broadcasting player state")
def _track_task(self, task: asyncio.Task) -> None:
"""Track background task to prevent garbage collection."""
self._background_tasks.add(task)
task.add_done_callback(self._background_tasks.discard)
def _schedule_async_task(self, coro: Coroutine[Any, Any, Any]) -> None:
"""Schedule an async task from a background thread."""
if self._loop and not self._loop.is_closed():
try:
# Use run_coroutine_threadsafe to schedule the coroutine
asyncio.run_coroutine_threadsafe(coro, self._loop)
# Don't wait for the result to avoid blocking the thread
except Exception:
logger.exception("Error scheduling async task")
# Global player service instance
player_service: PlayerService | None = None
def get_player_service() -> PlayerService:
"""Get the global player service instance."""
if player_service is None:
msg = "Player service not initialized"
raise RuntimeError(msg)
return player_service
async def initialize_player_service(db_session_factory: Callable) -> None:
"""Initialize the global player service."""
global player_service # noqa: PLW0603
player_service = PlayerService(db_session_factory)
await player_service.start()
async def shutdown_player_service() -> None:
"""Shutdown the global player service."""
global player_service # noqa: PLW0603
if player_service:
await player_service.stop()
player_service = None