mirror of
https://github.com/remsky/Kokoro-FastAPI.git
synced 2025-04-13 09:39:17 +00:00
Fixed some tests
This commit is contained in:
parent
353fe79690
commit
1a03ac7464
2 changed files with 44 additions and 19 deletions
|
@ -6,7 +6,7 @@ import numpy as np
|
|||
import pytest
|
||||
|
||||
from api.src.services.audio import AudioNormalizer, AudioService
|
||||
|
||||
from api.src.inference.base import AudioChunk
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def mock_settings():
|
||||
|
@ -31,10 +31,11 @@ async def test_convert_to_wav(sample_audio):
|
|||
"""Test converting to WAV format"""
|
||||
audio_data, sample_rate = sample_audio
|
||||
# Write and finalize in one step for WAV
|
||||
result = await AudioService.convert_audio(
|
||||
audio_data, sample_rate, "wav", is_first_chunk=True, is_last_chunk=True
|
||||
result, audio_chunk = await AudioService.convert_audio(
|
||||
AudioChunk(audio_data), sample_rate, "wav", is_first_chunk=True, is_last_chunk=True
|
||||
)
|
||||
assert isinstance(result, bytes)
|
||||
assert isinstance(audio_chunk, AudioChunk)
|
||||
assert len(result) > 0
|
||||
# Check WAV header
|
||||
assert result.startswith(b"RIFF")
|
||||
|
@ -45,8 +46,11 @@ async def test_convert_to_wav(sample_audio):
|
|||
async def test_convert_to_mp3(sample_audio):
|
||||
"""Test converting to MP3 format"""
|
||||
audio_data, sample_rate = sample_audio
|
||||
result = await AudioService.convert_audio(audio_data, sample_rate, "mp3")
|
||||
result, audio_chunk = await AudioService.convert_audio(
|
||||
AudioChunk(audio_data), sample_rate, "mp3"
|
||||
)
|
||||
assert isinstance(result, bytes)
|
||||
assert isinstance(audio_chunk, AudioChunk)
|
||||
assert len(result) > 0
|
||||
# Check MP3 header (ID3 or MPEG frame sync)
|
||||
assert result.startswith(b"ID3") or result.startswith(b"\xff\xfb")
|
||||
|
@ -56,8 +60,11 @@ async def test_convert_to_mp3(sample_audio):
|
|||
async def test_convert_to_opus(sample_audio):
|
||||
"""Test converting to Opus format"""
|
||||
audio_data, sample_rate = sample_audio
|
||||
result = await AudioService.convert_audio(audio_data, sample_rate, "opus")
|
||||
result, audio_chunk = await AudioService.convert_audio(
|
||||
AudioChunk(audio_data), sample_rate, "opus"
|
||||
)
|
||||
assert isinstance(result, bytes)
|
||||
assert isinstance(audio_chunk, AudioChunk)
|
||||
assert len(result) > 0
|
||||
# Check OGG header
|
||||
assert result.startswith(b"OggS")
|
||||
|
@ -67,8 +74,11 @@ async def test_convert_to_opus(sample_audio):
|
|||
async def test_convert_to_flac(sample_audio):
|
||||
"""Test converting to FLAC format"""
|
||||
audio_data, sample_rate = sample_audio
|
||||
result = await AudioService.convert_audio(audio_data, sample_rate, "flac")
|
||||
result, audio_chunk = await AudioService.convert_audio(
|
||||
AudioChunk(audio_data), sample_rate, "flac"
|
||||
)
|
||||
assert isinstance(result, bytes)
|
||||
assert isinstance(audio_chunk, AudioChunk)
|
||||
assert len(result) > 0
|
||||
# Check FLAC header
|
||||
assert result.startswith(b"fLaC")
|
||||
|
@ -78,8 +88,11 @@ async def test_convert_to_flac(sample_audio):
|
|||
async def test_convert_to_aac(sample_audio):
|
||||
"""Test converting to AAC format"""
|
||||
audio_data, sample_rate = sample_audio
|
||||
result = await AudioService.convert_audio(audio_data, sample_rate, "aac")
|
||||
result, audio_chunk = await AudioService.convert_audio(
|
||||
AudioChunk(audio_data), sample_rate, "aac"
|
||||
)
|
||||
assert isinstance(result, bytes)
|
||||
assert isinstance(audio_chunk, AudioChunk)
|
||||
assert len(result) > 0
|
||||
# Check ADTS header (AAC)
|
||||
assert result.startswith(b"\xff\xf0") or result.startswith(b"\xff\xf1")
|
||||
|
@ -89,8 +102,11 @@ async def test_convert_to_aac(sample_audio):
|
|||
async def test_convert_to_pcm(sample_audio):
|
||||
"""Test converting to PCM format"""
|
||||
audio_data, sample_rate = sample_audio
|
||||
result = await AudioService.convert_audio(audio_data, sample_rate, "pcm")
|
||||
result, audio_chunk = await AudioService.convert_audio(
|
||||
AudioChunk(audio_data), sample_rate, "pcm"
|
||||
)
|
||||
assert isinstance(result, bytes)
|
||||
assert isinstance(audio_chunk, AudioChunk)
|
||||
assert len(result) > 0
|
||||
# PCM is raw bytes, so no header to check
|
||||
|
||||
|
@ -110,10 +126,11 @@ async def test_normalization_wav(sample_audio):
|
|||
# Create audio data outside int16 range
|
||||
large_audio = audio_data * 1e5
|
||||
# Write and finalize in one step for WAV
|
||||
result = await AudioService.convert_audio(
|
||||
large_audio, sample_rate, "wav", is_first_chunk=True, is_last_chunk=True
|
||||
result, audio_chunk = await AudioService.convert_audio(
|
||||
AudioChunk(large_audio), sample_rate, "wav", is_first_chunk=True, is_last_chunk=True
|
||||
)
|
||||
assert isinstance(result, bytes)
|
||||
assert isinstance(audio_chunk, AudioChunk)
|
||||
assert len(result) > 0
|
||||
|
||||
|
||||
|
@ -123,8 +140,11 @@ async def test_normalization_pcm(sample_audio):
|
|||
audio_data, sample_rate = sample_audio
|
||||
# Create audio data outside int16 range
|
||||
large_audio = audio_data * 1e5
|
||||
result = await AudioService.convert_audio(large_audio, sample_rate, "pcm")
|
||||
result, audio_chunk = await AudioService.convert_audio(
|
||||
AudioChunk(large_audio), sample_rate, "pcm"
|
||||
)
|
||||
assert isinstance(result, bytes)
|
||||
assert isinstance(audio_chunk, AudioChunk)
|
||||
assert len(result) > 0
|
||||
|
||||
|
||||
|
@ -144,10 +164,11 @@ async def test_different_sample_rates(sample_audio):
|
|||
sample_rates = [8000, 16000, 44100, 48000]
|
||||
|
||||
for rate in sample_rates:
|
||||
result = await AudioService.convert_audio(
|
||||
audio_data, rate, "wav", is_first_chunk=True, is_last_chunk=True
|
||||
result, audio_chunk = await AudioService.convert_audio(
|
||||
AudioChunk(audio_data), rate, "wav", is_first_chunk=True, is_last_chunk=True
|
||||
)
|
||||
assert isinstance(result, bytes)
|
||||
assert isinstance(audio_chunk, AudioChunk)
|
||||
assert len(result) > 0
|
||||
|
||||
|
||||
|
@ -156,11 +177,15 @@ async def test_buffer_position_after_conversion(sample_audio):
|
|||
"""Test that buffer position is reset after writing"""
|
||||
audio_data, sample_rate = sample_audio
|
||||
# Write and finalize in one step for first conversion
|
||||
result = await AudioService.convert_audio(
|
||||
audio_data, sample_rate, "wav", is_first_chunk=True, is_last_chunk=True
|
||||
result1, audio_chunk = await AudioService.convert_audio(
|
||||
AudioChunk(audio_data), sample_rate, "wav", is_first_chunk=True, is_last_chunk=True
|
||||
)
|
||||
assert isinstance(result1, bytes)
|
||||
assert isinstance(audio_chunk, AudioChunk)
|
||||
# Convert again to ensure buffer was properly reset
|
||||
result2 = await AudioService.convert_audio(
|
||||
audio_data, sample_rate, "wav", is_first_chunk=True, is_last_chunk=True
|
||||
result2, audio_chunk = await AudioService.convert_audio(
|
||||
AudioChunk(audio_data), sample_rate, "wav", is_first_chunk=True, is_last_chunk=True
|
||||
)
|
||||
assert len(result) == len(result2)
|
||||
assert isinstance(result2, bytes)
|
||||
assert isinstance(audio_chunk, AudioChunk)
|
||||
assert len(result1) == len(result2)
|
||||
|
|
|
@ -159,7 +159,7 @@ async def test_stream_audio_chunks_client_disconnect():
|
|||
)
|
||||
|
||||
chunks = []
|
||||
async for chunk in stream_audio_chunks(mock_service, request, mock_request):
|
||||
async for chunk, _ in stream_audio_chunks(mock_service, request, mock_request):
|
||||
chunks.append(chunk)
|
||||
|
||||
assert len(chunks) == 0 # Should stop immediately due to disconnect
|
||||
|
|
Loading…
Add table
Reference in a new issue