diff --git a/app/services/sound_normalizer.py b/app/services/sound_normalizer.py index 1cb4ebf..3d3472c 100644 --- a/app/services/sound_normalizer.py +++ b/app/services/sound_normalizer.py @@ -1,6 +1,5 @@ """Sound normalizer service for normalizing audio files using ffmpeg loudnorm.""" -import hashlib import json import os import re @@ -14,6 +13,7 @@ from app.core.config import settings from app.core.logging import get_logger from app.models.sound import Sound from app.repositories.sound import SoundRepository +from app.utils.audio import get_audio_duration, get_file_hash, get_file_size logger = get_logger(__name__) @@ -107,27 +107,6 @@ class SoundNormalizerService: original_dir = type_to_original_dir.get(sound_type, "sounds/originals/other") return Path(original_dir) / filename - def _get_file_hash(self, file_path: Path) -> str: - """Calculate SHA-256 hash of a file.""" - hash_sha256 = hashlib.sha256() - with open(file_path, "rb") as f: - for chunk in iter(lambda: f.read(4096), b""): - hash_sha256.update(chunk) - return hash_sha256.hexdigest() - - def _get_file_size(self, file_path: Path) -> int: - """Get file size in bytes.""" - return file_path.stat().st_size - - def _get_audio_duration(self, file_path: Path) -> int: - """Get audio duration in milliseconds using ffmpeg.""" - try: - probe = ffmpeg.probe(str(file_path)) - duration = float(probe["format"]["duration"]) - return int(duration * 1000) # Convert to milliseconds - except Exception as e: - logger.warning("Failed to get duration for %s: %s", file_path, e) - return 0 async def _normalize_audio_one_pass( self, @@ -341,9 +320,9 @@ class SoundNormalizerService: await self._normalize_audio_two_pass(original_path, normalized_path) # Get normalized file info - normalized_duration = self._get_audio_duration(normalized_path) - normalized_size = self._get_file_size(normalized_path) - normalized_hash = self._get_file_hash(normalized_path) + normalized_duration = get_audio_duration(normalized_path) + normalized_size = get_file_size(normalized_path) + normalized_hash = get_file_hash(normalized_path) normalized_filename = normalized_path.name # Update sound in database diff --git a/app/services/sound_scanner.py b/app/services/sound_scanner.py index fe12ca7..f5155b0 100644 --- a/app/services/sound_scanner.py +++ b/app/services/sound_scanner.py @@ -1,15 +1,14 @@ """Sound scanner service for scanning and importing audio files.""" -import hashlib from pathlib import Path from typing import TypedDict -import ffmpeg # type: ignore[import-untyped] from sqlmodel.ext.asyncio.session import AsyncSession from app.core.logging import get_logger from app.models.sound import Sound from app.repositories.sound import SoundRepository +from app.utils.audio import get_audio_duration, get_file_hash, get_file_size logger = get_logger(__name__) @@ -57,27 +56,6 @@ class SoundScannerService: ".aac", } - def get_file_hash(self, file_path: Path) -> str: - """Calculate SHA-256 hash of a file.""" - hash_sha256 = hashlib.sha256() - with open(file_path, "rb") as f: - for chunk in iter(lambda: f.read(4096), b""): - hash_sha256.update(chunk) - return hash_sha256.hexdigest() - - def get_audio_duration(self, file_path: Path) -> int: - """Get audio duration in milliseconds using ffmpeg.""" - try: - probe = ffmpeg.probe(str(file_path)) - duration = float(probe["format"]["duration"]) - return int(duration * 1000) # Convert to milliseconds - except Exception as e: - logger.warning("Failed to get duration for %s: %s", file_path, e) - return 0 - - def get_file_size(self, file_path: Path) -> int: - """Get file size in bytes.""" - return file_path.stat().st_size def extract_name_from_filename(self, filename: str) -> str: """Extract a clean name from filename.""" @@ -207,9 +185,9 @@ class SoundScannerService: ) -> None: """Sync a single audio file (add new or update existing).""" filename = file_path.name - file_hash = self.get_file_hash(file_path) - duration = self.get_audio_duration(file_path) - size = self.get_file_size(file_path) + file_hash = get_file_hash(file_path) + duration = get_audio_duration(file_path) + size = get_file_size(file_path) name = self.extract_name_from_filename(filename) if existing_sound is None: diff --git a/app/utils/audio.py b/app/utils/audio.py new file mode 100644 index 0000000..091a65a --- /dev/null +++ b/app/utils/audio.py @@ -0,0 +1,35 @@ +"""Audio file utility functions shared across audio processing services.""" + +import hashlib +from pathlib import Path + +import ffmpeg # type: ignore[import-untyped] + +from app.core.logging import get_logger + +logger = get_logger(__name__) + + +def get_file_hash(file_path: Path) -> str: + """Calculate SHA-256 hash of a file.""" + hash_sha256 = hashlib.sha256() + with file_path.open("rb") as f: + for chunk in iter(lambda: f.read(4096), b""): + hash_sha256.update(chunk) + return hash_sha256.hexdigest() + + +def get_file_size(file_path: Path) -> int: + """Get file size in bytes.""" + return file_path.stat().st_size + + +def get_audio_duration(file_path: Path) -> int: + """Get audio duration in milliseconds using ffmpeg.""" + try: + probe = ffmpeg.probe(str(file_path)) + duration = float(probe["format"]["duration"]) + return int(duration * 1000) # Convert to milliseconds + except Exception as e: + logger.warning("Failed to get duration for %s: %s", file_path, e) + return 0 \ No newline at end of file diff --git a/tests/services/test_sound_normalizer.py b/tests/services/test_sound_normalizer.py index f9afff1..dadf75b 100644 --- a/tests/services/test_sound_normalizer.py +++ b/tests/services/test_sound_normalizer.py @@ -82,7 +82,8 @@ class TestSoundNormalizerService: temp_path = Path(f.name) try: - hash_value = normalizer_service._get_file_hash(temp_path) + from app.utils.audio import get_file_hash + hash_value = get_file_hash(temp_path) assert len(hash_value) == 64 # SHA-256 hash length assert isinstance(hash_value, str) finally: @@ -96,30 +97,33 @@ class TestSoundNormalizerService: temp_path = Path(f.name) try: - size = normalizer_service._get_file_size(temp_path) + from app.utils.audio import get_file_size + size = get_file_size(temp_path) assert size > 0 assert isinstance(size, int) finally: temp_path.unlink() - @patch("app.services.sound_normalizer.ffmpeg.probe") + @patch("app.utils.audio.ffmpeg.probe") def test_get_audio_duration_success(self, mock_probe, normalizer_service): """Test successful audio duration extraction.""" mock_probe.return_value = {"format": {"duration": "123.456"}} temp_path = Path("/fake/path/test.mp3") - duration = normalizer_service._get_audio_duration(temp_path) + from app.utils.audio import get_audio_duration + duration = get_audio_duration(temp_path) assert duration == 123456 # 123.456 seconds * 1000 = 123456 ms mock_probe.assert_called_once_with(str(temp_path)) - @patch("app.services.sound_normalizer.ffmpeg.probe") + @patch("app.utils.audio.ffmpeg.probe") def test_get_audio_duration_failure(self, mock_probe, normalizer_service): """Test audio duration extraction failure.""" mock_probe.side_effect = Exception("FFmpeg error") temp_path = Path("/fake/path/test.mp3") - duration = normalizer_service._get_audio_duration(temp_path) + from app.utils.audio import get_audio_duration + duration = get_audio_duration(temp_path) assert duration == 0 mock_probe.assert_called_once_with(str(temp_path)) @@ -163,9 +167,9 @@ class TestSoundNormalizerService: with patch.object(normalizer_service, "_get_original_path") as mock_orig_path, \ patch.object(normalizer_service, "_get_normalized_path") as mock_norm_path, \ patch.object(normalizer_service, "_normalize_audio_two_pass") as mock_normalize, \ - patch.object(normalizer_service, "_get_audio_duration", return_value=6000), \ - patch.object(normalizer_service, "_get_file_size", return_value=2048), \ - patch.object(normalizer_service, "_get_file_hash", return_value="new_hash"): + patch("app.services.sound_normalizer.get_audio_duration", return_value=6000), \ + patch("app.services.sound_normalizer.get_file_size", return_value=2048), \ + patch("app.services.sound_normalizer.get_file_hash", return_value="new_hash"): # Setup path mocks mock_orig_path.return_value = Path("/fake/original.mp3") @@ -229,9 +233,9 @@ class TestSoundNormalizerService: with patch.object(normalizer_service, "_get_original_path") as mock_orig_path, \ patch.object(normalizer_service, "_get_normalized_path") as mock_norm_path, \ patch.object(normalizer_service, "_normalize_audio_one_pass") as mock_normalize, \ - patch.object(normalizer_service, "_get_audio_duration", return_value=5500), \ - patch.object(normalizer_service, "_get_file_size", return_value=1500), \ - patch.object(normalizer_service, "_get_file_hash", return_value="norm_hash"): + patch("app.services.sound_normalizer.get_audio_duration", return_value=5500), \ + patch("app.services.sound_normalizer.get_file_size", return_value=1500), \ + patch("app.services.sound_normalizer.get_file_hash", return_value="norm_hash"): # Setup path mocks mock_orig_path.return_value = Path("/fake/original.mp3") diff --git a/tests/services/test_sound_scanner.py b/tests/services/test_sound_scanner.py index e7b2507..37ab3be 100644 --- a/tests/services/test_sound_scanner.py +++ b/tests/services/test_sound_scanner.py @@ -40,7 +40,8 @@ class TestSoundScannerService: temp_path = Path(f.name) try: - hash_value = scanner_service.get_file_hash(temp_path) + from app.utils.audio import get_file_hash + hash_value = get_file_hash(temp_path) assert len(hash_value) == 64 # SHA-256 hash length assert isinstance(hash_value, str) finally: @@ -54,7 +55,8 @@ class TestSoundScannerService: temp_path = Path(f.name) try: - size = scanner_service.get_file_size(temp_path) + from app.utils.audio import get_file_size + size = get_file_size(temp_path) assert size > 0 assert isinstance(size, int) finally: @@ -74,24 +76,26 @@ class TestSoundScannerService: result = scanner_service.extract_name_from_filename(filename) assert result == expected_name - @patch("app.services.sound_scanner.ffmpeg.probe") + @patch("app.utils.audio.ffmpeg.probe") def test_get_audio_duration_success(self, mock_probe, scanner_service): """Test successful audio duration extraction.""" mock_probe.return_value = {"format": {"duration": "123.456"}} temp_path = Path("/fake/path/test.mp3") - duration = scanner_service.get_audio_duration(temp_path) + from app.utils.audio import get_audio_duration + duration = get_audio_duration(temp_path) assert duration == 123456 # 123.456 seconds * 1000 = 123456 ms mock_probe.assert_called_once_with(str(temp_path)) - @patch("app.services.sound_scanner.ffmpeg.probe") + @patch("app.utils.audio.ffmpeg.probe") def test_get_audio_duration_failure(self, mock_probe, scanner_service): """Test audio duration extraction failure.""" mock_probe.side_effect = Exception("FFmpeg error") temp_path = Path("/fake/path/test.mp3") - duration = scanner_service.get_audio_duration(temp_path) + from app.utils.audio import get_audio_duration + duration = get_audio_duration(temp_path) assert duration == 0 mock_probe.assert_called_once_with(str(temp_path)) @@ -125,36 +129,36 @@ class TestSoundScannerService: ) # Mock file operations to return same hash - scanner_service.get_file_hash = Mock(return_value="same_hash") - scanner_service.get_audio_duration = Mock(return_value=120000) - scanner_service.get_file_size = Mock(return_value=1024) + with patch("app.services.sound_scanner.get_file_hash", return_value="same_hash"), \ + patch("app.services.sound_scanner.get_audio_duration", return_value=120000), \ + patch("app.services.sound_scanner.get_file_size", return_value=1024): - # Create a temporary file - with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as f: - temp_path = Path(f.name) + # Create a temporary file + with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as f: + temp_path = Path(f.name) - try: - results = { - "scanned": 0, - "added": 0, - "updated": 0, - "deleted": 0, - "skipped": 0, - "errors": 0, - "files": [], - } - await scanner_service._sync_audio_file( - temp_path, "SDB", existing_sound, results - ) + try: + results = { + "scanned": 0, + "added": 0, + "updated": 0, + "deleted": 0, + "skipped": 0, + "errors": 0, + "files": [], + } + await scanner_service._sync_audio_file( + temp_path, "SDB", existing_sound, results + ) - assert results["skipped"] == 1 - assert results["added"] == 0 - assert results["updated"] == 0 - assert len(results["files"]) == 1 - assert results["files"][0]["status"] == "skipped" - assert results["files"][0]["reason"] == "file unchanged" - finally: - temp_path.unlink() + assert results["skipped"] == 1 + assert results["added"] == 0 + assert results["updated"] == 0 + assert len(results["files"]) == 1 + assert results["files"][0]["status"] == "skipped" + assert results["files"][0]["reason"] == "file unchanged" + finally: + temp_path.unlink() @pytest.mark.asyncio async def test_sync_audio_file_new(self, scanner_service): @@ -171,42 +175,42 @@ class TestSoundScannerService: scanner_service.sound_repo.create = AsyncMock(return_value=created_sound) # Mock file operations - scanner_service.get_file_hash = Mock(return_value="test_hash") - scanner_service.get_audio_duration = Mock(return_value=120000) # Duration in ms - scanner_service.get_file_size = Mock(return_value=1024) + with patch("app.services.sound_scanner.get_file_hash", return_value="test_hash"), \ + patch("app.services.sound_scanner.get_audio_duration", return_value=120000), \ + patch("app.services.sound_scanner.get_file_size", return_value=1024): - # Create a temporary file - with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as f: - temp_path = Path(f.name) + # Create a temporary file + with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as f: + temp_path = Path(f.name) - try: - results = { - "scanned": 0, - "added": 0, - "updated": 0, - "deleted": 0, - "skipped": 0, - "errors": 0, - "files": [], - } - await scanner_service._sync_audio_file(temp_path, "SDB", None, results) + try: + results = { + "scanned": 0, + "added": 0, + "updated": 0, + "deleted": 0, + "skipped": 0, + "errors": 0, + "files": [], + } + await scanner_service._sync_audio_file(temp_path, "SDB", None, results) - assert results["added"] == 1 - assert results["skipped"] == 0 - assert results["updated"] == 0 - assert len(results["files"]) == 1 - assert results["files"][0]["status"] == "added" + assert results["added"] == 1 + assert results["skipped"] == 0 + assert results["updated"] == 0 + assert len(results["files"]) == 1 + assert results["files"][0]["status"] == "added" - # Verify sound_repo.create was called with correct data - call_args = scanner_service.sound_repo.create.call_args[0][0] - assert call_args["type"] == "SDB" - assert call_args["filename"] == temp_path.name - assert call_args["duration"] == 120000 # Duration in ms - assert call_args["size"] == 1024 - assert call_args["hash"] == "test_hash" - assert call_args["is_deletable"] is False # SDB sounds are not deletable - finally: - temp_path.unlink() + # Verify sound_repo.create was called with correct data + call_args = scanner_service.sound_repo.create.call_args[0][0] + assert call_args["type"] == "SDB" + assert call_args["filename"] == temp_path.name + assert call_args["duration"] == 120000 # Duration in ms + assert call_args["size"] == 1024 + assert call_args["hash"] == "test_hash" + assert call_args["is_deletable"] is False # SDB sounds are not deletable + finally: + temp_path.unlink() @pytest.mark.asyncio async def test_sync_audio_file_updated(self, scanner_service): @@ -225,44 +229,44 @@ class TestSoundScannerService: scanner_service.sound_repo.update = AsyncMock(return_value=existing_sound) # Mock file operations to return new values - scanner_service.get_file_hash = Mock(return_value="new_hash") - scanner_service.get_audio_duration = Mock(return_value=120000) # New duration - scanner_service.get_file_size = Mock(return_value=1024) # New size + with patch("app.services.sound_scanner.get_file_hash", return_value="new_hash"), \ + patch("app.services.sound_scanner.get_audio_duration", return_value=120000), \ + patch("app.services.sound_scanner.get_file_size", return_value=1024): - # Create a temporary file - with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as f: - temp_path = Path(f.name) + # Create a temporary file + with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as f: + temp_path = Path(f.name) - try: - results = { - "scanned": 0, - "added": 0, - "updated": 0, - "deleted": 0, - "skipped": 0, - "errors": 0, - "files": [], - } - await scanner_service._sync_audio_file( - temp_path, "SDB", existing_sound, results - ) + try: + results = { + "scanned": 0, + "added": 0, + "updated": 0, + "deleted": 0, + "skipped": 0, + "errors": 0, + "files": [], + } + await scanner_service._sync_audio_file( + temp_path, "SDB", existing_sound, results + ) - assert results["updated"] == 1 - assert results["added"] == 0 - assert results["skipped"] == 0 - assert len(results["files"]) == 1 - assert results["files"][0]["status"] == "updated" - assert results["files"][0]["reason"] == "file was modified" + assert results["updated"] == 1 + assert results["added"] == 0 + assert results["skipped"] == 0 + assert len(results["files"]) == 1 + assert results["files"][0]["status"] == "updated" + assert results["files"][0]["reason"] == "file was modified" - # Verify sound_repo.update was called with correct data - call_args = scanner_service.sound_repo.update.call_args[0][1] # update_data - assert call_args["duration"] == 120000 - assert call_args["size"] == 1024 - assert call_args["hash"] == "new_hash" - # Name is extracted from temp filename, should be capitalized - assert call_args["name"].endswith("mp3") is False # Should be cleaned - finally: - temp_path.unlink() + # Verify sound_repo.update was called with correct data + call_args = scanner_service.sound_repo.update.call_args[0][1] # update_data + assert call_args["duration"] == 120000 + assert call_args["size"] == 1024 + assert call_args["hash"] == "new_hash" + # Name is extracted from temp filename, should be capitalized + assert call_args["name"].endswith("mp3") is False # Should be cleaned + finally: + temp_path.unlink() @pytest.mark.asyncio async def test_sync_audio_file_custom_type(self, scanner_service): @@ -279,40 +283,40 @@ class TestSoundScannerService: scanner_service.sound_repo.create = AsyncMock(return_value=created_sound) # Mock file operations - scanner_service.get_file_hash = Mock(return_value="custom_hash") - scanner_service.get_audio_duration = Mock(return_value=60000) # Duration in ms - scanner_service.get_file_size = Mock(return_value=2048) + with patch("app.services.sound_scanner.get_file_hash", return_value="custom_hash"), \ + patch("app.services.sound_scanner.get_audio_duration", return_value=60000), \ + patch("app.services.sound_scanner.get_file_size", return_value=2048): - # Create a temporary file - with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f: - temp_path = Path(f.name) + # Create a temporary file + with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f: + temp_path = Path(f.name) - try: - results = { - "scanned": 0, - "added": 0, - "updated": 0, - "deleted": 0, - "skipped": 0, - "errors": 0, - "files": [], - } - await scanner_service._sync_audio_file(temp_path, "CUSTOM", None, results) + try: + results = { + "scanned": 0, + "added": 0, + "updated": 0, + "deleted": 0, + "skipped": 0, + "errors": 0, + "files": [], + } + await scanner_service._sync_audio_file(temp_path, "CUSTOM", None, results) - assert results["added"] == 1 - assert results["skipped"] == 0 - assert len(results["files"]) == 1 - assert results["files"][0]["status"] == "added" + assert results["added"] == 1 + assert results["skipped"] == 0 + assert len(results["files"]) == 1 + assert results["files"][0]["status"] == "added" - # Verify sound_repo.create was called with correct data for custom type - call_args = scanner_service.sound_repo.create.call_args[0][0] - assert call_args["type"] == "CUSTOM" - assert call_args["filename"] == temp_path.name - assert call_args["duration"] == 60000 # Duration in ms - assert call_args["size"] == 2048 - assert call_args["hash"] == "custom_hash" - assert ( - call_args["is_deletable"] is False - ) # All sounds are set to not deletable - finally: - temp_path.unlink() + # Verify sound_repo.create was called with correct data for custom type + call_args = scanner_service.sound_repo.create.call_args[0][0] + assert call_args["type"] == "CUSTOM" + assert call_args["filename"] == temp_path.name + assert call_args["duration"] == 60000 # Duration in ms + assert call_args["size"] == 2048 + assert call_args["hash"] == "custom_hash" + assert ( + call_args["is_deletable"] is False + ) # All sounds are set to not deletable + finally: + temp_path.unlink() diff --git a/tests/utils/test_audio.py b/tests/utils/test_audio.py new file mode 100644 index 0000000..01e7d4c --- /dev/null +++ b/tests/utils/test_audio.py @@ -0,0 +1,292 @@ +"""Tests for audio utility functions.""" + +import hashlib +import tempfile +from pathlib import Path +from unittest.mock import patch + +import pytest + +from app.utils.audio import get_audio_duration, get_file_hash, get_file_size + + +class TestAudioUtils: + """Test audio utility functions.""" + + def test_get_file_hash(self): + """Test file hash calculation.""" + # Create a temporary file with known content + test_content = "test content for hashing" + with tempfile.NamedTemporaryFile(mode="w", delete=False) as f: + f.write(test_content) + temp_path = Path(f.name) + + try: + # Calculate hash using our function + result_hash = get_file_hash(temp_path) + + # Calculate expected hash manually + expected_hash = hashlib.sha256(test_content.encode()).hexdigest() + + # Verify the hash is correct + assert result_hash == expected_hash + assert len(result_hash) == 64 # SHA-256 hash length + assert isinstance(result_hash, str) + + finally: + temp_path.unlink() + + def test_get_file_hash_binary_content(self): + """Test file hash calculation with binary content.""" + # Create a temporary file with binary content + test_bytes = b"\x00\x01\x02\x03\xFF\xFE\xFD" + with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f: + f.write(test_bytes) + temp_path = Path(f.name) + + try: + # Calculate hash using our function + result_hash = get_file_hash(temp_path) + + # Calculate expected hash manually + expected_hash = hashlib.sha256(test_bytes).hexdigest() + + # Verify the hash is correct + assert result_hash == expected_hash + assert len(result_hash) == 64 # SHA-256 hash length + assert isinstance(result_hash, str) + + finally: + temp_path.unlink() + + def test_get_file_hash_empty_file(self): + """Test file hash calculation for empty file.""" + # Create an empty temporary file + with tempfile.NamedTemporaryFile(delete=False) as f: + temp_path = Path(f.name) + + try: + # Calculate hash using our function + result_hash = get_file_hash(temp_path) + + # Calculate expected hash for empty content + expected_hash = hashlib.sha256(b"").hexdigest() + + # Verify the hash is correct + assert result_hash == expected_hash + assert len(result_hash) == 64 # SHA-256 hash length + assert isinstance(result_hash, str) + + finally: + temp_path.unlink() + + def test_get_file_hash_large_file(self): + """Test file hash calculation for large file (tests chunked reading).""" + # Create a large temporary file (larger than 4096 bytes chunk size) + test_content = "A" * 10000 # 10KB of 'A' characters + with tempfile.NamedTemporaryFile(mode="w", delete=False) as f: + f.write(test_content) + temp_path = Path(f.name) + + try: + # Calculate hash using our function + result_hash = get_file_hash(temp_path) + + # Calculate expected hash manually + expected_hash = hashlib.sha256(test_content.encode()).hexdigest() + + # Verify the hash is correct + assert result_hash == expected_hash + assert len(result_hash) == 64 # SHA-256 hash length + assert isinstance(result_hash, str) + + finally: + temp_path.unlink() + + def test_get_file_size(self): + """Test file size calculation.""" + # Create a temporary file with known content + test_content = "test content for size calculation" + with tempfile.NamedTemporaryFile(mode="w", delete=False) as f: + f.write(test_content) + temp_path = Path(f.name) + + try: + # Get size using our function + result_size = get_file_size(temp_path) + + # Get expected size using pathlib directly + expected_size = temp_path.stat().st_size + + # Verify the size is correct + assert result_size == expected_size + assert result_size > 0 + assert isinstance(result_size, int) + + finally: + temp_path.unlink() + + def test_get_file_size_empty_file(self): + """Test file size calculation for empty file.""" + # Create an empty temporary file + with tempfile.NamedTemporaryFile(delete=False) as f: + temp_path = Path(f.name) + + try: + # Get size using our function + result_size = get_file_size(temp_path) + + # Verify the size is zero + assert result_size == 0 + assert isinstance(result_size, int) + + finally: + temp_path.unlink() + + def test_get_file_size_binary_file(self): + """Test file size calculation for binary file.""" + # Create a temporary file with binary content + test_bytes = b"\x00\x01\x02\x03\xFF\xFE\xFD" * 100 # 700 bytes + with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f: + f.write(test_bytes) + temp_path = Path(f.name) + + try: + # Get size using our function + result_size = get_file_size(temp_path) + + # Verify the size is correct + assert result_size == len(test_bytes) + assert result_size == 700 + assert isinstance(result_size, int) + + finally: + temp_path.unlink() + + @patch("app.utils.audio.ffmpeg.probe") + def test_get_audio_duration_success(self, mock_probe): + """Test successful audio duration extraction.""" + # Mock ffmpeg.probe to return duration + mock_probe.return_value = {"format": {"duration": "123.456"}} + + temp_path = Path("/fake/path/test.mp3") + duration = get_audio_duration(temp_path) + + # Verify duration is converted correctly (seconds to milliseconds) + assert duration == 123456 # 123.456 seconds * 1000 = 123456 ms + assert isinstance(duration, int) + mock_probe.assert_called_once_with(str(temp_path)) + + @patch("app.utils.audio.ffmpeg.probe") + def test_get_audio_duration_integer_duration(self, mock_probe): + """Test audio duration extraction with integer duration.""" + # Mock ffmpeg.probe to return integer duration + mock_probe.return_value = {"format": {"duration": "60"}} + + temp_path = Path("/fake/path/test.wav") + duration = get_audio_duration(temp_path) + + # Verify duration is converted correctly + assert duration == 60000 # 60 seconds * 1000 = 60000 ms + assert isinstance(duration, int) + mock_probe.assert_called_once_with(str(temp_path)) + + @patch("app.utils.audio.ffmpeg.probe") + def test_get_audio_duration_zero_duration(self, mock_probe): + """Test audio duration extraction with zero duration.""" + # Mock ffmpeg.probe to return zero duration + mock_probe.return_value = {"format": {"duration": "0.0"}} + + temp_path = Path("/fake/path/silent.mp3") + duration = get_audio_duration(temp_path) + + # Verify duration is zero + assert duration == 0 + assert isinstance(duration, int) + mock_probe.assert_called_once_with(str(temp_path)) + + @patch("app.utils.audio.ffmpeg.probe") + def test_get_audio_duration_fractional_duration(self, mock_probe): + """Test audio duration extraction with fractional seconds.""" + # Mock ffmpeg.probe to return fractional duration + mock_probe.return_value = {"format": {"duration": "45.123"}} + + temp_path = Path("/fake/path/test.flac") + duration = get_audio_duration(temp_path) + + # Verify duration is converted and rounded correctly + assert duration == 45123 # 45.123 seconds * 1000 = 45123 ms + assert isinstance(duration, int) + mock_probe.assert_called_once_with(str(temp_path)) + + @patch("app.utils.audio.ffmpeg.probe") + def test_get_audio_duration_ffmpeg_error(self, mock_probe): + """Test audio duration extraction when ffmpeg fails.""" + # Mock ffmpeg.probe to raise an exception + mock_probe.side_effect = Exception("FFmpeg error: file not found") + + temp_path = Path("/fake/path/nonexistent.mp3") + duration = get_audio_duration(temp_path) + + # Verify duration defaults to 0 on error + assert duration == 0 + assert isinstance(duration, int) + mock_probe.assert_called_once_with(str(temp_path)) + + @patch("app.utils.audio.ffmpeg.probe") + def test_get_audio_duration_missing_format(self, mock_probe): + """Test audio duration extraction when format info is missing.""" + # Mock ffmpeg.probe to return data without format info + mock_probe.return_value = {"streams": []} + + temp_path = Path("/fake/path/corrupt.mp3") + duration = get_audio_duration(temp_path) + + # Verify duration defaults to 0 when format info is missing + assert duration == 0 + assert isinstance(duration, int) + mock_probe.assert_called_once_with(str(temp_path)) + + @patch("app.utils.audio.ffmpeg.probe") + def test_get_audio_duration_missing_duration(self, mock_probe): + """Test audio duration extraction when duration is missing.""" + # Mock ffmpeg.probe to return format without duration + mock_probe.return_value = {"format": {"size": "1024"}} + + temp_path = Path("/fake/path/noduration.mp3") + duration = get_audio_duration(temp_path) + + # Verify duration defaults to 0 when duration is missing + assert duration == 0 + assert isinstance(duration, int) + mock_probe.assert_called_once_with(str(temp_path)) + + @patch("app.utils.audio.ffmpeg.probe") + def test_get_audio_duration_invalid_duration(self, mock_probe): + """Test audio duration extraction with invalid duration value.""" + # Mock ffmpeg.probe to return invalid duration + mock_probe.return_value = {"format": {"duration": "invalid"}} + + temp_path = Path("/fake/path/invalid.mp3") + duration = get_audio_duration(temp_path) + + # Verify duration defaults to 0 when duration is invalid + assert duration == 0 + assert isinstance(duration, int) + mock_probe.assert_called_once_with(str(temp_path)) + + def test_get_file_hash_nonexistent_file(self): + """Test file hash calculation for nonexistent file.""" + nonexistent_path = Path("/fake/nonexistent/file.mp3") + + # Should raise FileNotFoundError for nonexistent file + with pytest.raises(FileNotFoundError): + get_file_hash(nonexistent_path) + + def test_get_file_size_nonexistent_file(self): + """Test file size calculation for nonexistent file.""" + nonexistent_path = Path("/fake/nonexistent/file.mp3") + + # Should raise FileNotFoundError for nonexistent file + with pytest.raises(FileNotFoundError): + get_file_size(nonexistent_path) \ No newline at end of file