- Add model instance pooling, better concurrency

- Add load testing setup with Locust
This commit is contained in:
remsky 2025-02-09 23:03:16 -07:00
parent 09b7c2cf1e
commit e5e85b32d2
11 changed files with 737 additions and 530 deletions
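In short: instead of one global backend, the server now keeps a small pool of model instances (each with its own CUDA stream when the GPU is used), hands one out per request, and makes callers wait when everything is busy. Below is a toy, self-contained sketch of that borrow/release pattern; the real `ModelPool` in `api/src/inference/model_manager.py` (diff below) additionally loads the model, attaches CUDA streams, caps chunk concurrency with a semaphore, and bounds the wait queue.

```python
import asyncio
import time
from typing import List


class ModelInstance:
    """Stand-in for one loaded Kokoro backend (model loading and streams omitted)."""

    def __init__(self, instance_id: int) -> None:
        self.instance_id = instance_id
        self._in_use = False
        self._last_used = 0.0

    async def generate(self, text: str) -> str:
        await asyncio.sleep(0.05)  # pretend to synthesize audio
        return f"audio for {text!r} from instance {self.instance_id}"


class ModelPool:
    """Fixed set of instances; each request borrows one and returns it when done."""

    def __init__(self, max_instances: int) -> None:
        self._instances: List[ModelInstance] = [ModelInstance(i) for i in range(max_instances)]
        self._lock = asyncio.Lock()

    async def get_instance(self) -> ModelInstance:
        while True:
            async with self._lock:
                for inst in self._instances:
                    if not inst._in_use:
                        inst._in_use = True
                        inst._last_used = time.time()
                        return inst
            await asyncio.sleep(0.01)  # everything busy: back off and retry

    async def release_instance(self, inst: ModelInstance) -> None:
        inst._in_use = False


async def handle_request(pool: ModelPool, text: str) -> str:
    inst = await pool.get_instance()
    try:
        return await inst.generate(text)
    finally:
        await pool.release_instance(inst)  # always give the instance back


async def main() -> None:
    pool = ModelPool(max_instances=2)  # mirrors max_concurrent_models
    results = await asyncio.gather(*(handle_request(pool, f"request {i}") for i in range(5)))
    for line in results:
        print(line)


asyncio.run(main())
```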

View file

@@ -22,6 +22,9 @@ class PyTorchConfig(BaseModel):
memory_threshold: float = Field(0.8, description="Memory threshold for cleanup")
retry_on_oom: bool = Field(True, description="Whether to retry on OOM errors")
max_concurrent_models: int = Field(2, description="Maximum number of concurrent model instances")
max_queue_size: int = Field(32, description="Maximum size of request queue")
chunk_semaphore_limit: int = Field(4, description="Maximum concurrent chunk processing per model")
class Config:
frozen = True

View file

@@ -24,6 +24,11 @@ class KokoroV1(BaseModelBackend):
self._device = "cuda" if settings.use_gpu else "cpu"
self._model: Optional[KModel] = None
self._pipelines: Dict[str, KPipeline] = {} # Store pipelines by lang_code
self._stream: Optional[torch.cuda.Stream] = None
def set_stream(self, stream: torch.cuda.Stream) -> None:
"""Set CUDA stream for this instance."""
self._stream = stream
async def load_model(self, path: str) -> None:
"""Load pre-baked model.
@@ -146,14 +151,27 @@ class KokoroV1(BaseModelBackend):
logger.debug(
f"Generating audio from tokens with lang_code '{pipeline_lang_code}': '{tokens[:100]}...'"
)
for result in pipeline.generate_from_tokens(
tokens=tokens, voice=voice_path, speed=speed, model=self._model
):
if result.audio is not None:
logger.debug(f"Got audio chunk with shape: {result.audio.shape}")
yield result.audio.numpy()
else:
logger.warning("No audio in chunk")
# Use CUDA stream if available
if self._stream and self._device == "cuda":
with torch.cuda.stream(self._stream):
for result in pipeline.generate_from_tokens(
tokens=tokens, voice=voice_path, speed=speed, model=self._model
):
if result.audio is not None:
logger.debug(f"Got audio chunk with shape: {result.audio.shape}")
yield result.audio.numpy()
else:
logger.warning("No audio in chunk")
else:
for result in pipeline.generate_from_tokens(
tokens=tokens, voice=voice_path, speed=speed, model=self._model
):
if result.audio is not None:
logger.debug(f"Got audio chunk with shape: {result.audio.shape}")
yield result.audio.numpy()
else:
logger.warning("No audio in chunk")
except Exception as e:
logger.error(f"Generation failed: {e}")
@@ -239,14 +257,27 @@ class KokoroV1(BaseModelBackend):
logger.debug(
f"Generating audio for text with lang_code '{pipeline_lang_code}': '{text[:100]}...'"
)
for result in pipeline(
text, voice=voice_path, speed=speed, model=self._model
):
if result.audio is not None:
logger.debug(f"Got audio chunk with shape: {result.audio.shape}")
yield result.audio.numpy()
else:
logger.warning("No audio in chunk")
# Use CUDA stream if available
if self._stream and self._device == "cuda":
with torch.cuda.stream(self._stream):
for result in pipeline(
text, voice=voice_path, speed=speed, model=self._model
):
if result.audio is not None:
logger.debug(f"Got audio chunk with shape: {result.audio.shape}")
yield result.audio.numpy()
else:
logger.warning("No audio in chunk")
else:
for result in pipeline(
text, voice=voice_path, speed=speed, model=self._model
):
if result.audio is not None:
logger.debug(f"Got audio chunk with shape: {result.audio.shape}")
yield result.audio.numpy()
else:
logger.warning("No audio in chunk")
except Exception as e:
logger.error(f"Generation failed: {e}")

View file

@@ -1,7 +1,10 @@
"""Kokoro V1 model management."""
from typing import Optional
import asyncio
import time
from typing import Dict, List, Optional, Tuple
import torch
from loguru import logger
from ..core import paths
@@ -11,131 +14,47 @@ from .base import BaseModelBackend
from .kokoro_v1 import KokoroV1
class ModelManager:
"""Manages Kokoro V1 model loading and inference."""
class ModelInstance:
"""Individual model instance with its own CUDA stream."""
# Singleton instance
_instance = None
def __init__(self, config: Optional[ModelConfig] = None):
"""Initialize manager.
Args:
config: Optional model configuration override
"""
self._config = config or model_config
self._backend: Optional[KokoroV1] = None # Explicitly type as KokoroV1
def __init__(self, instance_id: int):
"""Initialize model instance."""
self.instance_id = instance_id
self._backend: Optional[KokoroV1] = None
self._device: Optional[str] = None
self._stream: Optional[torch.cuda.Stream] = None if not settings.use_gpu else torch.cuda.Stream()
self._in_use = False
self._last_used = 0.0
def _determine_device(self) -> str:
"""Determine device based on settings."""
return "cuda" if settings.use_gpu else "cpu"
@property
def is_available(self) -> bool:
"""Check if instance is available."""
return not self._in_use
async def initialize(self) -> None:
"""Initialize Kokoro V1 backend."""
"""Initialize model instance."""
try:
self._device = self._determine_device()
logger.info(f"Initializing Kokoro V1 on {self._device}")
self._device = "cuda" if settings.use_gpu else "cpu"
logger.info(f"Initializing Kokoro V1 instance {self.instance_id} on {self._device}")
self._backend = KokoroV1()
if self._stream:
self._backend.set_stream(self._stream)
except Exception as e:
raise RuntimeError(f"Failed to initialize Kokoro V1: {e}")
async def initialize_with_warmup(self, voice_manager) -> tuple[str, str, int]:
"""Initialize and warm up model.
Args:
voice_manager: Voice manager instance for warmup
Returns:
Tuple of (device, backend type, voice count)
Raises:
RuntimeError: If initialization fails
"""
import time
start = time.perf_counter()
try:
# Initialize backend
await self.initialize()
# Load model
model_path = self._config.pytorch_kokoro_v1_file
await self.load_model(model_path)
# Use paths module to get voice path
try:
voices = await paths.list_voices()
voice_path = await paths.get_voice_path(settings.default_voice)
# Warm up with short text
warmup_text = "Warmup text for initialization."
# Use default voice name for warmup
voice_name = settings.default_voice
logger.debug(f"Using default voice '{voice_name}' for warmup")
async for _ in self.generate(warmup_text, (voice_name, voice_path)):
pass
except Exception as e:
raise RuntimeError(f"Failed to get default voice: {e}")
ms = int((time.perf_counter() - start) * 1000)
logger.info(f"Warmup completed in {ms}ms")
return self._device, "kokoro_v1", len(voices)
except FileNotFoundError as e:
logger.error("""
Model files not found! You need to download the Kokoro V1 model:
1. Download model using the script:
python docker/scripts/download_model.py --output api/src/models/v1_0
2. Or set environment variable in docker-compose:
DOWNLOAD_MODEL=true
""")
exit(0)
except Exception as e:
raise RuntimeError(f"Warmup failed: {e}")
def get_backend(self) -> BaseModelBackend:
"""Get initialized backend.
Returns:
Initialized backend instance
Raises:
RuntimeError: If backend not initialized
"""
if not self._backend:
raise RuntimeError("Backend not initialized")
return self._backend
raise RuntimeError(f"Failed to initialize Kokoro V1 instance {self.instance_id}: {e}")
async def load_model(self, path: str) -> None:
"""Load model using initialized backend.
Args:
path: Path to model file
Raises:
RuntimeError: If loading fails
"""
"""Load model using initialized backend."""
if not self._backend:
raise RuntimeError("Backend not initialized")
try:
await self._backend.load_model(path)
except FileNotFoundError as e:
raise e
except Exception as e:
raise RuntimeError(f"Failed to load model: {e}")
raise RuntimeError(f"Failed to load model for instance {self.instance_id}: {e}")
async def generate(self, *args, **kwargs):
"""Generate audio using initialized backend.
Raises:
RuntimeError: If generation fails
"""
"""Generate audio using initialized backend."""
if not self._backend:
raise RuntimeError("Backend not initialized")
@@ -143,18 +62,138 @@ Model files not found! You need to download the Kokoro V1 model:
async for chunk in self._backend.generate(*args, **kwargs):
yield chunk
except Exception as e:
raise RuntimeError(f"Generation failed: {e}")
raise RuntimeError(f"Generation failed for instance {self.instance_id}: {e}")
def unload_all(self) -> None:
def unload(self) -> None:
"""Unload model and free resources."""
if self._backend:
self._backend.unload()
self._backend = None
@property
def current_backend(self) -> str:
"""Get current backend type."""
return "kokoro_v1"
class ModelPool:
"""Pool of model instances."""
def __init__(self, max_instances: int):
"""Initialize model pool."""
self.max_instances = max_instances
self._instances: List[ModelInstance] = []
self._request_queue: asyncio.Queue = asyncio.Queue(maxsize=model_config.pytorch_gpu.max_queue_size)
self._lock = asyncio.Lock()
async def initialize(self) -> None:
"""Initialize model pool."""
async with self._lock:
for i in range(self.max_instances):
instance = ModelInstance(i)
await instance.initialize()
self._instances.append(instance)
async def get_instance(self) -> ModelInstance:
"""Get available model instance or wait for one."""
while True:
# Try to find an available instance
for instance in self._instances:
if instance.is_available:
instance._in_use = True
instance._last_used = time.time()
return instance
# If no instance is available, wait in queue
try:
await self._request_queue.put(asyncio.current_task())
await asyncio.sleep(0.1) # Small delay to prevent busy waiting
except asyncio.QueueFull:
raise RuntimeError("Request queue is full")
async def release_instance(self, instance: ModelInstance) -> None:
"""Release model instance back to pool."""
instance._in_use = False
# Process next request in queue if any
if not self._request_queue.empty():
waiting_task = await self._request_queue.get()
if not waiting_task.done():
waiting_task.set_result(None)
class ModelManager:
"""Manages Kokoro V1 model loading and inference."""
# Singleton instance
_instance = None
def __init__(self, config: Optional[ModelConfig] = None):
"""Initialize manager."""
self._config = config or model_config
self._pool: Optional[ModelPool] = None
self._chunk_semaphore = asyncio.Semaphore(self._config.pytorch_gpu.chunk_semaphore_limit)
async def initialize(self) -> None:
"""Initialize model pool."""
if not self._pool:
self._pool = ModelPool(self._config.pytorch_gpu.max_concurrent_models)
await self._pool.initialize()
async def initialize_with_warmup(self, voice_manager) -> tuple[str, str, int]:
"""Initialize and warm up model pool.
Args:
voice_manager: Voice manager instance for warmup
Returns:
Tuple of (device, backend type, voice count)
"""
try:
# Initialize pool
await self.initialize()
# Load model on all instances
model_path = self._config.pytorch_kokoro_v1_file
for instance in self._pool._instances:
await instance.load_model(model_path)
# Warm up first instance
instance = self._pool._instances[0]
try:
voices = await paths.list_voices()
voice_path = await paths.get_voice_path(settings.default_voice)
# Warm up with short text
warmup_text = "Warmup text for initialization."
voice_name = settings.default_voice
logger.debug(f"Using default voice '{voice_name}' for warmup")
async for _ in instance.generate(warmup_text, (voice_name, voice_path)):
pass
except Exception as e:
raise RuntimeError(f"Failed to get default voice: {e}")
return "cuda" if settings.use_gpu else "cpu", "kokoro_v1", len(voices)
except Exception as e:
raise RuntimeError(f"Warmup failed: {e}")
async def generate(self, *args, **kwargs):
"""Generate audio using model pool."""
if not self._pool:
raise RuntimeError("Model pool not initialized")
# Get available instance
instance = await self._pool.get_instance()
try:
async with self._chunk_semaphore:
async for chunk in instance.generate(*args, **kwargs):
yield chunk
finally:
# Release instance back to pool
await self._pool.release_instance(instance)
def unload_all(self) -> None:
"""Unload all models and free resources."""
if self._pool:
for instance in self._pool._instances:
instance.unload()
self._pool = None
async def get_manager(config: Optional[ModelConfig] = None) -> ModelManager:

View file

@@ -143,59 +143,49 @@ async def get_system_info():
}
@router.get("/debug/session_pools")
async def get_session_pool_info():
"""Get information about ONNX session pools."""
@router.get("/debug/model_pool")
async def get_model_pool_info():
"""Get information about model pool status."""
from ..inference.model_manager import get_manager
manager = await get_manager()
pools = manager._session_pools
current_time = time.time()
pool_info = {}
if not manager._pool:
return {"status": "Model pool not initialized"}
# Get CPU pool info
if "onnx_cpu" in pools:
cpu_pool = pools["onnx_cpu"]
pool_info["cpu"] = {
"active_sessions": len(cpu_pool._sessions),
"max_sessions": cpu_pool._max_size,
"sessions": [
{"model": path, "age_seconds": current_time - info.last_used}
for path, info in cpu_pool._sessions.items()
],
pool_info = {
"max_instances": manager._pool.max_instances,
"active_instances": len(manager._pool._instances),
"queue_size": manager._pool._request_queue.qsize(),
"max_queue_size": manager._pool._request_queue.maxsize,
"instances": []
}
# Get instance info
for instance in manager._pool._instances:
instance_info = {
"id": instance.instance_id,
"in_use": instance._in_use,
"device": instance._device,
"has_stream": instance._stream is not None,
"last_used": current_time - instance._last_used if instance._last_used > 0 else None
}
pool_info["instances"].append(instance_info)
# Get GPU pool info
if "onnx_gpu" in pools:
gpu_pool = pools["onnx_gpu"]
pool_info["gpu"] = {
"active_sessions": len(gpu_pool._sessions),
"max_streams": gpu_pool._max_size,
"available_streams": len(gpu_pool._available_streams),
"sessions": [
{
"model": path,
"age_seconds": current_time - info.last_used,
"stream_id": info.stream_id,
# Add GPU info if available
if GPU_AVAILABLE:
try:
gpus = GPUtil.getGPUs()
if gpus:
gpu = gpus[0] # Assume first GPU
pool_info["gpu_memory"] = {
"total_mb": gpu.memoryTotal,
"used_mb": gpu.memoryUsed,
"free_mb": gpu.memoryFree,
"percent_used": (gpu.memoryUsed / gpu.memoryTotal) * 100,
}
for path, info in gpu_pool._sessions.items()
],
}
# Add GPU memory info if available
if GPU_AVAILABLE:
try:
gpus = GPUtil.getGPUs()
if gpus:
gpu = gpus[0] # Assume first GPU
pool_info["gpu"]["memory"] = {
"total_mb": gpu.memoryTotal,
"used_mb": gpu.memoryUsed,
"free_mb": gpu.memoryFree,
"percent_used": (gpu.memoryUsed / gpu.memoryTotal) * 100,
}
except Exception:
pass
except Exception:
pass
return pool_info

View file

@@ -23,14 +23,13 @@ from .text_processing.text_processor import process_text_chunk, smart_split
class TTSService:
"""Text-to-speech service."""
# Limit concurrent chunk processing
_chunk_semaphore = asyncio.Semaphore(4)
def __init__(self, output_dir: str = None):
"""Initialize service."""
self.output_dir = output_dir
self.model_manager = None
self._voice_manager = None
# Create request queue for global request management
self._request_queue = asyncio.Queue(maxsize=32)
@classmethod
async def create(cls, output_dir: str = None) -> "TTSService":
@@ -54,93 +53,55 @@ class TTSService:
lang_code: Optional[str] = None,
) -> AsyncGenerator[Union[np.ndarray, bytes], None]:
"""Process tokens into audio."""
async with self._chunk_semaphore:
try:
# Handle stream finalization
if is_last:
# Skip format conversion for raw audio mode
if not output_format:
yield np.array([], dtype=np.float32)
return
result = await AudioService.convert_audio(
np.array([0], dtype=np.float32), # Dummy data for type checking
24000,
output_format,
is_first_chunk=False,
normalizer=normalizer,
is_last_chunk=True,
)
yield result
try:
# Handle stream finalization
if is_last:
# Skip format conversion for raw audio mode
if not output_format:
yield np.array([], dtype=np.float32)
return
# Skip empty chunks
if not tokens and not chunk_text:
return
result = await AudioService.convert_audio(
np.array([0], dtype=np.float32), # Dummy data for type checking
24000,
output_format,
is_first_chunk=False,
normalizer=normalizer,
is_last_chunk=True,
)
yield result
return
# Get backend
backend = self.model_manager.get_backend()
# Skip empty chunks
if not tokens and not chunk_text:
return
# Generate audio using pre-warmed model
if isinstance(backend, KokoroV1):
# For Kokoro V1, pass text and voice info with lang_code
async for chunk_audio in self.model_manager.generate(
chunk_text,
(voice_name, voice_path),
speed=speed,
lang_code=lang_code,
):
# For streaming, convert to bytes
if output_format:
try:
converted = await AudioService.convert_audio(
chunk_audio,
24000,
output_format,
is_first_chunk=is_first,
normalizer=normalizer,
is_last_chunk=is_last,
)
yield converted
except Exception as e:
logger.error(f"Failed to convert audio: {str(e)}")
else:
yield chunk_audio
# Generate audio using model pool
async for chunk_audio in self.model_manager.generate(
chunk_text,
(voice_name, voice_path),
speed=speed,
lang_code=lang_code,
):
# For streaming, convert to bytes
if output_format:
try:
converted = await AudioService.convert_audio(
chunk_audio,
24000,
output_format,
is_first_chunk=is_first,
normalizer=normalizer,
is_last_chunk=is_last,
)
yield converted
except Exception as e:
logger.error(f"Failed to convert audio: {str(e)}")
else:
# For legacy backends, load voice tensor
voice_tensor = await self._voice_manager.load_voice(
voice_name, device=backend.device
)
chunk_audio = await self.model_manager.generate(
tokens, voice_tensor, speed=speed
)
yield chunk_audio
if chunk_audio is None:
logger.error("Model generated None for audio chunk")
return
if len(chunk_audio) == 0:
logger.error("Model generated empty audio chunk")
return
# For streaming, convert to bytes
if output_format:
try:
converted = await AudioService.convert_audio(
chunk_audio,
24000,
output_format,
is_first_chunk=is_first,
normalizer=normalizer,
is_last_chunk=is_last,
)
yield converted
except Exception as e:
logger.error(f"Failed to convert audio: {str(e)}")
else:
yield chunk_audio
except Exception as e:
logger.error(f"Failed to process tokens: {str(e)}")
except Exception as e:
logger.error(f"Failed to process tokens: {str(e)}")
async def _get_voice_path(self, voice: str) -> Tuple[str, str]:
"""Get voice path, handling combined voices.
@@ -228,9 +189,6 @@ class TTSService:
chunk_index = 0
try:
# Get backend
backend = self.model_manager.get_backend()
# Get voice path, handling combined voices
voice_name, voice_path = await self._get_voice_path(voice)
logger.debug(f"Using voice path: {voice_path}")
@@ -310,248 +268,37 @@ class TTSService:
word_timestamps = []
try:
# Get backend and voice path
backend = self.model_manager.get_backend()
# Get voice path
voice_name, voice_path = await self._get_voice_path(voice)
if isinstance(backend, KokoroV1):
# Use provided lang_code or determine from voice name
pipeline_lang_code = lang_code if lang_code else voice[:1].lower()
logger.info(
f"Using lang_code '{pipeline_lang_code}' for voice '{voice_name}' in text chunking"
)
# Use provided lang_code or determine from voice name
pipeline_lang_code = lang_code if lang_code else voice[:1].lower()
logger.info(
f"Using lang_code '{pipeline_lang_code}' for voice '{voice_name}' in text chunking"
)
# Get pipelines from backend for proper device management
try:
# Initialize quiet pipeline for text chunking
text_chunks = []
current_offset = 0.0 # Track time offset for timestamps
logger.debug("Splitting text into chunks...")
# Use backend's pipeline management
for result in backend._get_pipeline(pipeline_lang_code)(text):
if result.graphemes and result.phonemes:
text_chunks.append((result.graphemes, result.phonemes))
logger.debug(f"Split text into {len(text_chunks)} chunks")
# Process each chunk
for chunk_idx, (chunk_text, chunk_phonemes) in enumerate(
text_chunks
):
logger.debug(
f"Processing chunk {chunk_idx + 1}/{len(text_chunks)}: '{chunk_text[:50]}...'"
)
# Use backend's pipeline for generation
for result in backend._get_pipeline(pipeline_lang_code)(
chunk_text, voice=voice_path, speed=speed
):
# Collect audio chunks
if result.audio is not None:
chunks.append(result.audio.numpy())
# Process timestamps for this chunk
if (
return_timestamps
and hasattr(result, "tokens")
and result.tokens
):
logger.debug(
f"Processing chunk timestamps with {len(result.tokens)} tokens"
)
if result.pred_dur is not None:
try:
# Join timestamps for this chunk's tokens
KPipeline.join_timestamps(
result.tokens, result.pred_dur
)
# Add timestamps with offset
for token in result.tokens:
if not all(
hasattr(token, attr)
for attr in [
"text",
"start_ts",
"end_ts",
]
):
continue
if not token.text or not token.text.strip():
continue
# Apply offset to timestamps
start_time = (
float(token.start_ts) + current_offset
)
end_time = (
float(token.end_ts) + current_offset
)
word_timestamps.append(
{
"word": str(token.text).strip(),
"start_time": start_time,
"end_time": end_time,
}
)
logger.debug(
f"Added timestamp for word '{token.text}': {start_time:.3f}s - {end_time:.3f}s"
)
# Update offset for next chunk based on pred_dur
chunk_duration = (
float(result.pred_dur.sum()) / 80
) # Convert frames to seconds
current_offset = max(
current_offset + chunk_duration, end_time
)
logger.debug(
f"Updated time offset to {current_offset:.3f}s"
)
except Exception as e:
logger.error(
f"Failed to process timestamps for chunk: {e}"
)
logger.debug(
f"Processing timestamps with pred_dur shape: {result.pred_dur.shape}"
)
try:
# Join timestamps for this chunk's tokens
KPipeline.join_timestamps(
result.tokens, result.pred_dur
)
logger.debug(
"Successfully joined timestamps for chunk"
)
except Exception as e:
logger.error(
f"Failed to join timestamps for chunk: {e}"
)
continue
# Convert tokens to timestamps
for token in result.tokens:
try:
# Skip tokens without required attributes
if not all(
hasattr(token, attr)
for attr in ["text", "start_ts", "end_ts"]
):
logger.debug(
f"Skipping token missing attributes: {dir(token)}"
)
continue
# Get and validate text
text = (
str(token.text).strip()
if token.text is not None
else ""
)
if not text:
logger.debug("Skipping empty token")
continue
# Get and validate timestamps
start_ts = getattr(token, "start_ts", None)
end_ts = getattr(token, "end_ts", None)
if start_ts is None or end_ts is None:
logger.debug(
f"Skipping token with None timestamps: {text}"
)
continue
# Convert timestamps to float
try:
start_time = float(start_ts)
end_time = float(end_ts)
except (TypeError, ValueError):
logger.debug(
f"Skipping token with invalid timestamps: {text}"
)
continue
# Add timestamp
word_timestamps.append(
{
"word": text,
"start_time": start_time,
"end_time": end_time,
}
)
logger.debug(
f"Added timestamp for word '{text}': {start_time:.3f}s - {end_time:.3f}s"
)
except Exception as e:
logger.warning(f"Error processing token: {e}")
continue
except Exception as e:
logger.error(f"Failed to process text with pipeline: {e}")
raise RuntimeError(f"Pipeline processing failed: {e}")
if not chunks:
raise ValueError("No audio chunks were generated successfully")
# Combine chunks
audio = np.concatenate(chunks) if len(chunks) > 1 else chunks[0]
processing_time = time.time() - start_time
if return_timestamps:
# Validate timestamps before returning
if not word_timestamps:
logger.warning("No valid timestamps were generated")
else:
# Sort timestamps by start time to ensure proper order
word_timestamps.sort(key=lambda x: x["start_time"])
# Validate timestamp sequence
for i in range(1, len(word_timestamps)):
prev = word_timestamps[i - 1]
curr = word_timestamps[i]
if curr["start_time"] < prev["end_time"]:
logger.warning(
f"Overlapping timestamps detected: '{prev['word']}' ({prev['start_time']:.3f}-{prev['end_time']:.3f}) and '{curr['word']}' ({curr['start_time']:.3f}-{curr['end_time']:.3f})"
)
logger.debug(
f"Returning {len(word_timestamps)} word timestamps"
)
logger.debug(
f"First timestamp: {word_timestamps[0]['word']} at {word_timestamps[0]['start_time']:.3f}s"
)
logger.debug(
f"Last timestamp: {word_timestamps[-1]['word']} at {word_timestamps[-1]['end_time']:.3f}s"
)
return audio, processing_time, word_timestamps
return audio, processing_time
else:
# For legacy backends
async for chunk in self.generate_audio_stream(
text,
voice,
speed, # Default to WAV for raw audio
# Process text in chunks
async for chunk_text, tokens in smart_split(text):
# Generate audio for chunk using model pool
async for chunk_audio in self.model_manager.generate(
chunk_text,
(voice_name, voice_path),
speed=speed,
lang_code=pipeline_lang_code,
):
if chunk is not None:
chunks.append(chunk)
chunks.append(chunk_audio)
if not chunks:
raise ValueError("No audio chunks were generated successfully")
if not chunks:
raise ValueError("No audio chunks were generated successfully")
# Combine chunks
audio = np.concatenate(chunks) if len(chunks) > 1 else chunks[0]
processing_time = time.time() - start_time
# Combine chunks
audio = np.concatenate(chunks) if len(chunks) > 1 else chunks[0]
processing_time = time.time() - start_time
if return_timestamps:
return (
audio,
processing_time,
[],
) # Empty timestamps for legacy backends
return audio, processing_time
# Return with timestamps if requested
if return_timestamps:
return audio, processing_time, word_timestamps
return audio, processing_time
except Exception as e:
logger.error(f"Error in audio generation: {str(e)}")
@@ -589,44 +336,32 @@ class TTSService:
"""
start_time = time.time()
try:
# Get backend and voice path
backend = self.model_manager.get_backend()
# Get voice path
voice_name, voice_path = await self._get_voice_path(voice)
if isinstance(backend, KokoroV1):
# For Kokoro V1, use generate_from_tokens with raw phonemes
result = None
# Use provided lang_code or determine from voice name
pipeline_lang_code = lang_code if lang_code else voice[:1].lower()
logger.info(
f"Using lang_code '{pipeline_lang_code}' for voice '{voice_name}' in phoneme pipeline"
)
# Use provided lang_code or determine from voice name
pipeline_lang_code = lang_code if lang_code else voice[:1].lower()
logger.info(
f"Using lang_code '{pipeline_lang_code}' for voice '{voice_name}' in phoneme pipeline"
)
try:
# Use backend's pipeline management
for r in backend._get_pipeline(
pipeline_lang_code
).generate_from_tokens(
tokens=phonemes, # Pass raw phonemes string
voice=voice_path,
speed=speed,
):
if r.audio is not None:
result = r
break
except Exception as e:
logger.error(f"Failed to generate from phonemes: {e}")
raise RuntimeError(f"Phoneme generation failed: {e}")
# Generate audio using model pool
chunks = []
async for chunk_audio in self.model_manager.generate(
phonemes,
(voice_name, voice_path),
speed=speed,
lang_code=pipeline_lang_code,
):
chunks.append(chunk_audio)
if result is None or result.audio is None:
raise ValueError("No audio generated")
if not chunks:
raise ValueError("No audio generated")
processing_time = time.time() - start_time
return result.audio.numpy(), processing_time
else:
raise ValueError(
"Phoneme generation only supported with Kokoro V1 backend"
)
# Combine chunks
audio = np.concatenate(chunks) if len(chunks) > 1 else chunks[0]
processing_time = time.time() - start_time
return audio, processing_time
except Exception as e:
logger.error(f"Error in phoneme audio generation: {str(e)}")

View file

@@ -13,7 +13,7 @@ Accept: application/json
### Get Session Pool Status
# Shows active ONNX sessions, CUDA stream usage, and session ages
# Useful for debugging resource exhaustion issues
GET http://localhost:8880/debug/session_pools
GET http://localhost:8880/debug/model_pool
Accept: application/json
### List Available Models

142
test_client/README.md Normal file
View file

@@ -0,0 +1,142 @@
# Kokoro FastAPI Load Testing
This directory contains load testing scripts using Locust to test the Kokoro FastAPI server's performance under concurrent load.
## Docker Setup
The easiest way to run the tests is using Docker:
```bash
# Build the Docker image
docker build -t kokoro-locust .
# Run with web interface (default)
docker run -p 8089:8089 -e LOCUST_HOST=http://host.docker.internal:8880 kokoro-locust
# Run headless mode with specific parameters
docker run -e LOCUST_HOST=http://host.docker.internal:8880 \
-e LOCUST_HEADLESS=true \
-e LOCUST_USERS=10 \
-e LOCUST_SPAWN_RATE=1 \
-e LOCUST_RUN_TIME=5m \
kokoro-locust
```
### Environment Variables
- `LOCUST_HOST`: Target server URL (default: http://localhost:8880)
- `LOCUST_USERS`: Number of users to simulate (default: 10)
- `LOCUST_SPAWN_RATE`: Users to spawn per second (default: 1)
- `LOCUST_RUN_TIME`: Test duration (default: 5m)
- `LOCUST_HEADLESS`: Run without web UI if true (default: false)
### Accessing Results
- Web UI: http://localhost:8089 when running in web mode
- HTML Report: Generated in headless mode, copy from container:
```bash
docker cp <container_id>:/locust/report.html ./report.html
```
## Local Setup (Alternative)
If you prefer running without Docker:
1. Create a virtual environment and install requirements:
```bash
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
pip install -r requirements.txt
```
2. Make sure your Kokoro FastAPI server is running (default: http://localhost:8880)
3. Run Locust:
```bash
# Web UI mode
locust -f locustfile.py --host http://localhost:8880
# Headless mode
locust -f locustfile.py --host http://localhost:8880 --users 10 --spawn-rate 1 --run-time 5m --headless
```
## Test Scenarios
The load test includes:
1. TTS endpoint testing with short phrases (a trimmed version of the task is sketched below)
2. Model pool monitoring
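The first scenario boils down to one Locust task that posts short phrases to `/v1/audio/speech`. A trimmed-down sketch of the task defined in `locustfile.py` (the full version ships in this directory; the payload fields mirror that file):

```python
import random

from locust import HttpUser, between, task


class KokoroUser(HttpUser):
    wait_time = between(2, 3)

    @task
    def speak_short_phrase(self):
        payload = {
            "model": "kokoro",
            "voice": "af_nova",
            "response_format": "mp3",
            "speed": 1.0,
            "stream": False,
            "input": random.choice([
                "Hello, how are you today?",
                "Testing voice synthesis with a short phrase.",
            ]),
        }
        # catch_response lets us decide what counts as a failure
        with self.client.post(
            "/v1/audio/speech",
            json=payload,
            catch_response=True,
            name="/v1/audio/speech (short text)",
        ) as response:
            if response.status_code == 200:
                response.success()
            else:
                response.failure(f"Unexpected status: {response.status_code}")
```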
## Testing Different Configurations
To test with different numbers of model instances:
1. Set the model instance count in your server environment:
```bash
export PYTORCH_MAX_CONCURRENT_MODELS=2 # Adjust as needed
```
2. Restart your Kokoro FastAPI server
3. Run the load test with different user counts:
```bash
# Example: Test with 20 users
docker run -e LOCUST_HOST=http://host.docker.internal:8880 \
-e LOCUST_HEADLESS=true \
-e LOCUST_USERS=20 \
-e LOCUST_SPAWN_RATE=2 \
-e LOCUST_RUN_TIME=5m \
kokoro-locust
```
## Example Test Matrix
Test your server with different configurations:
| Model Instances | Concurrent Users | Expected Load |
|----------------|------------------|---------------|
| 1 | 5 | Light |
| 2 | 10 | Medium |
| 4 | 20 | Heavy |
## Quick Test Script
Here's a quick script to test multiple configurations:
```bash
#!/bin/bash
# Array of test configurations
configs=(
"1,5" # 1 instance, 5 users
"2,10" # 2 instances, 10 users
"4,20" # 4 instances, 20 users
)
for config in "${configs[@]}"; do
IFS=',' read -r instances users <<< "$config"
echo "Testing with $instances instances and $users users..."
# Set instance count on server (you'll need to implement this)
# ssh server "export PYTORCH_MAX_CONCURRENT_MODELS=$instances && restart_server"
# Run load test
docker run -e LOCUST_HOST=http://host.docker.internal:8880 \
-e LOCUST_HEADLESS=true \
-e LOCUST_USERS=$users \
-e LOCUST_SPAWN_RATE=1 \
-e LOCUST_RUN_TIME=5m \
kokoro-locust
echo "Waiting 30s before next test..."
sleep 30
done
```
## Tips
1. Start with low user counts and gradually increase
2. Monitor server resources during tests
3. Use the debug endpoint (/debug/model_pool) to monitor instance usage (a small polling snippet is shown below)
4. Check server logs for any errors or bottlenecks
5. When using Docker, use `host.docker.internal` to access localhost
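
For tip 3, a small standard-library polling loop is enough; the field names match the `/debug/model_pool` response added in this commit, while the host and poll interval are just placeholders:

```python
import json
import time
import urllib.request

BASE_URL = "http://localhost:8880"  # adjust to your server

while True:
    with urllib.request.urlopen(f"{BASE_URL}/debug/model_pool", timeout=5) as resp:
        pool = json.load(resp)
    instances = pool.get("instances", [])
    busy = sum(1 for inst in instances if inst.get("in_use"))
    gpu = pool.get("gpu_memory", {})
    print(
        f"queue={pool.get('queue_size')} "
        f"busy={busy}/{pool.get('active_instances')} "
        f"gpu_used_mb={gpu.get('used_mb', 'n/a')}"
    )
    time.sleep(2)
```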

190
test_client/locustfile.py Normal file
View file

@@ -0,0 +1,190 @@
from locust import HttpUser, task, between, events
import json
import time
class SystemStats:
def __init__(self):
self.queue_size = 0
self.active_instances = 0
self.gpu_memory_used = 0
self.cpu_percent = 0
self.memory_percent = 0
self.error_count = 0
self.last_error = None
system_stats = SystemStats()
@events.init.add_listener
def on_locust_init(environment, **_kwargs):
@environment.web_ui.app.route("/system-stats")
def system_stats_page():
return {
"queue_size": system_stats.queue_size,
"active_instances": system_stats.active_instances,
"gpu_memory_used": system_stats.gpu_memory_used,
"cpu_percent": system_stats.cpu_percent,
"memory_percent": system_stats.memory_percent,
"error_count": system_stats.error_count,
"last_error": system_stats.last_error
}
class KokoroUser(HttpUser):
wait_time = between(2, 3) # Increased wait time to reduce load
def on_start(self):
"""Initialize test data."""
self.test_phrases = [
"Hello, how are you today?",
"The quick brown fox jumps over the lazy dog.",
"Testing voice synthesis with a short phrase.",
"I hope this works well!",
"Just a quick test of the system."
]
self.test_config = {
"model": "kokoro",
"voice": "af_nova",
"response_format": "mp3",
"speed": 1.0,
"stream": False
}
@task(1)
def test_tts_endpoint(self):
"""Test the TTS endpoint with short phrases."""
import random
test_text = random.choice(self.test_phrases)
payload = {
**self.test_config,
"input": test_text
}
with self.client.post(
"/v1/audio/speech",
json=payload,
catch_response=True,
name="/v1/audio/speech (short text)"
) as response:
try:
if response.status_code == 200:
response.success()
elif response.status_code == 429: # Too Many Requests
response.failure("Rate limit exceeded")
system_stats.error_count += 1
system_stats.last_error = "Rate limit exceeded"
elif response.status_code >= 500:
error_msg = f"Server error: {response.status_code}"
try:
error_data = response.json()
if 'detail' in error_data:
error_msg = f"Server error: {error_data['detail']}"
except:
pass
response.failure(error_msg)
system_stats.error_count += 1
system_stats.last_error = error_msg
else:
response.failure(f"Unexpected status: {response.status_code}")
system_stats.error_count += 1
system_stats.last_error = f"Unexpected status: {response.status_code}"
except Exception as e:
error_msg = f"Request failed: {str(e)}"
response.failure(error_msg)
system_stats.error_count += 1
system_stats.last_error = error_msg
@task(1) # Reduced monitoring frequency
def monitor_system(self):
"""Monitor system metrics via debug endpoints."""
# Get model pool stats
with self.client.get(
"/debug/model_pool",
catch_response=True,
name="Debug - Model Pool"
) as response:
if response.status_code == 200:
data = response.json()
system_stats.queue_size = data.get("queue_size", 0)
system_stats.active_instances = data.get("active_instances", 0)
if "gpu_memory" in data:
system_stats.gpu_memory_used = data["gpu_memory"]["used_mb"]
# Report metrics
self.environment.events.request.fire(
request_type="METRIC",
name="Queue Size",
response_time=system_stats.queue_size,
response_length=0,
exception=None
)
self.environment.events.request.fire(
request_type="METRIC",
name="Active Instances",
response_time=system_stats.active_instances,
response_length=0,
exception=None
)
if "gpu_memory" in data:
self.environment.events.request.fire(
request_type="METRIC",
name="GPU Memory (MB)",
response_time=system_stats.gpu_memory_used,
response_length=0,
exception=None
)
# Get system stats
with self.client.get(
"/debug/system",
catch_response=True,
name="Debug - System"
) as response:
if response.status_code == 200:
data = response.json()
system_stats.cpu_percent = data.get("cpu", {}).get("cpu_percent", 0)
system_stats.memory_percent = data.get("process", {}).get("memory_percent", 0)
# Report metrics
self.environment.events.request.fire(
request_type="METRIC",
name="CPU %",
response_time=system_stats.cpu_percent,
response_length=0,
exception=None
)
self.environment.events.request.fire(
request_type="METRIC",
name="Memory %",
response_time=system_stats.memory_percent,
response_length=0,
exception=None
)
# Add custom charts
@events.init_command_line_parser.add_listener
def init_parser(parser):
parser.add_argument(
'--custom-stats',
dest='custom_stats',
action='store_true',
help='Enable custom statistics in web UI'
)
# Stats processor
def process_stats():
stats = {
"Queue Size": system_stats.queue_size,
"Active Instances": system_stats.active_instances,
"GPU Memory (MB)": system_stats.gpu_memory_used,
"CPU %": system_stats.cpu_percent,
"Memory %": system_stats.memory_percent,
"Error Count": system_stats.error_count
}
return stats
@events.test_stop.add_listener
def on_test_stop(environment, **_kwargs):
print("\nFinal System Stats:")
for metric, value in process_stats().items():
print(f"{metric}: {value}")

View file

@@ -0,0 +1,2 @@
locust==2.24.0
aiohttp==3.9.3

59
test_client/run_tests.sh Executable file
View file

@@ -0,0 +1,59 @@
#!/bin/bash
# Build the Docker image if needed
if [[ "$(docker images -q kokoro-locust 2> /dev/null)" == "" ]]; then
echo "Building Kokoro Locust image..."
docker build -t kokoro-locust .
fi
# Array of test configurations: instances,users,spawn_rate,run_time
configs=(
"1,5,1,3m" # Light load: 1 instance, 5 users
"2,10,2,3m" # Medium load: 2 instances, 10 users
"4,20,2,3m" # Heavy load: 4 instances, 20 users
)
# Create results directory
mkdir -p test_results
timestamp=$(date +%Y%m%d_%H%M%S)
results_dir="test_results/run_${timestamp}"
mkdir -p "$results_dir"
# Run tests for each configuration
for config in "${configs[@]}"; do
IFS=',' read -r instances users spawn_rate run_time <<< "$config"
echo "----------------------------------------"
echo "Testing with configuration:"
echo "- Model instances: $instances"
echo "- Concurrent users: $users"
echo "- Spawn rate: $spawn_rate"
echo "- Run time: $run_time"
echo "----------------------------------------"
# Export instance count for the server (if running locally)
export PYTORCH_MAX_CONCURRENT_MODELS=$instances
# Run load test (no --rm: the container must survive so the report can be copied out)
docker run \
-e LOCUST_HOST=http://host.docker.internal:8880 \
-e LOCUST_HEADLESS=true \
-e LOCUST_USERS=$users \
-e LOCUST_SPAWN_RATE=$spawn_rate \
-e LOCUST_RUN_TIME=$run_time \
--name kokoro-locust-test \
kokoro-locust
# Copy the report, then remove the container so the name can be reused
test_name="instances${instances}_users${users}"
docker cp kokoro-locust-test:/locust/report.html "$results_dir/${test_name}_report.html"
docker rm kokoro-locust-test > /dev/null
echo "Test complete. Report saved to $results_dir/${test_name}_report.html"
echo "Waiting 30s before next test..."
sleep 30
done
echo "----------------------------------------"
echo "All tests complete!"
echo "Results saved in: $results_dir"
echo "----------------------------------------"

16
test_client/start.sh Normal file
View file

@@ -0,0 +1,16 @@
#!/bin/bash
# If LOCUST_HEADLESS is true, run in headless mode with specified parameters
if [ "$LOCUST_HEADLESS" = "true" ]; then
locust -f locustfile.py \
--host ${LOCUST_HOST} \
--users ${LOCUST_USERS} \
--spawn-rate ${LOCUST_SPAWN_RATE} \
--run-time ${LOCUST_RUN_TIME} \
--headless \
--print-stats \
--html report.html
else
# Run with web interface
locust -f locustfile.py --host ${LOCUST_HOST}
fi