Merge pull request #4 from remsky/feat/gradio-gui

Feat/gradio gui
remsky 2025-01-01 21:52:10 -07:00 committed by GitHub
commit ec2e42b9b8
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
40 changed files with 2081 additions and 307 deletions

BIN
.coverage

Binary file not shown.

View file

@ -1,5 +1,7 @@
[run]
source = api
source =
api
ui
omit =
Kokoro-82M/*
MagicMock/*

View file

@ -4,6 +4,10 @@ Notable changes to this project will be documented in this file.
## 2024-01-09
### Added
- Gradio Web Interface:
- Added simple web UI utility for audio generation from input or txt file
### Modified
#### Configuration Changes
- Updated Docker configurations:

131
README.md
View file

@ -3,42 +3,55 @@
</p>
# Kokoro TTS API
[![Tests](https://img.shields.io/badge/tests-37%20passed-darkgreen)]()
[![Coverage](https://img.shields.io/badge/coverage-81%25-darkgreen)]()
[![Tests](https://img.shields.io/badge/tests-81%20passed-darkgreen)]()
[![Coverage](https://img.shields.io/badge/coverage-76%25-darkgreen)]()
[![Tested at Model Commit](https://img.shields.io/badge/last--tested--model--commit-a67f113-blue)](https://huggingface.co/hexgrad/Kokoro-82M/tree/c3b0d86e2a980e027ef71c28819ea02e351c2667)
FastAPI wrapper for [Kokoro-82M](https://huggingface.co/hexgrad/Kokoro-82M) text-to-speech model, providing an OpenAI-compatible endpoint with:
Dockerized FastAPI wrapper for [Kokoro-82M](https://huggingface.co/hexgrad/Kokoro-82M) text-to-speech model
- OpenAI-compatible Speech endpoint, with voice combination functionality
- NVIDIA GPU accelerated inference (or CPU) option
- very fast generation time (~35x real time factor)
- automatic chunking/stitching for long texts
- very fast generation time (~35-49x RTF)
- simple audio generation web ui utility
## Quick Start
<details open>
<summary><b>OpenAI-Compatible Speech Endpoint</b></summary>
The service can be accessed through either the API endpoints or the Gradio web interface.
1. Install prerequisites:
- Install [Docker Desktop](https://www.docker.com/products/docker-desktop/)
- Install [Git](https://git-scm.com/downloads) (or download and extract zip)
- Install [Docker Desktop](https://www.docker.com/products/docker-desktop/) + [Git](https://git-scm.com/downloads)
- Clone and start the service:
```bash
git clone https://github.com/remsky/Kokoro-FastAPI.git
cd Kokoro-FastAPI
docker compose up --build
```
2. Run locally as an OpenAI-Compatible Speech Endpoint
```python
from openai import OpenAI
client = OpenAI(
base_url="http://localhost:8880",
api_key="not-needed"
)
2. Clone and start the service:
```bash
# Clone repository
git clone https://github.com/remsky/Kokoro-FastAPI.git
cd Kokoro-FastAPI
response = client.audio.speech.create(
model="kokoro",
voice="af_bella",
input="Hello world!",
response_format="mp3"
)
response.stream_to_file("output.mp3")
```
# For GPU acceleration (requires NVIDIA GPU):
docker compose up --build
or visit http://localhost:7860
<p align="center">
<img src="ui/GradioScreenShot.png" width="80%" alt="Gradio Web UI Screenshot" style="border: 2px solid #333; padding: 10px;">
</p>
</details>
<details>
<summary><b>OpenAI-Compatible Speech Endpoint</b></summary>
# For CPU-only deployment (~10x slower, but doesn't require an NVIDIA GPU):
docker compose -f docker-compose.cpu.yml up --build
```
Quick tests (run from another terminal):
```bash
# Test OpenAI Compatibility
python examples/test_openai_tts.py
# Test all available voices
python examples/test_all_voices.py
```
## OpenAI-Compatible API
```python
# Using OpenAI's Python library
from openai import OpenAI
@ -77,16 +90,26 @@ with open("output.mp3", "wb") as f:
f.write(response.content)
```
## Voice Combination
Quick tests (run from another terminal):
```bash
python examples/test_openai_tts.py # Test OpenAI Compatibility
python examples/test_all_voices.py # Test all available voices
```
</details>
<details>
<summary><b>Voice Combination</b></summary>
Combine voices and generate audio:
```python
import requests
response = requests.get("http://localhost:8880/v1/audio/voices")
voices = response.json()["voices"]
# Create combined voice (saved locally on server)
# Create combined voice (saves locally on server)
response = requests.post(
"http://localhost:8880/v1/audio/voices/combine",
json=["af_bella", "af_sarah"]
json=[voices[0], voices[1]]
)
combined_voice = response.json()["voice"]
@ -100,8 +123,27 @@ response = requests.post(
}
)
```
<p align="center">
<img src="examples/benchmarks/analysis_comparison.png" width="60%" alt="Voice Analysis Comparison" style="border: 2px solid #333; padding: 10px;">
</p>
</details>
## Performance Benchmarks
<details>
<summary><b>Gradio Web Utility</b></summary>
Access the interactive web UI at http://localhost:7860 after starting the service. Features include:
- Voice/format/speed selection
- Audio playback and download
- Text file or direct input
If you only want the API, comment out the `gradio-ui` service (the key and everything under it) in docker-compose.yml.
Voices created via the API are currently accessible here, but creating or combining voices from the web UI has not yet been added.
</details>
<details>
<summary><b>Performance Benchmarks</b></summary>
Benchmarking was performed on generation via the local API using text lengths up to feature-length books (~1.5 hours output), measuring processing time and realtime factor. Tests were run on:
- Windows 11 Home w/ WSL2
@ -119,10 +161,22 @@ Benchmarking was performed on generation via the local API using text lengths up
Key Performance Metrics:
- Realtime Factor: Ranges between 35-49x (output audio length relative to generation time)
- Average Processing Rate: 137.67 tokens/second (cl100k_base)
</details>
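As a rough illustration of the realtime factor quoted above, the sketch below divides the duration of the generated audio by the time spent generating it. It assumes that this is the ratio the README means. The function name and the sample numbers are hypothetical and are not taken from the benchmark data.

```python
def realtime_factor(audio_seconds: float, processing_seconds: float) -> float:
    """Return seconds of audio produced per second of processing time."""
    if processing_seconds <= 0:
        raise ValueError("processing_seconds must be positive")
    return audio_seconds / processing_seconds


# Hypothetical numbers for illustration only, not benchmark measurements:
# 70 seconds of audio generated in 2 seconds of processing.
rtf = realtime_factor(audio_seconds=70.0, processing_seconds=2.0)
print(f"{rtf:.1f}x")
```

A value above 1x means audio is generated faster than it plays back.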
<details>
<summary><b>GPU Vs. CPU</b></summary>
## Features
```bash
# GPU: Requires NVIDIA GPU with CUDA 12.1 support
docker compose up --build
- OpenAI-compatible API endpoints
# CPU: ~10x slower than GPU inference
docker compose -f docker-compose.cpu.yml up --build
```
</details>
<details>
<summary><b>Features</b></summary>
- OpenAI-compatible API endpoints (with optional Gradio Web UI)
- GPU-accelerated inference (if desired)
- Multiple audio formats: mp3, wav, opus, flac (aac & pcm not implemented)
- Natural Boundary Detection:
@ -131,19 +185,21 @@ Key Performance Metrics:
- Averages model weights of any existing voicepacks
- Saves generated voicepacks for future use
<p align="center">
<img src="examples/benchmarks/analysis_comparison.png" width="60%" alt="Voice Analysis Comparison" style="border: 2px solid #333; padding: 10px;">
</p>
*Note: CPU Inference is currently a very basic implementation, and not heavily tested*
</details>
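The voice combination feature averages existing voicepacks and names the result by joining the source names with underscores. The sketch below shows that idea with plain Python lists standing in for tensors, so it runs without PyTorch. The function name `combine_voices_sketch` and the toy vectors are assumptions for illustration; the service itself operates on `torch` tensors loaded from `.pt` files.

```python
from typing import Dict, List, Tuple


def combine_voices_sketch(
    voicepacks: Dict[str, List[float]], names: List[str]
) -> Tuple[str, List[float]]:
    """Average the named voice vectors element-wise and build a combined name."""
    if len(names) < 2:
        raise ValueError("At least 2 voices are required for combination")

    vectors = [voicepacks[name] for name in names]
    if any(len(vec) != len(vectors[0]) for vec in vectors):
        raise ValueError("All voice vectors must have the same length")

    # Element-wise mean across voices (the role torch.mean over a stack plays)
    averaged = [sum(values) / len(values) for values in zip(*vectors)]
    return "_".join(names), averaged


# Toy data: real voicepacks are much larger tensors
packs = {"af_bella": [0.0, 2.0], "af_sarah": [2.0, 4.0]}
name, vector = combine_voices_sketch(packs, ["af_bella", "af_sarah"])
print(name, vector)
```

Because the combined name is derived from the inputs, combining the same voices in the same order always yields the same name.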
## Model
<details open>
<summary><b>Model</b></summary>
This API uses the [Kokoro-82M](https://huggingface.co/hexgrad/Kokoro-82M) model from HuggingFace.
Visit the model page for more details about training, architecture, and capabilities. I have no affiliation with any of their work, and produced this wrapper for ease of use and personal projects.
</details>
## License
<details>
<summary><b>License</b></summary>
This project is licensed under the Apache License 2.0 - see below for details:
@ -152,3 +208,4 @@ This project is licensed under the Apache License 2.0 - see below for details:
- The inference code adapted from StyleTTS2 is MIT licensed
The full Apache 2.0 license text can be found at: https://www.apache.org/licenses/LICENSE-2.0
</details>

View file

@ -1,10 +1,10 @@
from typing import List
from fastapi import APIRouter, Depends, HTTPException, Response
from loguru import logger
from fastapi import Depends, Response, APIRouter, HTTPException
from ..services.audio import AudioService
from ..services.tts import TTSService
from ..services.audio import AudioService
from ..structures.schemas import OpenAISpeechRequest
router = APIRouter(
@ -32,7 +32,7 @@ async def create_speech(
raise ValueError(
f"Voice '{request.voice}' not found. Available voices: {', '.join(sorted(available_voices))}"
)
# Generate audio directly using TTSService's method
audio, _ = tts_service._generate_audio(
text=request.input,
@ -55,14 +55,12 @@ async def create_speech(
except ValueError as e:
logger.error(f"Invalid request: {str(e)}")
raise HTTPException(
status_code=400,
detail={"error": "Invalid request", "message": str(e)}
status_code=400, detail={"error": "Invalid request", "message": str(e)}
)
except Exception as e:
logger.error(f"Error generating speech: {str(e)}")
raise HTTPException(
status_code=500,
detail={"error": "Server error", "message": str(e)}
status_code=500, detail={"error": "Server error", "message": str(e)}
)
@ -78,17 +76,19 @@ async def list_voices(tts_service: TTSService = Depends(get_tts_service)):
@router.post("/audio/voices/combine")
async def combine_voices(request: List[str], tts_service: TTSService = Depends(get_tts_service)):
async def combine_voices(
request: List[str], tts_service: TTSService = Depends(get_tts_service)
):
"""Combine multiple voices into a new voice.
Args:
request: List of voice names to combine
Returns:
Dict with combined voice name and list of all available voices
Raises:
HTTPException:
HTTPException:
- 400: Invalid request (wrong number of voices, voice not found)
- 500: Server error (file system issues, combination failed)
"""
@ -96,24 +96,21 @@ async def combine_voices(request: List[str], tts_service: TTSService = Depends(g
combined_voice = tts_service.combine_voices(voices=request)
voices = tts_service.list_voices()
return {"voices": voices, "voice": combined_voice}
except ValueError as e:
logger.error(f"Invalid voice combination request: {str(e)}")
raise HTTPException(
status_code=400,
detail={"error": "Invalid request", "message": str(e)}
status_code=400, detail={"error": "Invalid request", "message": str(e)}
)
except RuntimeError as e:
logger.error(f"Server error during voice combination: {str(e)}")
raise HTTPException(
status_code=500,
detail={"error": "Server error", "message": str(e)}
status_code=500, detail={"error": "Server error", "message": str(e)}
)
except Exception as e:
logger.error(f"Unexpected error during voice combination: {str(e)}")
raise HTTPException(
status_code=500,
detail={"error": "Unexpected error", "message": str(e)}
status_code=500, detail={"error": "Unexpected error", "message": str(e)}
)
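The handlers above follow one pattern: a `ValueError` becomes a 400 response, a `RuntimeError` becomes a 500, and anything else becomes a 500 labelled as unexpected. The framework-free sketch below shows that mapping in isolation. `call_with_status` and `too_few_voices` are hypothetical helpers, not part of the router, and the tuple return value stands in for raising `HTTPException`.

```python
from typing import Callable, Dict, Tuple


def call_with_status(action: Callable[[], str]) -> Tuple[int, Dict[str, str]]:
    """Run action and translate its outcome into (status_code, payload)."""
    try:
        return 200, {"voice": action()}
    except ValueError as e:
        # Client-side problem: bad or missing input
        return 400, {"error": "Invalid request", "message": str(e)}
    except RuntimeError as e:
        # Known server-side failure, such as a file system issue
        return 500, {"error": "Server error", "message": str(e)}
    except Exception as e:
        # Anything not anticipated above
        return 500, {"error": "Unexpected error", "message": str(e)}


def too_few_voices() -> str:
    """Hypothetical action that fails the way combine_voices does for short lists."""
    raise ValueError("At least 2 voices are required for combination")


status, payload = call_with_status(too_few_voices)
print(status, payload["error"])
```

The order of the `except` clauses matters: the specific exception types must come before the general `Exception` clause.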

View file

@ -1,17 +1,16 @@
import io
import os
import re
import threading
import time
import threading
from typing import List, Tuple, Optional
import numpy as np
import scipy.io.wavfile as wavfile
import tiktoken
import torch
import tiktoken
import scipy.io.wavfile as wavfile
from kokoro import generate, tokenize, phonemize, normalize_text
from loguru import logger
from kokoro import generate, normalize_text, phonemize, tokenize
from models import build_model
from ..core.config import settings
@ -23,7 +22,7 @@ class TTSModel:
_instance = None
_device = None
_lock = threading.Lock()
# Directory for all voices (copied base voices, and any created combined voices)
VOICES_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), "voices")
@ -38,10 +37,10 @@ class TTSModel:
model_path = os.path.join(settings.model_dir, settings.model_path)
model = build_model(model_path, cls._device)
cls._instance = model
# Ensure voices directory exists
os.makedirs(cls.VOICES_DIR, exist_ok=True)
# Copy base voices to local directory
base_voices_dir = os.path.join(settings.model_dir, settings.voices_dir)
if os.path.exists(base_voices_dir):
@ -51,25 +50,37 @@ class TTSModel:
voice_path = os.path.join(cls.VOICES_DIR, file)
if not os.path.exists(voice_path):
try:
logger.info(f"Copying base voice {voice_name} to voices directory")
logger.info(
f"Copying base voice {voice_name} to voices directory"
)
base_path = os.path.join(base_voices_dir, file)
voicepack = torch.load(base_path, map_location=cls._device, weights_only=True)
voicepack = torch.load(
base_path,
map_location=cls._device,
weights_only=True,
)
torch.save(voicepack, voice_path)
except Exception as e:
logger.error(f"Error copying voice {voice_name}: {str(e)}")
logger.error(
f"Error copying voice {voice_name}: {str(e)}"
)
# Warm up with default voice
try:
dummy_text = "Hello"
voice_path = os.path.join(cls.VOICES_DIR, "af.pt")
dummy_voicepack = torch.load(voice_path, map_location=cls._device, weights_only=True)
generate(model, dummy_text, dummy_voicepack, lang='a', speed=1.0)
dummy_voicepack = torch.load(
voice_path, map_location=cls._device, weights_only=True
)
generate(model, dummy_text, dummy_voicepack, lang="a", speed=1.0)
logger.info("Model warm-up complete")
except Exception as e:
logger.warning(f"Model warm-up failed: {e}")
# Count voices in directory for validation
voice_count = len([f for f in os.listdir(cls.VOICES_DIR) if f.endswith('.pt')])
voice_count = len(
[f for f in os.listdir(cls.VOICES_DIR) if f.endswith(".pt")]
)
return cls._instance, voice_count
@classmethod
@ -86,11 +97,11 @@ class TTSService:
self._ensure_voices()
if start_worker:
self.start_worker()
def _ensure_voices(self):
"""Copy base voices to local voices directory during initialization"""
os.makedirs(TTSModel.VOICES_DIR, exist_ok=True)
base_voices_dir = os.path.join(settings.model_dir, settings.voices_dir)
if os.path.exists(base_voices_dir):
for file in os.listdir(base_voices_dir):
@ -99,9 +110,15 @@ class TTSService:
voice_path = os.path.join(TTSModel.VOICES_DIR, file)
if not os.path.exists(voice_path):
try:
logger.info(f"Copying base voice {voice_name} to voices directory")
logger.info(
f"Copying base voice {voice_name} to voices directory"
)
base_path = os.path.join(base_voices_dir, file)
voicepack = torch.load(base_path, map_location=TTSModel._device, weights_only=True)
voicepack = torch.load(
base_path,
map_location=TTSModel._device,
weights_only=True,
)
torch.save(voicepack, voice_path)
except Exception as e:
logger.error(f"Error copying voice {voice_name}: {str(e)}")
@ -112,10 +129,10 @@ class TTSService:
def _get_voice_path(self, voice_name: str) -> Optional[str]:
"""Get the path to a voice file.
Args:
voice_name: Name of the voice to find
Returns:
Path to the voice file if found, None otherwise
"""
@ -141,7 +158,9 @@ class TTSService:
# Load model and voice
model = TTSModel._instance
voicepack = torch.load(voice_path, map_location=TTSModel._device, weights_only=True)
voicepack = torch.load(
voice_path, map_location=TTSModel._device, weights_only=True
)
# Generate audio with or without stitching
if stitch_long_output:
@ -152,11 +171,11 @@ class TTSService:
for i, chunk in enumerate(chunks):
try:
# Validate phonemization first
ps = phonemize(chunk, voice[0])
tokens = tokenize(ps)
logger.debug(
f"Processing chunk {i + 1}/{len(chunks)}: {len(tokens)} tokens"
)
# ps = phonemize(chunk, voice[0])
# tokens = tokenize(ps)
# logger.debug(
# f"Processing chunk {i + 1}/{len(chunks)}: {len(tokens)} tokens"
# )
# Only proceed if phonemization succeeded
chunk_audio, _ = generate(
@ -205,47 +224,51 @@ class TTSService:
def combine_voices(self, voices: List[str]) -> str:
"""Combine multiple voices into a new voice.
Args:
voices: List of voice names to combine
Returns:
Name of the combined voice
Raises:
ValueError: If fewer than 2 voices are provided or voice loading fails
RuntimeError: If voice combination or saving fails
"""
if len(voices) < 2:
raise ValueError("At least 2 voices are required for combination")
# Load voices
t_voices: List[torch.Tensor] = []
v_name: List[str] = []
for voice in voices:
try:
voice_path = os.path.join(TTSModel.VOICES_DIR, f"{voice}.pt")
voicepack = torch.load(voice_path, map_location=TTSModel._device, weights_only=True)
voicepack = torch.load(
voice_path, map_location=TTSModel._device, weights_only=True
)
t_voices.append(voicepack)
v_name.append(voice)
except Exception as e:
raise ValueError(f"Failed to load voice {voice}: {str(e)}")
# Combine voices
try:
f: str = "_".join(v_name)
v = torch.mean(torch.stack(t_voices), dim=0)
combined_path = os.path.join(TTSModel.VOICES_DIR, f"{f}.pt")
# Save combined voice
try:
torch.save(v, combined_path)
except Exception as e:
raise RuntimeError(f"Failed to save combined voice to {combined_path}: {str(e)}")
raise RuntimeError(
f"Failed to save combined voice to {combined_path}: {str(e)}"
)
return f
except Exception as e:
if not isinstance(e, (ValueError, RuntimeError)):
raise RuntimeError(f"Error combining voices: {str(e)}")

View file

@ -17,8 +17,8 @@ class OpenAISpeechRequest(BaseModel):
model: Literal["tts-1", "tts-1-hd", "kokoro"] = "kokoro"
input: str = Field(..., description="The text to generate audio for")
voice: str = Field(
default="af",
description="The voice to use for generation. Can be a base voice or a combined voice name."
default="af",
description="The voice to use for generation. Can be a base voice or a combined voice name.",
)
response_format: Literal["mp3", "opus", "aac", "flac", "wav", "pcm"] = Field(
default="mp3",

View file

@ -1,16 +1,18 @@
import os
import shutil
import sys
import shutil
from unittest.mock import Mock, patch
import pytest
def cleanup_mock_dirs():
"""Clean up any MagicMock directories created during tests"""
mock_dir = "MagicMock"
if os.path.exists(mock_dir):
shutil.rmtree(mock_dir)
@pytest.fixture(autouse=True)
def cleanup():
"""Automatically clean up before and after each test"""
@ -18,6 +20,7 @@ def cleanup():
yield
cleanup_mock_dirs()
# Mock torch and other ML modules before they're imported
sys.modules["torch"] = Mock()
sys.modules["transformers"] = Mock()
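Registering a `Mock` in `sys.modules` before the first import means later `import` statements receive the mock instead of loading the real package. The sketch below demonstrates this with a made-up module name, `heavy_ml_lib`, which is an assumption for illustration and does not exist in this project.

```python
import sys
from unittest.mock import Mock

# "heavy_ml_lib" is a made-up name; the conftest does the same for torch
sys.modules["heavy_ml_lib"] = Mock()

# The import system finds the entry in sys.modules and returns it,
# so nothing is searched for or loaded from disk.
import heavy_ml_lib

# Any attribute access or call on the mock returns another Mock
result = heavy_ml_lib.load("weights.pt")
print(type(result).__name__)
```

The registration has to happen before any module under test imports the real package, which is why the conftest performs it at import time rather than inside a fixture.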

View file

@ -1,6 +1,8 @@
"""Tests for AudioService"""
import numpy as np
import pytest
from api.src.services.audio import AudioService

View file

@ -114,9 +114,9 @@ def test_combine_voices_success(mock_tts_service):
"""Test successful voice combination"""
test_voices = ["af_bella", "af_sarah"]
mock_tts_service.combine_voices.return_value = "af_bella_af_sarah"
response = client.post("/v1/audio/voices/combine", json=test_voices)
assert response.status_code == 200
assert response.json()["voice"] == "af_bella_af_sarah"
mock_tts_service.combine_voices.assert_called_once_with(voices=test_voices)
@ -126,9 +126,9 @@ def test_combine_voices_single_voice(mock_tts_service):
"""Test combining single voice returns default voice"""
test_voices = ["af_bella"]
mock_tts_service.combine_voices.return_value = "af"
response = client.post("/v1/audio/voices/combine", json=test_voices)
assert response.status_code == 200
assert response.json()["voice"] == "af"
@ -137,9 +137,9 @@ def test_combine_voices_empty_list(mock_tts_service):
"""Test combining empty voice list returns default voice"""
test_voices = []
mock_tts_service.combine_voices.return_value = "af"
response = client.post("/v1/audio/voices/combine", json=test_voices)
assert response.status_code == 200
assert response.json()["voice"] == "af"
@ -148,8 +148,8 @@ def test_combine_voices_error(mock_tts_service):
"""Test error handling in voice combination"""
test_voices = ["af_bella", "af_sarah"]
mock_tts_service.combine_voices.side_effect = Exception("Combination failed")
response = client.post("/v1/audio/voices/combine", json=test_voices)
assert response.status_code == 500
assert "Combination failed" in response.json()["detail"]["message"]

View file

@ -1,7 +1,10 @@
"""Tests for FastAPI application"""
from unittest.mock import MagicMock, patch
import pytest
from unittest.mock import patch, MagicMock
from fastapi.testclient import TestClient
from api.src.main import app, lifespan
@ -19,98 +22,100 @@ def test_health_check(test_client):
@pytest.mark.asyncio
@patch('api.src.main.TTSModel')
@patch('api.src.main.logger')
@patch("api.src.main.TTSModel")
@patch("api.src.main.logger")
async def test_lifespan_successful_warmup(mock_logger, mock_tts_model):
"""Test successful model warmup in lifespan"""
# Mock the model initialization with model info and voicepack count
mock_model = MagicMock()
# Mock file system for voice counting
mock_tts_model.VOICES_DIR = "/mock/voices"
with patch('os.listdir', return_value=['voice1.pt', 'voice2.pt', 'voice3.pt']):
with patch("os.listdir", return_value=["voice1.pt", "voice2.pt", "voice3.pt"]):
mock_tts_model.initialize.return_value = (mock_model, 3) # 3 voice files
mock_tts_model._device = "cuda" # Set device class variable
# Create an async generator from the lifespan context manager
async_gen = lifespan(MagicMock())
# Start the context manager
await async_gen.__aenter__()
# Verify the expected logging sequence
mock_logger.info.assert_any_call("Loading TTS model and voice packs...")
mock_logger.info.assert_any_call("Model loaded and warmed up on cuda")
mock_logger.info.assert_any_call("3 voice packs loaded successfully")
# Verify model initialization was called
mock_tts_model.initialize.assert_called_once()
# Clean up
await async_gen.__aexit__(None, None, None)
@pytest.mark.asyncio
@patch('api.src.main.TTSModel')
@patch('api.src.main.logger')
@patch("api.src.main.TTSModel")
@patch("api.src.main.logger")
async def test_lifespan_failed_warmup(mock_logger, mock_tts_model):
"""Test failed model warmup in lifespan"""
# Mock the model initialization to fail
mock_tts_model.initialize.side_effect = Exception("Failed to initialize model")
# Create an async generator from the lifespan context manager
async_gen = lifespan(MagicMock())
# Verify the exception is raised
with pytest.raises(Exception, match="Failed to initialize model"):
await async_gen.__aenter__()
# Verify the expected logging sequence
mock_logger.info.assert_called_with("Loading TTS model and voice packs...")
# Clean up
await async_gen.__aexit__(None, None, None)
@pytest.mark.asyncio
@patch('api.src.main.TTSModel')
@patch("api.src.main.TTSModel")
async def test_lifespan_cuda_warmup(mock_tts_model):
"""Test model warmup specifically on CUDA"""
# Mock the model initialization with CUDA and voicepacks
mock_model = MagicMock()
# Mock file system for voice counting
mock_tts_model.VOICES_DIR = "/mock/voices"
with patch('os.listdir', return_value=['voice1.pt', 'voice2.pt']):
with patch("os.listdir", return_value=["voice1.pt", "voice2.pt"]):
mock_tts_model.initialize.return_value = (mock_model, 2) # 2 voice files
mock_tts_model._device = "cuda" # Set device class variable
# Create an async generator from the lifespan context manager
async_gen = lifespan(MagicMock())
await async_gen.__aenter__()
# Verify model was initialized
mock_tts_model.initialize.assert_called_once()
# Clean up
await async_gen.__aexit__(None, None, None)
@pytest.mark.asyncio
@patch('api.src.main.TTSModel')
@patch("api.src.main.TTSModel")
async def test_lifespan_cpu_fallback(mock_tts_model):
"""Test model warmup falling back to CPU"""
# Mock the model initialization with CPU and voicepacks
mock_model = MagicMock()
# Mock file system for voice counting
mock_tts_model.VOICES_DIR = "/mock/voices"
with patch('os.listdir', return_value=['voice1.pt', 'voice2.pt', 'voice3.pt', 'voice4.pt']):
with patch(
"os.listdir", return_value=["voice1.pt", "voice2.pt", "voice3.pt", "voice4.pt"]
):
mock_tts_model.initialize.return_value = (mock_model, 4) # 4 voice files
mock_tts_model._device = "cpu" # Set device class variable
# Create an async generator from the lifespan context manager
async_gen = lifespan(MagicMock())
await async_gen.__aenter__()
# Verify model was initialized
mock_tts_model.initialize.assert_called_once()
# Clean up
await async_gen.__aexit__(None, None, None)

View file

@ -1,9 +1,12 @@
"""Tests for TTSService"""
import os
from unittest.mock import MagicMock, call, patch
import numpy as np
import pytest
from unittest.mock import patch, MagicMock, call
from api.src.services.tts import TTSService, TTSModel
from api.src.services.tts import TTSModel, TTSService
@pytest.fixture
@ -50,42 +53,59 @@ def test_audio_to_bytes(tts_service, sample_audio):
assert len(audio_bytes) > 0
@patch('os.listdir')
@patch('os.path.join')
@patch("os.listdir")
@patch("os.path.join")
def test_list_voices(mock_join, mock_listdir, tts_service):
"""Test listing available voices"""
mock_listdir.return_value = ['voice1.pt', 'voice2.pt', 'not_a_voice.txt']
mock_join.return_value = '/fake/path'
mock_listdir.return_value = ["voice1.pt", "voice2.pt", "not_a_voice.txt"]
mock_join.return_value = "/fake/path"
voices = tts_service.list_voices()
assert len(voices) == 2
assert 'voice1' in voices
assert 'voice2' in voices
assert 'not_a_voice' not in voices
assert "voice1" in voices
assert "voice2" in voices
assert "not_a_voice" not in voices
@patch('api.src.services.tts.TTSModel.get_instance')
@patch('api.src.services.tts.TTSModel.get_voicepack')
@patch('api.src.services.tts.normalize_text')
@patch('api.src.services.tts.phonemize')
@patch('api.src.services.tts.tokenize')
@patch('api.src.services.tts.generate')
def test_generate_audio_empty_text(mock_generate, mock_tokenize, mock_phonemize, mock_normalize, mock_voicepack, mock_instance, tts_service):
@patch("api.src.services.tts.TTSModel.get_instance")
@patch("api.src.services.tts.TTSModel.get_voicepack")
@patch("api.src.services.tts.normalize_text")
@patch("api.src.services.tts.phonemize")
@patch("api.src.services.tts.tokenize")
@patch("api.src.services.tts.generate")
def test_generate_audio_empty_text(
mock_generate,
mock_tokenize,
mock_phonemize,
mock_normalize,
mock_voicepack,
mock_instance,
tts_service,
):
"""Test generating audio with empty text"""
mock_normalize.return_value = ""
with pytest.raises(ValueError, match="Text is empty after preprocessing"):
tts_service._generate_audio("", "af", 1.0)
@patch('api.src.services.tts.TTSModel.get_instance')
@patch('os.path.exists')
@patch('api.src.services.tts.normalize_text')
@patch('api.src.services.tts.phonemize')
@patch('api.src.services.tts.tokenize')
@patch('api.src.services.tts.generate')
@patch('torch.load')
def test_generate_audio_no_chunks(mock_torch_load, mock_generate, mock_tokenize, mock_phonemize, mock_normalize, mock_exists, mock_instance, tts_service):
@patch("api.src.services.tts.TTSModel.get_instance")
@patch("os.path.exists")
@patch("api.src.services.tts.normalize_text")
@patch("api.src.services.tts.phonemize")
@patch("api.src.services.tts.tokenize")
@patch("api.src.services.tts.generate")
@patch("torch.load")
def test_generate_audio_no_chunks(
mock_torch_load,
mock_generate,
mock_tokenize,
mock_phonemize,
mock_normalize,
mock_exists,
mock_instance,
tts_service,
):
"""Test generating audio with no successful chunks"""
mock_normalize.return_value = "Test text"
mock_phonemize.return_value = "Test text"
@ -94,19 +114,29 @@ def test_generate_audio_no_chunks(mock_torch_load, mock_generate, mock_tokenize,
mock_instance.return_value = (MagicMock(), "cpu")
mock_exists.return_value = True
mock_torch_load.return_value = MagicMock()
with pytest.raises(ValueError, match="No audio chunks were generated successfully"):
tts_service._generate_audio("Test text", "af", 1.0)
@patch('api.src.services.tts.TTSModel.get_instance')
@patch('os.path.exists')
@patch('api.src.services.tts.normalize_text')
@patch('api.src.services.tts.phonemize')
@patch('api.src.services.tts.tokenize')
@patch('api.src.services.tts.generate')
@patch('torch.load')
def test_generate_audio_success(mock_torch_load, mock_generate, mock_tokenize, mock_phonemize, mock_normalize, mock_exists, mock_instance, tts_service, sample_audio):
@patch("api.src.services.tts.TTSModel.get_instance")
@patch("os.path.exists")
@patch("api.src.services.tts.normalize_text")
@patch("api.src.services.tts.phonemize")
@patch("api.src.services.tts.tokenize")
@patch("api.src.services.tts.generate")
@patch("torch.load")
def test_generate_audio_success(
mock_torch_load,
mock_generate,
mock_tokenize,
mock_phonemize,
mock_normalize,
mock_exists,
mock_instance,
tts_service,
sample_audio,
):
"""Test successful audio generation"""
mock_normalize.return_value = "Test text"
mock_phonemize.return_value = "Test text"
@ -115,15 +145,15 @@ def test_generate_audio_success(mock_torch_load, mock_generate, mock_tokenize, m
mock_instance.return_value = (MagicMock(), "cpu")
mock_exists.return_value = True
mock_torch_load.return_value = MagicMock()
audio, processing_time = tts_service._generate_audio("Test text", "af", 1.0)
assert isinstance(audio, np.ndarray)
assert isinstance(processing_time, float)
assert len(audio) > 0
@patch('api.src.services.tts.torch.cuda.is_available')
@patch('api.src.services.tts.build_model')
@patch("api.src.services.tts.torch.cuda.is_available")
@patch("api.src.services.tts.build_model")
def test_model_initialization_cuda(mock_build_model, mock_cuda_available):
"""Test model initialization with CUDA"""
mock_cuda_available.return_value = True
@ -132,14 +162,14 @@ def test_model_initialization_cuda(mock_build_model, mock_cuda_available):
TTSModel._instance = None # Reset singleton
model, voice_count = TTSModel.initialize()
assert TTSModel._device == "cuda" # Check the class variable instead
assert model == mock_model
mock_build_model.assert_called_once()
@patch('api.src.services.tts.torch.cuda.is_available')
@patch('api.src.services.tts.build_model')
@patch("api.src.services.tts.torch.cuda.is_available")
@patch("api.src.services.tts.build_model")
def test_model_initialization_cpu(mock_build_model, mock_cuda_available):
"""Test model initialization with CPU"""
mock_cuda_available.return_value = False
@ -148,76 +178,95 @@ def test_model_initialization_cpu(mock_build_model, mock_cuda_available):
TTSModel._instance = None # Reset singleton
model, voice_count = TTSModel.initialize()
assert TTSModel._device == "cpu" # Check the class variable instead
assert model == mock_model
mock_build_model.assert_called_once()
@patch('api.src.services.tts.TTSService._get_voice_path')
@patch('api.src.services.tts.TTSModel.get_instance')
@patch("api.src.services.tts.TTSService._get_voice_path")
@patch("api.src.services.tts.TTSModel.get_instance")
def test_voicepack_loading_error(mock_get_instance, mock_get_voice_path):
"""Test voicepack loading error handling"""
mock_get_voice_path.return_value = None
mock_get_instance.return_value = (MagicMock(), "cpu")
TTSModel._voicepacks = {} # Reset voicepacks
service = TTSService(start_worker=False)
with pytest.raises(ValueError, match="Voice not found: nonexistent_voice"):
service._generate_audio("test", "nonexistent_voice", 1.0)
@patch('api.src.services.tts.TTSModel')
@patch("api.src.services.tts.TTSModel")
def test_save_audio(mock_tts_model, tts_service, sample_audio, tmp_path):
"""Test saving audio to file"""
output_dir = os.path.join(tmp_path, "test_output")
os.makedirs(output_dir, exist_ok=True)
output_path = os.path.join(output_dir, "audio.wav")
tts_service._save_audio(sample_audio, output_path)
assert os.path.exists(output_path)
assert os.path.getsize(output_path) > 0
@patch('api.src.services.tts.TTSModel.get_instance')
@patch('os.path.exists')
@patch('api.src.services.tts.normalize_text')
@patch('api.src.services.tts.generate')
@patch('torch.load')
def test_generate_audio_without_stitching(mock_torch_load, mock_generate, mock_normalize, mock_exists, mock_instance, tts_service, sample_audio):
@patch("api.src.services.tts.TTSModel.get_instance")
@patch("os.path.exists")
@patch("api.src.services.tts.normalize_text")
@patch("api.src.services.tts.generate")
@patch("torch.load")
def test_generate_audio_without_stitching(
mock_torch_load,
mock_generate,
mock_normalize,
mock_exists,
mock_instance,
tts_service,
sample_audio,
):
"""Test generating audio without text stitching"""
mock_normalize.return_value = "Test text"
mock_generate.return_value = (sample_audio, None)
mock_instance.return_value = (MagicMock(), "cpu")
mock_exists.return_value = True
mock_torch_load.return_value = MagicMock()
audio, processing_time = tts_service._generate_audio("Test text", "af", 1.0, stitch_long_output=False)
audio, processing_time = tts_service._generate_audio(
"Test text", "af", 1.0, stitch_long_output=False
)
assert isinstance(audio, np.ndarray)
assert isinstance(processing_time, float)
assert len(audio) > 0
mock_generate.assert_called_once()
@patch('os.listdir')
@patch("os.listdir")
def test_list_voices_error(mock_listdir, tts_service):
"""Test error handling in list_voices"""
mock_listdir.side_effect = Exception("Failed to list directory")
voices = tts_service.list_voices()
assert voices == []
@patch('api.src.services.tts.TTSModel.get_instance')
@patch('os.path.exists')
@patch('api.src.services.tts.normalize_text')
@patch('api.src.services.tts.phonemize')
@patch('api.src.services.tts.tokenize')
@patch('api.src.services.tts.generate')
@patch('torch.load')
def test_generate_audio_phonemize_error(mock_torch_load, mock_generate, mock_tokenize, mock_phonemize, mock_normalize, mock_exists, mock_instance, tts_service):
@patch("api.src.services.tts.TTSModel.get_instance")
@patch("os.path.exists")
@patch("api.src.services.tts.normalize_text")
@patch("api.src.services.tts.phonemize")
@patch("api.src.services.tts.tokenize")
@patch("api.src.services.tts.generate")
@patch("torch.load")
def test_generate_audio_phonemize_error(
mock_torch_load,
mock_generate,
mock_tokenize,
mock_phonemize,
mock_normalize,
mock_exists,
mock_instance,
tts_service,
):
"""Test handling phonemization error"""
mock_normalize.return_value = "Test text"
mock_phonemize.side_effect = Exception("Phonemization failed")
@ -225,23 +274,30 @@ def test_generate_audio_phonemize_error(mock_torch_load, mock_generate, mock_tok
mock_exists.return_value = True
mock_torch_load.return_value = MagicMock()
mock_generate.return_value = (None, None)
with pytest.raises(ValueError, match="No audio chunks were generated successfully"):
tts_service._generate_audio("Test text", "af", 1.0)
@patch('api.src.services.tts.TTSModel.get_instance')
@patch('os.path.exists')
@patch('api.src.services.tts.normalize_text')
@patch('api.src.services.tts.generate')
@patch('torch.load')
def test_generate_audio_error(mock_torch_load, mock_generate, mock_normalize, mock_exists, mock_instance, tts_service):
@patch("api.src.services.tts.TTSModel.get_instance")
@patch("os.path.exists")
@patch("api.src.services.tts.normalize_text")
@patch("api.src.services.tts.generate")
@patch("torch.load")
def test_generate_audio_error(
mock_torch_load,
mock_generate,
mock_normalize,
mock_exists,
mock_instance,
tts_service,
):
"""Test handling generation error"""
mock_normalize.return_value = "Test text"
mock_generate.side_effect = Exception("Generation failed")
mock_instance.return_value = (MagicMock(), "cpu")
mock_exists.return_value = True
mock_torch_load.return_value = MagicMock()
with pytest.raises(ValueError, match="No audio chunks were generated successfully"):
tts_service._generate_audio("Test text", "af", 1.0)
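
The tests above stack several `@patch` decorators, and the order of the mock parameters follows from how `unittest.mock` applies them: the decorator closest to the function supplies the first mock argument. A minimal sketch of that rule, using a hypothetical `check_order` function and stand-in patch targets rather than the project's own modules:

```python
import os
import os.path
from unittest.mock import patch


@patch("os.path.exists")  # outermost decorator -> last mock argument
@patch("os.listdir")      # innermost decorator -> first mock argument
def check_order(mock_listdir, mock_exists):
    """Hypothetical helper: shows which mock lands in which parameter."""
    mock_listdir.return_value = ["af.pt"]
    mock_exists.return_value = True
    # While the patches are active, the real names resolve to the mocks.
    return (
        os.listdir("voices"),
        os.path.exists("voices/af.pt"),
        mock_listdir is os.listdir,
    )


result = check_order()
```

This is why `mock_torch_load` comes first and `mock_instance` last in the signatures above, with the `tts_service` fixture following the mocks.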


@ -46,14 +46,14 @@ services:
model-fetcher:
condition: service_healthy
# # Gradio UI service
# gradio-ui:
# build:
# context: ./ui
# ports:
# - "7860:7860"
# volumes:
# - ./ui/data:/app/ui/data
# - ./ui/app.py:/app/app.py # Mount app.py for hot reload
# environment:
# - GRADIO_WATCH=True # Enable hot reloading
# Gradio UI service [Comment out everything below if you don't need it]
gradio-ui:
build:
context: ./ui
ports:
- "7860:7860"
volumes:
- ./ui/data:/app/ui/data
- ./ui/app.py:/app/app.py # Mount app.py for hot reload
environment:
- GRADIO_WATCH=True # Enable hot reloading


@ -19,7 +19,6 @@ output_dir = Path(__file__).parent / "output"
output_dir.mkdir(exist_ok=True)
def test_voice(voice: str):
speech_file = output_dir / f"speech_{voice}.mp3"
print(f"\nTesting voice: {voice}")


@ -1,21 +1,23 @@
#!/usr/bin/env python3
import argparse
import os
from typing import List, Optional, Dict, Tuple
import argparse
from typing import Dict, List, Tuple, Optional
import requests
import numpy as np
from scipy.io import wavfile
import requests
import matplotlib.pyplot as plt
from scipy.io import wavfile
def submit_combine_voices(voices: List[str], base_url: str = "http://localhost:8880") -> Optional[str]:
def submit_combine_voices(
voices: List[str], base_url: str = "http://localhost:8880"
) -> Optional[str]:
"""Combine multiple voices into a new voice.
Args:
voices: List of voice names to combine (e.g. ["af_bella", "af_sarah"])
base_url: API base URL
Returns:
Name of the combined voice (e.g. "af_bella_af_sarah") or None if error
"""
@ -23,7 +25,7 @@ def submit_combine_voices(voices: List[str], base_url: str = "http://localhost:8
response = requests.post(f"{base_url}/v1/audio/voices/combine", json=voices)
print(f"Response status: {response.status_code}")
print(f"Raw response: {response.text}")
# Accept both 200 and 201 as success
if response.status_code not in [200, 201]:
try:
@ -32,7 +34,7 @@ def submit_combine_voices(voices: List[str], base_url: str = "http://localhost:8
except Exception:
print(f"Error combining voices: {response.text}")
return None
try:
data = response.json()
if "voices" in data:
@ -46,15 +48,20 @@ def submit_combine_voices(voices: List[str], base_url: str = "http://localhost:8
return None
def generate_speech(text: str, voice: str, base_url: str = "http://localhost:8880", output_file: str = "output.mp3") -> bool:
def generate_speech(
text: str,
voice: str,
base_url: str = "http://localhost:8880",
output_file: str = "output.mp3",
) -> bool:
"""Generate speech using specified voice.
Args:
text: Text to convert to speech
voice: Voice name to use
base_url: API base URL
output_file: Path to save audio file
Returns:
True if successful, False otherwise
"""
@ -65,22 +72,25 @@ def generate_speech(text: str, voice: str, base_url: str = "http://localhost:888
"input": text,
"voice": voice,
"speed": 1.0,
"response_format": "wav" # Use WAV for analysis
}
"response_format": "wav", # Use WAV for analysis
},
)
if response.status_code != 200:
error = response.json().get("detail", {}).get("message", response.text)
print(f"Error generating speech: {error}")
return False
# Save the audio
os.makedirs(os.path.dirname(output_file) if os.path.dirname(output_file) else ".", exist_ok=True)
os.makedirs(
os.path.dirname(output_file) if os.path.dirname(output_file) else ".",
exist_ok=True,
)
with open(output_file, "wb") as f:
f.write(response.content)
print(f"Saved audio to {output_file}")
return True
except Exception as e:
print(f"Error: {e}")
return False
@ -88,57 +98,57 @@ def generate_speech(text: str, voice: str, base_url: str = "http://localhost:888
def analyze_audio(filepath: str) -> Tuple[np.ndarray, int, dict]:
"""Analyze audio file and return samples, sample rate, and audio characteristics.
Args:
filepath: Path to audio file
Returns:
Tuple of (samples, sample_rate, characteristics)
"""
sample_rate, samples = wavfile.read(filepath)
# Convert to mono if stereo
if len(samples.shape) > 1:
samples = np.mean(samples, axis=1)
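# Calculate basic stats (cast to float first: integer PCM samples can overflow when squared)
samples = samples.astype(np.float64)
max_amp = np.max(np.abs(samples))
rms = np.sqrt(np.mean(samples**2))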
duration = len(samples) / sample_rate
# Zero crossing rate (helps identify voice characteristics)
zero_crossings = np.sum(np.abs(np.diff(np.signbit(samples)))) / len(samples)
# Simple frequency analysis
if len(samples) > 0:
# Use FFT to get frequency components
fft_result = np.fft.fft(samples)
freqs = np.fft.fftfreq(len(samples), 1/sample_rate)
freqs = np.fft.fftfreq(len(samples), 1 / sample_rate)
# Get positive frequencies only
pos_mask = freqs > 0
freqs = freqs[pos_mask]
magnitudes = np.abs(fft_result)[pos_mask]
# Find dominant frequencies (top 3)
top_indices = np.argsort(magnitudes)[-3:]
dominant_freqs = freqs[top_indices]
# Calculate spectral centroid (brightness of sound)
spectral_centroid = np.sum(freqs * magnitudes) / np.sum(magnitudes)
else:
dominant_freqs = []
spectral_centroid = 0
characteristics = {
"max_amplitude": max_amp,
"rms": rms,
"duration": duration,
"zero_crossing_rate": zero_crossings,
"dominant_frequencies": dominant_freqs,
"spectral_centroid": spectral_centroid
"spectral_centroid": spectral_centroid,
}
return samples, sample_rate, characteristics
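
The metrics computed in `analyze_audio` can be checked by hand on tiny inputs. The following is a sketch only: it uses plain Python instead of NumPy, and the helper names `rms`, `zero_crossing_rate`, and `spectral_centroid` are hypothetical, not part of the script. The sign test `s < 0` approximates `np.signbit` for ordinary sample values.

```python
import math


def rms(samples):
    """Root-mean-square level of a sequence of numeric samples."""
    if not samples:
        return 0.0
    return math.sqrt(sum(float(s) ** 2 for s in samples) / len(samples))


def zero_crossing_rate(samples):
    """Sign flips between neighbouring samples, divided by the sample count."""
    if not samples:
        return 0.0
    signs = [s < 0 for s in samples]
    flips = sum(1 for a, b in zip(signs, signs[1:]) if a != b)
    return flips / len(samples)


def spectral_centroid(freqs, magnitudes):
    """Magnitude-weighted mean frequency ("brightness")."""
    total = sum(magnitudes)
    if total == 0:
        return 0.0
    return sum(f * m for f, m in zip(freqs, magnitudes)) / total


square_wave = [1, 1, -1, -1, 1, 1, -1, -1]
level = rms(square_wave)
zcr = zero_crossing_rate(square_wave)
centroid = spectral_centroid([100, 200, 300], [1, 2, 1])
```

For the square wave, the level is 1.0 and three of eight positions flip sign; the symmetric spectrum has its centroid at the middle frequency, 200.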
@ -167,112 +177,136 @@ def setup_plot(fig, ax, title):
return fig, ax
def plot_analysis(audio_files: Dict[str, str], output_dir: str):
"""Plot comprehensive voice analysis including waveforms and metrics comparison.
Args:
audio_files: Dictionary of label -> filepath
output_dir: Directory to save plot files
"""
# Set dark style
plt.style.use('dark_background')
plt.style.use("dark_background")
# Create figure with subplots
fig = plt.figure(figsize=(15, 15))
fig.patch.set_facecolor("#1a1a2e")
num_files = len(audio_files)
# Create subplot grid with proper spacing
gs = plt.GridSpec(num_files + 1, 2, height_ratios=[1.5]*num_files + [1],
hspace=0.4, wspace=0.3)
gs = plt.GridSpec(
num_files + 1, 2, height_ratios=[1.5] * num_files + [1], hspace=0.4, wspace=0.3
)
# Analyze all files first
all_chars = {}
for i, (label, filepath) in enumerate(audio_files.items()):
samples, sample_rate, chars = analyze_audio(filepath)
all_chars[label] = chars
# Plot waveform spanning both columns
ax = plt.subplot(gs[i, :])
time = np.arange(len(samples)) / sample_rate
plt.plot(time, samples / chars['max_amplitude'], linewidth=0.5, color="#ff2a6d")
plt.plot(time, samples / chars["max_amplitude"], linewidth=0.5, color="#ff2a6d")
ax.set_xlabel("Time (seconds)")
ax.set_ylabel("Normalized Amplitude")
ax.set_ylim(-1.1, 1.1)
setup_plot(fig, ax, f"Waveform: {label}")
# Colors for voices
colors = ["#ff2a6d", "#05d9e8", "#d1f7ff"]
# Create two subplots for metrics with similar scales
# Left subplot: Brightness and Volume
ax1 = plt.subplot(gs[num_files, 0])
metrics1 = [
('Brightness', [chars['spectral_centroid']/1000 for chars in all_chars.values()], 'kHz'),
('Volume', [chars['rms']*100 for chars in all_chars.values()], 'RMS×100')
(
"Brightness",
[chars["spectral_centroid"] / 1000 for chars in all_chars.values()],
"kHz",
),
("Volume", [chars["rms"] * 100 for chars in all_chars.values()], "RMS×100"),
]
# Right subplot: Voice Pitch and Texture
ax2 = plt.subplot(gs[num_files, 1])
metrics2 = [
('Voice Pitch', [min(chars['dominant_frequencies']) for chars in all_chars.values()], 'Hz'),
('Texture', [chars['zero_crossing_rate']*1000 for chars in all_chars.values()], 'ZCR×1000')
(
"Voice Pitch",
[min(chars["dominant_frequencies"]) for chars in all_chars.values()],
"Hz",
),
(
"Texture",
[chars["zero_crossing_rate"] * 1000 for chars in all_chars.values()],
"ZCR×1000",
),
]
def plot_grouped_bars(ax, metrics, show_legend=True):
n_groups = len(metrics)
n_voices = len(audio_files)
bar_width = 0.25
indices = np.arange(n_groups)
# Get max value for y-axis scaling
max_val = max(max(m[1]) for m in metrics)
for i, (voice, color) in enumerate(zip(audio_files.keys(), colors)):
values = [m[1][i] for m in metrics]
offset = (i - n_voices/2 + 0.5) * bar_width
bars = ax.bar(indices + offset, values, bar_width,
label=voice, color=color, alpha=0.8)
offset = (i - n_voices / 2 + 0.5) * bar_width
bars = ax.bar(
indices + offset, values, bar_width, label=voice, color=color, alpha=0.8
)
# Add value labels on top of bars
for bar in bars:
height = bar.get_height()
ax.text(bar.get_x() + bar.get_width()/2., height,
f'{height:.1f}',
ha='center', va='bottom', color='white',
fontsize=10)
ax.text(
bar.get_x() + bar.get_width() / 2.0,
height,
f"{height:.1f}",
ha="center",
va="bottom",
color="white",
fontsize=10,
)
ax.set_xticks(indices)
ax.set_xticklabels([f"{m[0]}\n({m[2]})" for m in metrics])
# Set y-axis limits with some padding
ax.set_ylim(0, max_val * 1.2)
if show_legend:
ax.legend(bbox_to_anchor=(1.05, 1), loc='upper left',
facecolor="#1a1a2e", edgecolor="#ffffff")
ax.legend(
bbox_to_anchor=(1.05, 1),
loc="upper left",
facecolor="#1a1a2e",
edgecolor="#ffffff",
)
# Plot both subplots
plot_grouped_bars(ax1, metrics1, show_legend=True)
plot_grouped_bars(ax2, metrics2, show_legend=False)
# Style both subplots
setup_plot(fig, ax1, 'Brightness and Volume')
setup_plot(fig, ax2, 'Voice Pitch and Texture')
setup_plot(fig, ax1, "Brightness and Volume")
setup_plot(fig, ax2, "Voice Pitch and Texture")
# Add y-axis labels
ax1.set_ylabel('Value')
ax2.set_ylabel('Value')
ax1.set_ylabel("Value")
ax2.set_ylabel("Value")
# Adjust the figure size to accommodate the legend
fig.set_size_inches(15, 15)
# Add padding around the entire figure
plt.subplots_adjust(right=0.85, top=0.95, bottom=0.05, left=0.1)
plt.savefig(os.path.join(output_dir, "analysis_comparison.png"), dpi=300)
print(f"Saved analysis comparison to {output_dir}/analysis_comparison.png")
# Print detailed comparative analysis
print("\nDetailed Voice Analysis:")
for label, chars in all_chars.items():
@ -282,44 +316,57 @@ def plot_analysis(audio_files: Dict[str, str], output_dir: str):
print(f" Duration: {chars['duration']:.2f}s")
print(f" Zero Crossing Rate: {chars['zero_crossing_rate']:.3f}")
print(f" Spectral Centroid: {chars['spectral_centroid']:.0f}Hz")
print(f" Dominant Frequencies: {', '.join(f'{f:.0f}Hz' for f in chars['dominant_frequencies'])}")
print(
f" Dominant Frequencies: {', '.join(f'{f:.0f}Hz' for f in chars['dominant_frequencies'])}"
)
def main():
parser = argparse.ArgumentParser(description="Kokoro Voice Analysis Demo")
parser.add_argument("--voices", nargs="+", type=str, help="Voices to combine")
parser.add_argument("--text", type=str, default="Hello! This is a test of combined voices.", help="Text to speak")
parser.add_argument(
"--text",
type=str,
default="Hello! This is a test of combined voices.",
help="Text to speak",
)
parser.add_argument("--url", default="http://localhost:8880", help="API base URL")
parser.add_argument("--output-dir", default="examples/output", help="Output directory for audio files")
parser.add_argument(
"--output-dir",
default="examples/output",
help="Output directory for audio files",
)
args = parser.parse_args()
if not args.voices:
print("No voices provided, using default test voices")
args.voices = ["af_bella", "af_nicole"]
# Create output directory
os.makedirs(args.output_dir, exist_ok=True)
# Dictionary to store audio files for analysis
audio_files = {}
# Generate speech with individual voices
print("Generating speech with individual voices...")
for voice in args.voices:
output_file = os.path.join(args.output_dir, f"analysis_{voice}.wav")
if generate_speech(args.text, voice, args.url, output_file):
audio_files[voice] = output_file
# Generate speech with combined voice
print(f"\nCombining voices: {', '.join(args.voices)}")
combined_voice = submit_combine_voices(args.voices, args.url)
if combined_voice:
print(f"Successfully created combined voice: {combined_voice}")
output_file = os.path.join(args.output_dir, f"analysis_combined_{combined_voice}.wav")
output_file = os.path.join(
args.output_dir, f"analysis_combined_{combined_voice}.wav"
)
if generate_speech(args.text, combined_voice, args.url, output_file):
audio_files["combined"] = output_file
# Generate comparison plots
plot_analysis(audio_files, args.output_dir)
else:
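
The grouped-bar layout in `plot_grouped_bars` depends on the expression `(i - n_voices / 2 + 0.5) * bar_width`, which centres each group of bars on its tick. A small worked sketch, with `bar_offsets` as a hypothetical helper that only reproduces that arithmetic:

```python
def bar_offsets(n_voices, bar_width=0.25):
    """Horizontal offset of each voice's bar relative to the group's tick."""
    return [(i - n_voices / 2 + 0.5) * bar_width for i in range(n_voices)]


three = bar_offsets(3)  # two voices plus the combined voice
two = bar_offsets(2)    # e.g. when combining fails and only two files exist
```

With three voices, the bars sit at -0.25, 0.0, and 0.25 around each tick; with two, at -0.125 and 0.125. Because the script zips the voices with a list of three colors, only the first three voices would be drawn if more were supplied.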


@ -60,7 +60,7 @@ def test_speed(speed: float):
# Test different formats
for format in ["wav", "mp3", "opus", "aac", "flac", "pcm"]:
test_format(format) # aac and pcm should fail as they are not supported
test_format(format) # aac and pcm should fail as they are not supported
# Test different speeds
for speed in [0.25, 1.0, 2.0, 4.0]: # 5.0 should fail as it's out of range


@ -1,5 +1,5 @@
[pytest]
testpaths = api/tests
testpaths = api/tests ui/tests
python_files = test_*.py
addopts = -v --tb=short --cov=api --cov-report=term-missing --cov-config=.coveragerc
addopts = -v --tb=short --cov=api --cov=ui --cov-report=term-missing --cov-config=.coveragerc
pythonpath = .


@ -10,3 +10,5 @@ sqlalchemy==2.0.27
pytest==8.0.0
httpx==0.26.0
pytest-asyncio==0.23.5
pytest-cov==6.0.0
gradio==4.19.2

ui/Dockerfile Normal file

@ -0,0 +1,15 @@
FROM python:3.10-slim
WORKDIR /app/ui
# Install dependencies
RUN pip install gradio==5.9.1 requests==2.32.3
# Create necessary directories
RUN mkdir -p data/inputs data/outputs
# Copy the application files
COPY . .
# Run the Gradio app
CMD ["python", "app.py"]

BIN
ui/GUIBanner.png Normal file

Binary file not shown.

Size: 486 KiB

BIN
ui/GradioScreenShot.png Normal file

Binary file not shown.

Size: 113 KiB

ui/app.py Normal file

@ -0,0 +1,5 @@
from lib.interface import create_interface
if __name__ == "__main__":
demo = create_interface()
demo.launch(server_name="0.0.0.0", server_port=7860, show_error=True)


@ -0,0 +1,151 @@
The Time Traveller (for so it will be convenient to speak of him) was expounding a recondite matter to us. His pale grey eyes shone and twinkled, and his usually pale face was flushed and animated. The fire burnt brightly, and the soft radiance of the incandescent lights in the lilies of silver caught the bubbles that flashed and passed in our glasses. Our chairs, being his patents, embraced and caressed us rather than submitted to be sat upon, and there was that luxurious after-dinner atmosphere, when thought runs gracefully free of the trammels of precision. And he put it to us in this way—marking the points with a lean forefinger—as we sat and lazily admired his earnestness over this new paradox (as we thought it) and his fecundity.
“You must follow me carefully. I shall have to controvert one or two ideas that are almost universally accepted. The geometry, for instance, they taught you at school is founded on a misconception.”
“Is not that rather a large thing to expect us to begin upon?” said Filby, an argumentative person with red hair.
“I do not mean to ask you to accept anything without reasonable ground for it. You will soon admit as much as I need from you. You know of course that a mathematical line, a line of thickness nil, has no real existence. They taught you that? Neither has a mathematical plane. These things are mere abstractions.”
“That is all right,” said the Psychologist.
“Nor, having only length, breadth, and thickness, can a cube have a real existence.”
“There I object,” said Filby. “Of course a solid body may exist. All real things—”
“So most people think. But wait a moment. Can an instantaneous cube exist?”
“Don’t follow you,” said Filby.
“Can a cube that does not last for any time at all, have a real existence?”
Filby became pensive. “Clearly,” the Time Traveller proceeded, “any real body must have extension in four directions: it must have Length, Breadth, Thickness, and—Duration. But through a natural infirmity of the flesh, which I will explain to you in a moment, we incline to overlook this fact. There are really four dimensions, three which we call the three planes of Space, and a fourth, Time. There is, however, a tendency to draw an unreal distinction between the former three dimensions and the latter, because it happens that our consciousness moves intermittently in one direction along the latter from the beginning to the end of our lives.”
“That,” said a very young man, making spasmodic efforts to relight his cigar over the lamp; “that . . . very clear indeed.”
“Now, it is very remarkable that this is so extensively overlooked,” continued the Time Traveller, with a slight accession of cheerfulness. “Really this is what is meant by the Fourth Dimension, though some people who talk about the Fourth Dimension do not know they mean it. It is only another way of looking at Time. There is no difference between Time and any of the three dimensions of Space except that our consciousness moves along it. But some foolish people have got hold of the wrong side of that idea. You have all heard what they have to say about this Fourth Dimension?”
“I have not,” said the Provincial Mayor.
“It is simply this. That Space, as our mathematicians have it, is spoken of as having three dimensions, which one may call Length, Breadth, and Thickness, and is always definable by reference to three planes, each at right angles to the others. But some philosophical people have been asking why three dimensions particularly—why not another direction at right angles to the other three?—and have even tried to construct a Four-Dimensional geometry. Professor Simon Newcomb was expounding this to the New York Mathematical Society only a month or so ago. You know how on a flat surface, which has only two dimensions, we can represent a figure of a three-dimensional solid, and similarly they think that by models of three dimensions they could represent one of four—if they could master the perspective of the thing. See?”
“I think so,” murmured the Provincial Mayor; and, knitting his brows, he lapsed into an introspective state, his lips moving as one who repeats mystic words. “Yes, I think I see it now,” he said after some time, brightening in a quite transitory manner.
“Well, I do not mind telling you I have been at work upon this geometry of Four Dimensions for some time. Some of my results are curious. For instance, here is a portrait of a man at eight years old, another at fifteen, another at seventeen, another at twenty-three, and so on. All these are evidently sections, as it were, Three-Dimensional representations of his Four-Dimensioned being, which is a fixed and unalterable thing.
“Scientific people,” proceeded the Time Traveller, after the pause required for the proper assimilation of this, “know very well that Time is only a kind of Space. Here is a popular scientific diagram, a weather record. This line I trace with my finger shows the movement of the barometer. Yesterday it was so high, yesterday night it fell, then this morning it rose again, and so gently upward to here. Surely the mercury did not trace this line in any of the dimensions of Space generally recognised? But certainly it traced such a line, and that line, therefore, we must conclude, was along the Time-Dimension.”
“But,” said the Medical Man, staring hard at a coal in the fire, “if Time is really only a fourth dimension of Space, why is it, and why has it always been, regarded as something different? And why cannot we move in Time as we move about in the other dimensions of Space?”
The Time Traveller smiled. “Are you so sure we can move freely in Space? Right and left we can go, backward and forward freely enough, and men always have done so. I admit we move freely in two dimensions. But how about up and down? Gravitation limits us there.”
“Not exactly,” said the Medical Man. “There are balloons.”
“But before the balloons, save for spasmodic jumping and the inequalities of the surface, man had no freedom of vertical movement.”
“Still they could move a little up and down,” said the Medical Man.
“Easier, far easier down than up.”
“And you cannot move at all in Time, you cannot get away from the present moment.”
“My dear sir, that is just where you are wrong. That is just where the whole world has gone wrong. We are always getting away from the present moment. Our mental existences, which are immaterial and have no dimensions, are passing along the Time-Dimension with a uniform velocity from the cradle to the grave. Just as we should travel down if we began our existence fifty miles above the earth’s surface.”
“But the great difficulty is this,” interrupted the Psychologist. “You can move about in all directions of Space, but you cannot move about in Time.”
“That is the germ of my great discovery. But you are wrong to say that we cannot move about in Time. For instance, if I am recalling an incident very vividly I go back to the instant of its occurrence: I become absent-minded, as you say. I jump back for a moment. Of course we have no means of staying back for any length of Time, any more than a savage or an animal has of staying six feet above the ground. But a civilised man is better off than the savage in this respect. He can go up against gravitation in a balloon, and why should he not hope that ultimately he may be able to stop or accelerate his drift along the Time-Dimension, or even turn about and travel the other way?”
“Oh, this,” began Filby, “is all—”
“Why not?” said the Time Traveller.
“It’s against reason,” said Filby.
“What reason?” said the Time Traveller.
“You can show black is white by argument,” said Filby, “but you will never convince me.”
“Possibly not,” said the Time Traveller. “But now you begin to see the object of my investigations into the geometry of Four Dimensions. Long ago I had a vague inkling of a machine—”
“To travel through Time!” exclaimed the Very Young Man.
“That shall travel indifferently in any direction of Space and Time, as the driver determines.”
Filby contented himself with laughter.
“But I have experimental verification,” said the Time Traveller.
“It would be remarkably convenient for the historian,” the Psychologist suggested. “One might travel back and verify the accepted account of the Battle of Hastings, for instance!”
“Don’t you think you would attract attention?” said the Medical Man. “Our ancestors had no great tolerance for anachronisms.”
“One might get one’s Greek from the very lips of Homer and Plato,” the Very Young Man thought.
“In which case they would certainly plough you for the Little-go. The German scholars have improved Greek so much.”
“Then there is the future,” said the Very Young Man. “Just think! One might invest all one’s money, leave it to accumulate at interest, and hurry on ahead!”
“To discover a society,” said I, “erected on a strictly communistic basis.”
“Of all the wild extravagant theories!” began the Psychologist.
“Yes, so it seemed to me, and so I never talked of it until—”
“Experimental verification!” cried I. “You are going to verify that?”
“The experiment!” cried Filby, who was getting brain-weary.
“Let’s see your experiment anyhow,” said the Psychologist, “though it’s all humbug, you know.”
The Time Traveller smiled round at us. Then, still smiling faintly, and with his hands deep in his trousers pockets, he walked slowly out of the room, and we heard his slippers shuffling down the long passage to his laboratory.
The Psychologist looked at us. “I wonder what he’s got?”
“Some sleight-of-hand trick or other,” said the Medical Man, and Filby tried to tell us about a conjuror he had seen at Burslem, but before he had finished his preface the Time Traveller came back, and Filby’s anecdote collapsed.
II.
The Machine
The thing the Time Traveller held in his hand was a glittering metallic framework, scarcely larger than a small clock, and very delicately made. There was ivory in it, and some transparent crystalline substance. And now I must be explicit, for this that follows—unless his explanation is to be accepted—is an absolutely unaccountable thing. He took one of the small octagonal tables that were scattered about the room, and set it in front of the fire, with two legs on the hearthrug. On this table he placed the mechanism. Then he drew up a chair, and sat down. The only other object on the table was a small shaded lamp, the bright light of which fell upon the model. There were also perhaps a dozen candles about, two in brass candlesticks upon the mantel and several in sconces, so that the room was brilliantly illuminated. I sat in a low arm-chair nearest the fire, and I drew this forward so as to be almost between the Time Traveller and the fireplace. Filby sat behind him, looking over his shoulder. The Medical Man and the Provincial Mayor watched him in profile from the right, the Psychologist from the left. The Very Young Man stood behind the Psychologist. We were all on the alert. It appears incredible to me that any kind of trick, however subtly conceived and however adroitly done, could have been played upon us under these conditions.
The Time Traveller looked at us, and then at the mechanism. “Well?” said the Psychologist.
“This little affair,” said the Time Traveller, resting his elbows upon the table and pressing his hands together above the apparatus, “is only a model. It is my plan for a machine to travel through time. You will notice that it looks singularly askew, and that there is an odd twinkling appearance about this bar, as though it was in some way unreal.” He pointed to the part with his finger. “Also, here is one little white lever, and here is another.”
The Medical Man got up out of his chair and peered into the thing. “It’s beautifully made,” he said.
“It took two years to make,” retorted the Time Traveller. Then, when we had all imitated the action of the Medical Man, he said: “Now I want you clearly to understand that this lever, being pressed over, sends the machine gliding into the future, and this other reverses the motion. This saddle represents the seat of a time traveller. Presently I am going to press the lever, and off the machine will go. It will vanish, pass into future Time, and disappear. Have a good look at the thing. Look at the table too, and satisfy yourselves there is no trickery. I don’t want to waste this model, and then be told I’m a quack.”
There was a minute’s pause perhaps. The Psychologist seemed about to speak to me, but changed his mind. Then the Time Traveller put forth his finger towards the lever. “No,” he said suddenly. “Lend me your hand.” And turning to the Psychologist, he took that individual’s hand in his own and told him to put out his forefinger. So that it was the Psychologist himself who sent forth the model Time Machine on its interminable voyage. We all saw the lever turn. I am absolutely certain there was no trickery. There was a breath of wind, and the lamp flame jumped. One of the candles on the mantel was blown out, and the little machine suddenly swung round, became indistinct, was seen as a ghost for a second perhaps, as an eddy of faintly glittering brass and ivory; and it was gone—vanished! Save for the lamp the table was bare.
Everyone was silent for a minute. Then Filby said he was damned.
The Psychologist recovered from his stupor, and suddenly looked under the table. At that the Time Traveller laughed cheerfully. “Well?” he said, with a reminiscence of the Psychologist. Then, getting up, he went to the tobacco jar on the mantel, and with his back to us began to fill his pipe.
We stared at each other. “Look here,” said the Medical Man, “are you in earnest about this? Do you seriously believe that that machine has travelled into time?”
“Certainly,” said the Time Traveller, stooping to light a spill at the fire. Then he turned, lighting his pipe, to look at the Psychologist’s face. (The Psychologist, to show that he was not unhinged, helped himself to a cigar and tried to light it uncut.) “What is more, I have a big machine nearly finished in there”—he indicated the laboratory—“and when that is put together I mean to have a journey on my own account.”
“You mean to say that that machine has travelled into the future?” said Filby.
“Into the future or the past—I don’t, for certain, know which.”
After an interval the Psychologist had an inspiration. “It must have gone into the past if it has gone anywhere,” he said.
“Why?” said the Time Traveller.
“Because I presume that it has not moved in space, and if it travelled into the future it would still be here all this time, since it must have travelled through this time.”
“But,” said I, “If it travelled into the past it would have been visible when we came first into this room; and last Thursday when we were here; and the Thursday before that; and so forth!”
“Serious objections,” remarked the Provincial Mayor, with an air of impartiality, turning towards the Time Traveller.
“Not a bit,” said the Time Traveller, and, to the Psychologist: “You think. You can explain that. It’s presentation below the threshold, you know, diluted presentation.”
“Of course,” said the Psychologist, and reassured us. “That’s a simple point of psychology. I should have thought of it. It’s plain enough, and helps the paradox delightfully. We cannot see it, nor can we appreciate this machine, any more than we can the spoke of a wheel spinning, or a bullet flying through the air. If it is travelling through time fifty times or a hundred times faster than we are, if it gets through a minute while we get through a second, the impression it creates will of course be only one-fiftieth or one-hundredth of what it would make if it were not travelling in time. That’s plain enough.” He passed his hand through the space in which the machine had been. “You see?” he said, laughing.
We sat and stared at the vacant table for a minute or so. Then the Time Traveller asked us what we thought of it all.
“It sounds plausible enough tonight,” said the Medical Man; “but wait until tomorrow. Wait for the common sense of the morning.”
“Would you like to see the Time Machine itself?” asked the Time Traveller. And therewith, taking the lamp in his hand, he led the way down the long, draughty corridor to his laboratory. I remember vividly the flickering light, his queer, broad head in silhouette, the dance of the shadows, how we all followed him, puzzled but incredulous, and how there in the laboratory we beheld a larger edition of the little mechanism which we had seen vanish from before our eyes. Parts were of nickel, parts of ivory, parts had certainly been filed or sawn out of rock crystal. The thing was generally complete, but the twisted crystalline bars lay unfinished upon the bench beside some
The Time Traveller Returns
I think that at that time none of us quite believed in the Time Machine. The fact is, the Time Traveller was one of those men who are too clever to be believed: you never felt that you saw all round him; you always suspected some subtle reserve, some ingenuity in ambush, behind his lucid frankness. Had Filby shown the model and explained the matter in the Time Traveller’s words, we should have shown him far less scepticism. For we should have perceived his motives: a pork-butcher could understand Filby. But the Time Traveller had more than a touch of whim among his elements, and we distrusted him. Things that would have made the fame of a less clever man seemed tricks in his hands. It is a mistake to do things too easily. The serious people who took him seriously never felt quite sure of his deportment; they were somehow aware that trusting their reputations for judgment with him was like furnishing a nursery with eggshell china. So I don’t think any of us said very much about time travelling in the interval between that Thursday and the next, though its odd potentialities ran, no doubt, in most of our minds: its plausibility, that is, its practical incredibleness, the curious possibilities of anachronism and of utter confusion it suggested. For my own part, I was particularly preoccupied with the trick of the model. That I remember discussing with the Medical Man, whom I met on Friday at the Linnæan. He said he had seen a similar thing at Tübingen, and laid considerable stress on the blowing-out of the candle. But how the trick was done he could not explai

0
ui/lib/__init__.py Normal file
View file

89
ui/lib/api.py Normal file
View file

@ -0,0 +1,89 @@
import os
import datetime
from typing import List, Tuple, Optional

import requests

from .config import API_URL, OUTPUTS_DIR


def check_api_status() -> Tuple[bool, List[str]]:
    """Check TTS service status and get available voices."""
    try:
        # Use a longer timeout during startup
        response = requests.get(
            f"{API_URL}/v1/audio/voices",
            timeout=30,  # Increased timeout for initial startup period
        )
        response.raise_for_status()
        voices = response.json().get("voices", [])
        if voices:
            return True, voices
        print("No voices found in response")
        return False, []
    except requests.exceptions.Timeout:
        print("API request timed out (waiting for service startup)")
        return False, []
    except requests.exceptions.ConnectionError as e:
        print(f"Connection error (service may be starting up): {str(e)}")
        return False, []
    except requests.exceptions.RequestException as e:
        print(f"API request failed: {str(e)}")
        return False, []
    except Exception as e:
        print(f"Unexpected error checking API status: {str(e)}")
        return False, []


def text_to_speech(
    text: str, voice_id: str, format: str, speed: float
) -> Optional[str]:
    """Generate speech from text using TTS API."""
    if not text.strip():
        return None

    # Create output filename
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    output_filename = f"output_{timestamp}_voice-{voice_id}_speed-{speed}.{format}"
    output_path = os.path.join(OUTPUTS_DIR, output_filename)

    try:
        response = requests.post(
            f"{API_URL}/v1/audio/speech",
            json={
                "model": "kokoro",
                "input": text,
                "voice": voice_id,
                "response_format": format,
                "speed": float(speed),
            },
            headers={"Content-Type": "application/json"},
            timeout=300,  # Longer timeout for speech generation
        )
        response.raise_for_status()
        with open(output_path, "wb") as f:
            f.write(response.content)
        return output_path
    except requests.exceptions.Timeout:
        print("Speech generation request timed out")
        return None
    except requests.exceptions.RequestException as e:
        print(f"Speech generation request failed: {str(e)}")
        return None
    except Exception as e:
        print(f"Unexpected error generating speech: {str(e)}")
        return None


def get_status_html(is_available: bool) -> str:
    """Generate HTML for status indicator."""
    color = "green" if is_available else "red"
    status = "Available" if is_available else "Unavailable"
    return f"""
        <div style="display: flex; align-items: center; gap: 8px;">
            <div style="width: 12px; height: 12px; border-radius: 50%; background-color: {color};"></div>
            <span>TTS Service: {status}</span>
        </div>
    """

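Review note: `check_api_status` makes a single attempt and returns `(False, [])` on any failure, so callers that need to wait for the service have to poll. A minimal sketch of such a polling wrapper follows. The name `wait_for_service` and its `attempts`, `delay`, and `sleep` parameters are hypothetical and not part of this commit; the checker is passed in as a callable so the sketch runs without the TTS service.

```python
import time
from typing import Callable, List, Tuple


def wait_for_service(
    check: Callable[[], Tuple[bool, List[str]]],
    attempts: int = 5,
    delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Tuple[bool, List[str]]:
    """Call `check` until it reports success or the attempts run out.

    Hypothetical helper: `check` is expected to behave like
    check_api_status(), returning (is_available, voices).
    """
    for attempt in range(attempts):
        available, voices = check()
        if available:
            return True, voices
        # Only pause between attempts, not after the final one
        if attempt < attempts - 1:
            sleep(delay)
    return False, []
```

Inside the UI package this could presumably be called as `wait_for_service(api.check_api_status)`; the injectable `sleep` exists only to keep tests fast.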
View file

@ -0,0 +1,5 @@
from .input import create_input_column
from .model import create_model_column
from .output import create_output_column
__all__ = ["create_input_column", "create_model_column", "create_output_column"]

View file

@ -0,0 +1,58 @@
from typing import Tuple

import gradio as gr

from .. import files


def create_input_column() -> Tuple[gr.Column, dict]:
    """Create the input column with text input and file handling."""
    with gr.Column(scale=1) as col:
        with gr.Tabs() as tabs:
            # Set first tab as selected by default
            tabs.selected = 0
            # Direct Input Tab
            with gr.TabItem("Direct Input"):
                text_input = gr.Textbox(
                    label="Text to speak", placeholder="Enter text here...", lines=4
                )
                text_submit = gr.Button("Generate Speech", variant="primary", size="lg")

            # File Input Tab
            with gr.TabItem("From File"):
                # Existing files dropdown
                input_files_list = gr.Dropdown(
                    label="Select Existing File",
                    choices=files.list_input_files(),
                    value=None,
                )
                # Simple file upload
                file_upload = gr.File(
                    label="Upload Text File (.txt)", file_types=[".txt"]
                )
                file_preview = gr.Textbox(
                    label="File Content Preview", interactive=False, lines=4
                )
                with gr.Row():
                    file_submit = gr.Button(
                        "Generate Speech", variant="primary", size="lg"
                    )
                    clear_files = gr.Button(
                        "Clear Files", variant="secondary", size="lg"
                    )

    components = {
        "tabs": tabs,
        "text_input": text_input,
        "file_select": input_files_list,
        "file_upload": file_upload,
        "file_preview": file_preview,
        "text_submit": text_submit,
        "file_submit": file_submit,
        "clear_files": clear_files,
    }
    return col, components

View file

@ -0,0 +1,41 @@
from typing import Tuple, Optional

import gradio as gr

from .. import api, config


def create_model_column(voice_ids: Optional[list] = None) -> Tuple[gr.Column, dict]:
    """Create the model settings column."""
    if voice_ids is None:
        voice_ids = []

    with gr.Column(scale=1) as col:
        gr.Markdown("### Model Settings")
        # Status button starts in waiting state
        status_btn = gr.Button(
            "⌛ TTS Service: Waiting for Service...", variant="secondary"
        )
        voice_input = gr.Dropdown(
            choices=voice_ids,
            label="Voice",
            value=voice_ids[0] if voice_ids else None,
            interactive=True,
        )
        format_input = gr.Dropdown(
            choices=config.AUDIO_FORMATS, label="Audio Format", value="mp3"
        )
        speed_input = gr.Slider(
            minimum=0.5, maximum=2.0, value=1.0, step=0.1, label="Speed"
        )

    components = {
        "status_btn": status_btn,
        "voice": voice_input,
        "format": format_input,
        "speed": speed_input,
    }
    return col, components

View file

@ -0,0 +1,42 @@
from typing import Tuple

import gradio as gr

from .. import files


def create_output_column() -> Tuple[gr.Column, dict]:
    """Create the output column with audio player and file list."""
    with gr.Column(scale=1) as col:
        gr.Markdown("### Latest Output")
        audio_output = gr.Audio(label="Generated Speech", type="filepath")

        gr.Markdown("### Generated Files")
        output_files = gr.Dropdown(
            label="Previous Outputs",
            choices=files.list_output_files(),
            value=None,
            allow_custom_value=False,
        )
        play_btn = gr.Button("▶️ Play Selected", size="sm")
        selected_audio = gr.Audio(
            label="Selected Output", type="filepath", visible=False
        )
        clear_outputs = gr.Button(
            "⚠️ Delete All Previously Generated Output Audio 🗑️",
            size="sm",
            variant="secondary",
        )

    components = {
        "audio_output": audio_output,
        "output_files": output_files,
        "play_btn": play_btn,
        "selected_audio": selected_audio,
        "clear_outputs": clear_outputs,
    }
    return col, components

40
ui/lib/config.py Normal file
View file

@ -0,0 +1,40 @@
import os
# API Configuration
API_URL = "http://kokoro-tts:8880"
# File paths
INPUTS_DIR = "/app/ui/data/inputs"
OUTPUTS_DIR = "/app/ui/data/outputs"
# Create directories if they don't exist
os.makedirs(INPUTS_DIR, exist_ok=True)
os.makedirs(OUTPUTS_DIR, exist_ok=True)
# Audio formats
AUDIO_FORMATS = ["mp3", "wav", "opus", "flac"]
# UI Theme
THEME = "monochrome"
CSS = """
.gradio-container {
    max-width: 1000px;
    margin: auto;
}
.banner-container {
    background: transparent !important;
    border: none !important;
    box-shadow: none !important;
    margin-bottom: 2rem;
}
.banner-container img {
    width: 100%;
    max-width: 600px;
    border-radius: 10px;
    margin: 20px auto;
    display: block;
    box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1);
}
"""

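Review note: `API_URL` is hard-coded to the Docker service name, which only resolves inside the compose network. A minimal sketch of an environment override with the same default is shown below. The variable name `KOKORO_API_URL` and the helper `resolve_api_url` are assumptions for illustration and do not appear in this commit.

```python
import os
from typing import Mapping, Optional

# Default taken from the config above; everything else here is hypothetical.
DEFAULT_API_URL = "http://kokoro-tts:8880"


def resolve_api_url(env: Optional[Mapping[str, str]] = None) -> str:
    """Return the API base URL, preferring an environment override.

    KOKORO_API_URL is an assumed variable name, not one defined by the project.
    """
    env = os.environ if env is None else env
    url = env.get("KOKORO_API_URL", "").strip()
    # Drop any trailing slash so f"{API_URL}/v1/..." does not produce "//"
    return (url or DEFAULT_API_URL).rstrip("/")
```

Accepting the environment as a mapping keeps the helper testable without touching the real process environment.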
122
ui/lib/files.py Normal file
View file

@ -0,0 +1,122 @@
import os
import datetime
from typing import List, Tuple, Optional

from .config import INPUTS_DIR, OUTPUTS_DIR, AUDIO_FORMATS


def list_input_files() -> List[str]:
    """List all input text files."""
    return [f for f in os.listdir(INPUTS_DIR) if f.endswith(".txt")]


def list_output_files() -> List[str]:
    """List all output audio files."""
    return [
        os.path.join(OUTPUTS_DIR, f)
        for f in os.listdir(OUTPUTS_DIR)
        # Match the full ".ext" suffix so a name such as "notesmp3" is skipped
        if any(f.endswith(f".{ext}") for ext in AUDIO_FORMATS)
    ]


def read_text_file(filename: str) -> str:
    """Read content of a text file."""
    if not filename:
        return ""
    try:
        file_path = os.path.join(INPUTS_DIR, filename)
        with open(file_path, "r", encoding="utf-8") as f:
            return f.read()
    except (OSError, UnicodeDecodeError):
        # Missing, unreadable, or non-UTF-8 files are treated as empty input
        return ""


def save_text(text: str, filename: Optional[str] = None) -> Optional[str]:
    """Save text to a file. Returns the filename if successful."""
    if not text.strip():
        return None

    if filename is None:
        # Use input_1.txt, input_2.txt, etc.
        base = "input"
        counter = 1
        while True:
            filename = f"{base}_{counter}.txt"
            if not os.path.exists(os.path.join(INPUTS_DIR, filename)):
                break
            counter += 1
    else:
        # Handle duplicate filenames by adding _1, _2, etc.
        base = os.path.splitext(filename)[0]
        ext = os.path.splitext(filename)[1] or ".txt"
        counter = 1
        while os.path.exists(os.path.join(INPUTS_DIR, filename)):
            filename = f"{base}_{counter}{ext}"
            counter += 1

    filepath = os.path.join(INPUTS_DIR, filename)
    try:
        with open(filepath, "w", encoding="utf-8") as f:
            f.write(text)
        return filename
    except Exception as e:
        print(f"Error saving file: {e}")
        return None


def delete_all_input_files() -> bool:
    """Delete all .txt files from the inputs directory. Returns True if successful."""
    try:
        for filename in os.listdir(INPUTS_DIR):
            if filename.endswith(".txt"):
                file_path = os.path.join(INPUTS_DIR, filename)
                os.remove(file_path)
        return True
    except Exception as e:
        print(f"Error deleting input files: {e}")
        return False


def delete_all_output_files() -> bool:
    """Delete all audio files from the outputs directory. Returns True if successful."""
    try:
        for filename in os.listdir(OUTPUTS_DIR):
            # Match the full ".ext" suffix, consistent with list_output_files
            if any(filename.endswith(f".{ext}") for ext in AUDIO_FORMATS):
                file_path = os.path.join(OUTPUTS_DIR, filename)
                os.remove(file_path)
        return True
    except Exception as e:
        print(f"Error deleting output files: {e}")
        return False


def process_uploaded_file(file_path: str) -> bool:
    """Save uploaded file to inputs directory. Returns True if successful."""
    if not file_path:
        return False

    try:
        filename = os.path.basename(file_path)
        if not filename.endswith(".txt"):
            return False

        # Create target path in inputs directory
        target_path = os.path.join(INPUTS_DIR, filename)

        # If file exists, add number suffix
        base, ext = os.path.splitext(filename)
        counter = 1
        while os.path.exists(target_path):
            new_name = f"{base}_{counter}{ext}"
            target_path = os.path.join(INPUTS_DIR, new_name)
            counter += 1

        # Copy file to inputs directory
        import shutil

        shutil.copy2(file_path, target_path)
        return True
    except Exception as e:
        print(f"Error saving uploaded file: {e}")
        return False
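Review note: the "append `_1`, `_2`, ... until the name is free" loop appears in `save_text`, in `process_uploaded_file`, and again in the upload handler. A minimal sketch of how that logic might be factored into one helper is shown here. `unique_filename` is a hypothetical name and is not part of this commit.

```python
import os


def unique_filename(directory: str, filename: str) -> str:
    """Return `filename`, or a numbered variant that does not exist in `directory`.

    Hypothetical helper mirroring the suffix loop used in files.py:
    test.txt -> test_1.txt -> test_2.txt, and so on.
    """
    base, ext = os.path.splitext(filename)
    candidate = filename
    counter = 1
    while os.path.exists(os.path.join(directory, candidate)):
        candidate = f"{base}_{counter}{ext}"
        counter += 1
    return candidate
```

Like the original loops, this check is not atomic: two concurrent writers could still pick the same name, which is probably acceptable for a single-user utility UI.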

240
ui/lib/handlers.py Normal file
View file

@ -0,0 +1,240 @@
import os
import shutil

import gradio as gr

from . import api, files


def setup_event_handlers(components: dict):
    """Set up all event handlers for the UI components."""

    def refresh_status():
        try:
            is_available, voices = api.check_api_status()
            status = "Available" if is_available else "Waiting for Service..."

            if is_available and voices:
                # Preserve current voice selection if it exists and is still valid
                current_voice = components["model"]["voice"].value
                default_voice = current_voice if current_voice in voices else voices[0]
                return [
                    gr.update(
                        value=f"🔄 TTS Service: {status}",
                        interactive=True,
                        variant="secondary",
                    ),
                    gr.update(choices=voices, value=default_voice),
                ]
            return [
                gr.update(
                    value=f"⌛ TTS Service: {status}",
                    interactive=True,
                    variant="secondary",
                ),
                gr.update(choices=[], value=None),
            ]
        except Exception as e:
            print(f"Error in refresh status: {str(e)}")
            return [
                gr.update(
                    value="❌ TTS Service: Connection Error",
                    interactive=True,
                    variant="secondary",
                ),
                gr.update(choices=[], value=None),
            ]

    def handle_file_select(filename):
        if filename:
            try:
                text = files.read_text_file(filename)
                if text:
                    preview = text[:200] + "..." if len(text) > 200 else text
                    return gr.update(value=preview)
            except Exception as e:
                print(f"Error reading file: {e}")
        return gr.update(value="")

    def handle_file_upload(file):
        if file is None:
            return gr.update(choices=files.list_input_files())

        try:
            # Copy file to inputs directory
            filename = os.path.basename(file.name)
            target_path = os.path.join(files.INPUTS_DIR, filename)

            # Handle duplicate filenames
            base, ext = os.path.splitext(filename)
            counter = 1
            while os.path.exists(target_path):
                new_name = f"{base}_{counter}{ext}"
                target_path = os.path.join(files.INPUTS_DIR, new_name)
                counter += 1

            shutil.copy2(file.name, target_path)
        except Exception as e:
            print(f"Error uploading file: {e}")

        return gr.update(choices=files.list_input_files())

    def generate_from_text(text, voice, format, speed):
        """Generate speech from direct text input"""
        is_available, _ = api.check_api_status()
        if not is_available:
            gr.Warning("TTS Service is currently unavailable")
            return [None, gr.update(choices=files.list_output_files())]

        if not text or not text.strip():
            gr.Warning("Please enter text in the input box")
            return [None, gr.update(choices=files.list_output_files())]

        files.save_text(text)
        result = api.text_to_speech(text, voice, format, speed)
        if result is None:
            gr.Warning("Failed to generate speech. Please try again.")
            return [None, gr.update(choices=files.list_output_files())]

        # list_output_files() returns full paths, so select the full path
        # rather than the basename to keep the value among the choices
        return [
            result,
            gr.update(choices=files.list_output_files(), value=result),
        ]

    def generate_from_file(selected_file, voice, format, speed):
        """Generate speech from selected file"""
        is_available, _ = api.check_api_status()
        if not is_available:
            gr.Warning("TTS Service is currently unavailable")
            return [None, gr.update(choices=files.list_output_files())]

        if not selected_file:
            gr.Warning("Please select a file")
            return [None, gr.update(choices=files.list_output_files())]

        text = files.read_text_file(selected_file)
        result = api.text_to_speech(text, voice, format, speed)
        if result is None:
            gr.Warning("Failed to generate speech. Please try again.")
            return [None, gr.update(choices=files.list_output_files())]

        # list_output_files() returns full paths, so select the full path
        # rather than the basename to keep the value among the choices
        return [
            result,
            gr.update(choices=files.list_output_files(), value=result),
        ]

    def play_selected(file_path):
        if file_path and os.path.exists(file_path):
            return gr.update(value=file_path, visible=True)
        return gr.update(visible=False)

    def clear_files(voice, format, speed):
        """Delete all input files and clear UI components while preserving model settings"""
        files.delete_all_input_files()
        return [
            gr.update(value=None, choices=[]),  # file_select
            None,  # file_upload
            gr.update(value=""),  # file_preview
            None,  # audio_output
            gr.update(choices=files.list_output_files()),  # output_files
            gr.update(value=voice),  # voice
            gr.update(value=format),  # format
            gr.update(value=speed),  # speed
        ]

    def clear_outputs():
        """Delete all output audio files and clear audio components"""
        files.delete_all_output_files()
        return [
            None,  # audio_output
            gr.update(choices=[], value=None),  # output_files
            gr.update(visible=False),  # selected_audio
        ]

    # Connect event handlers
    components["model"]["status_btn"].click(
        fn=refresh_status,
        outputs=[components["model"]["status_btn"], components["model"]["voice"]],
    )

    components["input"]["file_select"].change(
        fn=handle_file_select,
        inputs=[components["input"]["file_select"]],
        outputs=[components["input"]["file_preview"]],
    )

    components["input"]["file_upload"].upload(
        fn=handle_file_upload,
        inputs=[components["input"]["file_upload"]],
        outputs=[components["input"]["file_select"]],
    )

    components["output"]["play_btn"].click(
        fn=play_selected,
        inputs=[components["output"]["output_files"]],
        outputs=[components["output"]["selected_audio"]],
    )

    # Connect clear files button
    components["input"]["clear_files"].click(
        fn=clear_files,
        inputs=[
            components["model"]["voice"],
            components["model"]["format"],
            components["model"]["speed"],
        ],
        outputs=[
            components["input"]["file_select"],
            components["input"]["file_upload"],
            components["input"]["file_preview"],
            components["output"]["audio_output"],
            components["output"]["output_files"],
            components["model"]["voice"],
            components["model"]["format"],
            components["model"]["speed"],
        ],
    )

    # Connect submit buttons for each tab
    components["input"]["text_submit"].click(
        fn=generate_from_text,
        inputs=[
            components["input"]["text_input"],
            components["model"]["voice"],
            components["model"]["format"],
            components["model"]["speed"],
        ],
        outputs=[
            components["output"]["audio_output"],
            components["output"]["output_files"],
        ],
    )

    # Connect clear outputs button
    components["output"]["clear_outputs"].click(
        fn=clear_outputs,
        outputs=[
            components["output"]["audio_output"],
            components["output"]["output_files"],
            components["output"]["selected_audio"],
        ],
    )

    components["input"]["file_submit"].click(
        fn=generate_from_file,
        inputs=[
            components["input"]["file_select"],
            components["model"]["voice"],
            components["model"]["format"],
            components["model"]["speed"],
        ],
        outputs=[
            components["output"]["audio_output"],
            components["output"]["output_files"],
        ],
    )
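Review note: `generate_from_text` and `generate_from_file` repeat the same validate, synthesize, and warn sequence. The sketch below shows one way the shared part might be expressed with the collaborators passed in as callables, so it can be exercised without Gradio or the TTS service. `run_generation` and its parameter names are hypothetical and not part of this commit.

```python
from typing import Callable, List, Optional, Tuple


def run_generation(
    text: Optional[str],
    synthesize: Callable[[str], Optional[str]],
    list_outputs: Callable[[], List[str]],
    warn: Callable[[str], None],
) -> Tuple[Optional[str], List[str]]:
    """Validate the text, synthesize it, and report the refreshed output list.

    Hypothetical helper: `synthesize` stands in for api.text_to_speech with
    voice, format, and speed already bound; `warn` stands in for gr.Warning.
    """
    if not text or not text.strip():
        warn("No text to synthesize")
        return None, list_outputs()

    result = synthesize(text)
    if result is None:
        warn("Failed to generate speech. Please try again.")
    # List outputs after synthesis so a newly written file is included
    return result, list_outputs()
```

Each handler would then only differ in how it obtains `text` (the textbox value or `files.read_text_file`), which is where the two functions above actually diverge.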

97
ui/lib/interface.py Normal file
View file

@ -0,0 +1,97 @@
import gradio as gr

from . import api
from .handlers import setup_event_handlers
from .components import create_input_column, create_model_column, create_output_column


def create_interface():
    """Create the main Gradio interface."""
    # Skip initial status check - let the timer handle it
    is_available, available_voices = False, []

    with gr.Blocks(title="Kokoro TTS Demo", theme=gr.themes.Monochrome()) as demo:
        gr.HTML(
            value='<div style="display: flex; gap: 0;">'
            '<a href="https://huggingface.co/hexgrad/Kokoro-82M" target="_blank" style="color: #2196F3; text-decoration: none; margin: 2px; border: 1px solid #2196F3; padding: 4px 8px; height: 24px; box-sizing: border-box; display: inline-flex; align-items: center;">Kokoro-82M HF Repo</a>'
            '<a href="https://github.com/remsky/Kokoro-FastAPI" target="_blank" style="color: #2196F3; text-decoration: none; margin: 2px; border: 1px solid #2196F3; padding: 4px 8px; height: 24px; box-sizing: border-box; display: inline-flex; align-items: center;">Kokoro-FastAPI Repo</a>'
            "</div>",
            show_label=False,
        )

        # Main interface
        with gr.Row():
            # Create columns
            input_col, input_components = create_input_column()
            model_col, model_components = create_model_column(
                available_voices
            )  # Pass initial voices
            output_col, output_components = create_output_column()

            # Collect all components
            components = {
                "input": input_components,
                "model": model_components,
                "output": output_components,
            }

            # Set up event handlers
            setup_event_handlers(components)

        # Add periodic status check with Timer
        def update_status():
            try:
                is_available, voices = api.check_api_status()
                status = "Available" if is_available else "Waiting for Service..."

                if is_available and voices:
                    # Service is available, update UI and stop timer
                    current_voice = components["model"]["voice"].value
                    default_voice = (
                        current_voice if current_voice in voices else voices[0]
                    )
                    # Return values in same order as outputs list
                    return [
                        gr.update(
                            value=f"🔄 TTS Service: {status}",
                            interactive=True,
                            variant="secondary",
                        ),
                        gr.update(choices=voices, value=default_voice),
                        gr.update(active=False),  # Stop timer
                    ]

                # Service not available yet, keep checking
                return [
                    gr.update(
                        value=f"⌛ TTS Service: {status}",
                        interactive=True,
                        variant="secondary",
                    ),
                    gr.update(choices=[], value=None),
                    gr.update(active=True),
                ]
            except Exception as e:
                print(f"Error in status update: {str(e)}")
                # On error, keep the timer running but show error state
                return [
                    gr.update(
                        value="❌ TTS Service: Connection Error",
                        interactive=True,
                        variant="secondary",
                    ),
                    gr.update(choices=[], value=None),
                    gr.update(active=True),
                ]

        timer = gr.Timer(value=5)  # Check every 5 seconds
        timer.tick(
            fn=update_status,
            outputs=[
                components["model"]["status_btn"],
                components["model"]["voice"],
                timer,
            ],
        )

    return demo
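Review note: `update_status` here and `refresh_status` in `handlers.py` compute the same label, voice choices, and selected voice; only the timer flag differs. A minimal sketch of that decision as a pure function, free of Gradio objects, is given below. `status_state` and its return shape are hypothetical and not part of this commit.

```python
from typing import List, Optional, Tuple


def status_state(
    is_available: bool,
    voices: List[str],
    current_voice: Optional[str] = None,
) -> Tuple[str, List[str], Optional[str], bool]:
    """Return (button label, voice choices, selected voice, keep polling).

    Hypothetical helper: mirrors the branching in update_status without
    building any gr.update objects.
    """
    if is_available and voices:
        # Keep the current selection when the service still offers it
        voice = current_voice if current_voice in voices else voices[0]
        return "🔄 TTS Service: Available", list(voices), voice, False
    # Not ready yet: clear the dropdown and keep the timer active
    return "⌛ TTS Service: Waiting for Service...", [], None, True
```

The two callbacks could then wrap the returned values in `gr.update(...)` calls, with only the timer callback using the final flag.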

9
ui/tests/conftest.py Normal file
View file

@ -0,0 +1,9 @@
import gradio as gr
import pytest


@pytest.fixture
def mock_gr_context():
    """Provides a context for testing Gradio components"""
    with gr.Blocks():
        yield

129
ui/tests/test_api.py Normal file
View file

@ -0,0 +1,129 @@
from unittest.mock import patch, mock_open

import pytest
import requests

from ui.lib import api


@pytest.fixture
def mock_response():
    class MockResponse:
        def __init__(self, json_data, status_code=200, content=b"audio data"):
            self._json = json_data
            self.status_code = status_code
            self.content = content

        def json(self):
            return self._json

        def raise_for_status(self):
            if self.status_code != 200:
                raise requests.exceptions.HTTPError(f"HTTP {self.status_code}")

    return MockResponse


def test_check_api_status_success(mock_response):
    """Test successful API status check"""
    mock_data = {"voices": ["voice1", "voice2"]}
    with patch("requests.get", return_value=mock_response(mock_data)):
        status, voices = api.check_api_status()
        assert status is True
        assert voices == ["voice1", "voice2"]


def test_check_api_status_no_voices(mock_response):
    """Test API response with no voices"""
    with patch("requests.get", return_value=mock_response({"voices": []})):
        status, voices = api.check_api_status()
        assert status is False
        assert voices == []


def test_check_api_status_timeout():
    """Test API timeout"""
    with patch("requests.get", side_effect=requests.exceptions.Timeout):
        status, voices = api.check_api_status()
        assert status is False
        assert voices == []


def test_check_api_status_connection_error():
    """Test API connection error"""
    with patch("requests.get", side_effect=requests.exceptions.ConnectionError):
        status, voices = api.check_api_status()
        assert status is False
        assert voices == []


def test_text_to_speech_success(mock_response, tmp_path):
    """Test successful speech generation"""
    with patch("requests.post", return_value=mock_response({})), patch(
        "ui.lib.api.OUTPUTS_DIR", str(tmp_path)
    ), patch("builtins.open", mock_open()) as mock_file:
        result = api.text_to_speech("test text", "voice1", "mp3", 1.0)

        assert result is not None
        assert "output_" in result
        assert result.endswith(".mp3")
        mock_file.assert_called_once()


def test_text_to_speech_empty_text():
    """Test speech generation with empty text"""
    result = api.text_to_speech("", "voice1", "mp3", 1.0)
    assert result is None


def test_text_to_speech_timeout():
    """Test speech generation timeout"""
    with patch("requests.post", side_effect=requests.exceptions.Timeout):
        result = api.text_to_speech("test", "voice1", "mp3", 1.0)
        assert result is None


def test_text_to_speech_request_error():
    """Test speech generation request error"""
    with patch("requests.post", side_effect=requests.exceptions.RequestException):
        result = api.text_to_speech("test", "voice1", "mp3", 1.0)
        assert result is None


def test_get_status_html_available():
    """Test status HTML generation for available service"""
    html = api.get_status_html(True)
    assert "green" in html
    assert "Available" in html


def test_get_status_html_unavailable():
    """Test status HTML generation for unavailable service"""
    html = api.get_status_html(False)
    assert "red" in html
    assert "Unavailable" in html


def test_text_to_speech_api_params(mock_response, tmp_path):
    """Test correct API parameters are sent"""
    with patch("requests.post") as mock_post, patch(
        "ui.lib.api.OUTPUTS_DIR", str(tmp_path)
    ), patch("builtins.open", mock_open()):
        mock_post.return_value = mock_response({})
        api.text_to_speech("test text", "voice1", "mp3", 1.5)

        mock_post.assert_called_once()
        args, kwargs = mock_post.call_args

        # Check request body
        assert kwargs["json"] == {
            "model": "kokoro",
            "input": "test text",
            "voice": "voice1",
            "response_format": "mp3",
            "speed": 1.5,
        }

        # Check headers and timeout
        assert kwargs["headers"] == {"Content-Type": "application/json"}
        assert kwargs["timeout"] == 300

112
ui/tests/test_components.py Normal file
View file

@ -0,0 +1,112 @@
import gradio as gr
import pytest

from ui.lib.config import AUDIO_FORMATS
from ui.lib.components.model import create_model_column
from ui.lib.components.output import create_output_column


def test_create_model_column_structure():
    """Test that create_model_column returns the expected structure"""
    voice_ids = ["voice1", "voice2"]
    column, components = create_model_column(voice_ids)

    # Test return types
    assert isinstance(column, gr.Column)
    assert isinstance(components, dict)

    # Test expected components presence
    expected_components = {"status_btn", "voice", "format", "speed"}
    assert set(components.keys()) == expected_components

    # Test component types
    assert isinstance(components["status_btn"], gr.Button)
    assert isinstance(components["voice"], gr.Dropdown)
    assert isinstance(components["format"], gr.Dropdown)
    assert isinstance(components["speed"], gr.Slider)


def test_model_column_default_values():
    """Test the default values of model column components"""
    voice_ids = ["voice1", "voice2"]
    _, components = create_model_column(voice_ids)

    # Test voice dropdown
    # Gradio Dropdown converts choices to (value, label) tuples
    expected_choices = [(voice_id, voice_id) for voice_id in voice_ids]
    assert components["voice"].choices == expected_choices
    # Value is not converted to tuple format for the value property
    assert components["voice"].value == voice_ids[0]
    assert components["voice"].interactive is True

    # Test format dropdown
    # Gradio Dropdown converts choices to (value, label) tuples
    expected_format_choices = [(fmt, fmt) for fmt in AUDIO_FORMATS]
    assert components["format"].choices == expected_format_choices
    assert components["format"].value == "mp3"

    # Test speed slider
    assert components["speed"].minimum == 0.5
    assert components["speed"].maximum == 2.0
    assert components["speed"].value == 1.0
    assert components["speed"].step == 0.1


def test_model_column_no_voices():
    """Test model column creation with no voice IDs"""
    _, components = create_model_column()
    assert components["voice"].choices == []
    assert components["voice"].value is None


def test_create_output_column_structure():
    """Test that create_output_column returns the expected structure"""
    column, components = create_output_column()

    # Test return types
    assert isinstance(column, gr.Column)
    assert isinstance(components, dict)

    # Test expected components presence
    expected_components = {
        "audio_output",
        "output_files",
        "play_btn",
        "selected_audio",
        "clear_outputs",
    }
    assert set(components.keys()) == expected_components

    # Test component types
    assert isinstance(components["audio_output"], gr.Audio)
    assert isinstance(components["output_files"], gr.Dropdown)
    assert isinstance(components["play_btn"], gr.Button)
    assert isinstance(components["selected_audio"], gr.Audio)
    assert isinstance(components["clear_outputs"], gr.Button)


def test_output_column_configuration():
    """Test the configuration of output column components"""
    _, components = create_output_column()

    # Test audio output configuration
    assert components["audio_output"].label == "Generated Speech"
    assert components["audio_output"].type == "filepath"

    # Test output files dropdown
    assert components["output_files"].label == "Previous Outputs"
    assert components["output_files"].allow_custom_value is False

    # Test play button
    assert components["play_btn"].value == "▶️ Play Selected"
    assert components["play_btn"].size == "sm"

    # Test selected audio configuration
    assert components["selected_audio"].label == "Selected Output"
    assert components["selected_audio"].type == "filepath"
    assert components["selected_audio"].visible is False

    # Test clear outputs button
    assert components["clear_outputs"].size == "sm"
    assert components["clear_outputs"].variant == "secondary"

197
ui/tests/test_files.py Normal file
View file

@ -0,0 +1,197 @@
import os
from unittest.mock import patch

import pytest

from ui.lib import files
from ui.lib.config import AUDIO_FORMATS


@pytest.fixture
def mock_dirs(tmp_path):
    """Create temporary input and output directories"""
    inputs_dir = tmp_path / "inputs"
    outputs_dir = tmp_path / "outputs"
    inputs_dir.mkdir()
    outputs_dir.mkdir()

    with patch("ui.lib.files.INPUTS_DIR", str(inputs_dir)), patch(
        "ui.lib.files.OUTPUTS_DIR", str(outputs_dir)
    ):
        yield inputs_dir, outputs_dir


def test_list_input_files_empty(mock_dirs):
    """Test listing input files from empty directory"""
    assert files.list_input_files() == []


def test_list_input_files(mock_dirs):
    """Test listing input files with various files"""
    inputs_dir, _ = mock_dirs

    # Create test files
    (inputs_dir / "test1.txt").write_text("content1")
    (inputs_dir / "test2.txt").write_text("content2")
    (inputs_dir / "nottext.pdf").write_text("should not be listed")

    result = files.list_input_files()
    assert len(result) == 2
    assert "test1.txt" in result
    assert "test2.txt" in result
    assert "nottext.pdf" not in result


def test_list_output_files_empty(mock_dirs):
    """Test listing output files from empty directory"""
    assert files.list_output_files() == []


def test_list_output_files(mock_dirs):
    """Test listing output files with various formats"""
    _, outputs_dir = mock_dirs

    # Create test files for each format
    for fmt in AUDIO_FORMATS:
        (outputs_dir / f"test.{fmt}").write_text("dummy content")
    (outputs_dir / "test.txt").write_text("should not be listed")

    result = files.list_output_files()
    assert len(result) == len(AUDIO_FORMATS)
    for fmt in AUDIO_FORMATS:
        assert any(f".{fmt}" in file for file in result)


def test_read_text_file_empty_filename(mock_dirs):
    """Test reading with empty filename"""
    assert files.read_text_file("") == ""


def test_read_text_file_nonexistent(mock_dirs):
    """Test reading nonexistent file"""
    assert files.read_text_file("nonexistent.txt") == ""


def test_read_text_file_success(mock_dirs):
    """Test successful file reading"""
    inputs_dir, _ = mock_dirs
    content = "Test content\nMultiple lines"
    (inputs_dir / "test.txt").write_text(content)
    assert files.read_text_file("test.txt") == content
def test_save_text_empty(mock_dirs):
"""Test saving empty text"""
assert files.save_text("") is None
assert files.save_text(" ") is None
def test_save_text_auto_filename(mock_dirs):
    """Test saving text with auto-generated filename"""
    inputs_dir, _ = mock_dirs

    # First save
    filename1 = files.save_text("content1")
    assert filename1 == "input_1.txt"
    assert (inputs_dir / filename1).read_text() == "content1"

    # Second save
    filename2 = files.save_text("content2")
    assert filename2 == "input_2.txt"
    assert (inputs_dir / filename2).read_text() == "content2"


def test_save_text_custom_filename(mock_dirs):
    """Test saving text with custom filename"""
    inputs_dir, _ = mock_dirs
    filename = files.save_text("content", "custom.txt")
    assert filename == "custom.txt"
    assert (inputs_dir / filename).read_text() == "content"


def test_save_text_duplicate_filename(mock_dirs):
    """Test saving text with duplicate filename"""
    inputs_dir, _ = mock_dirs

    # First save
    filename1 = files.save_text("content1", "test.txt")
    assert filename1 == "test.txt"

    # Save with same filename
    filename2 = files.save_text("content2", "test.txt")
    assert filename2 == "test_1.txt"
    assert (inputs_dir / "test.txt").read_text() == "content1"
    assert (inputs_dir / "test_1.txt").read_text() == "content2"
def test_delete_all_input_files(mock_dirs):
    """Test deleting all input files"""
    inputs_dir, _ = mock_dirs

    # Create test files
    (inputs_dir / "test1.txt").write_text("content1")
    (inputs_dir / "test2.txt").write_text("content2")
    (inputs_dir / "keep.pdf").write_text("should not be deleted")

    assert files.delete_all_input_files() is True
    remaining_files = list(inputs_dir.iterdir())
    assert len(remaining_files) == 1
    assert remaining_files[0].name == "keep.pdf"


def test_delete_all_output_files(mock_dirs):
    """Test deleting all output files"""
    _, outputs_dir = mock_dirs

    # Create test files
    for fmt in AUDIO_FORMATS:
        (outputs_dir / f"test.{fmt}").write_text("dummy content")
    (outputs_dir / "keep.txt").write_text("should not be deleted")

    assert files.delete_all_output_files() is True
    remaining_files = list(outputs_dir.iterdir())
    assert len(remaining_files) == 1
    assert remaining_files[0].name == "keep.txt"
def test_process_uploaded_file_empty_path(mock_dirs):
    """Test processing empty file path"""
    assert files.process_uploaded_file("") is False


def test_process_uploaded_file_invalid_extension(mock_dirs, tmp_path):
    """Test processing file with invalid extension"""
    test_file = tmp_path / "test.pdf"
    test_file.write_text("content")
    assert files.process_uploaded_file(str(test_file)) is False


def test_process_uploaded_file_success(mock_dirs, tmp_path):
    """Test successful file upload processing"""
    inputs_dir, _ = mock_dirs

    # Create source file
    source_file = tmp_path / "test.txt"
    source_file.write_text("test content")

    assert files.process_uploaded_file(str(source_file)) is True
    assert (inputs_dir / "test.txt").read_text() == "test content"


def test_process_uploaded_file_duplicate(mock_dirs, tmp_path):
    """Test processing file with duplicate name"""
    inputs_dir, _ = mock_dirs

    # Create existing file
    (inputs_dir / "test.txt").write_text("existing content")

    # Create source file
    source_file = tmp_path / "test.txt"
    source_file.write_text("new content")

    assert files.process_uploaded_file(str(source_file)) is True
    assert (inputs_dir / "test.txt").read_text() == "existing content"
    assert (inputs_dir / "test_1.txt").read_text() == "new content"
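The duplicate-name tests above expect a second `test.txt` to land as `test_1.txt` while the first file stays untouched. A minimal sketch of one way to get that behavior follows. The names `unique_filename` and `save_text_sketch` are hypothetical and are not taken from the project's `files` module, whose actual implementation is not shown in this diff.

```python
import tempfile
from pathlib import Path
from typing import Optional


def unique_filename(directory: Path, filename: str) -> str:
    """Return `filename` if it is free, else `<stem>_<N><suffix>` with the lowest free N >= 1.

    Hypothetical helper for illustration only.
    """
    stem, suffix = Path(filename).stem, Path(filename).suffix
    name = filename
    counter = 1
    while (directory / name).exists():
        name = f"{stem}_{counter}{suffix}"
        counter += 1
    return name


def save_text_sketch(directory: Path, text: str, filename: str) -> Optional[str]:
    """Write non-blank text under a collision-free name and return the name used."""
    if not text.strip():
        # Mirrors the tests' expectation that blank input is rejected with None.
        return None
    name = unique_filename(directory, filename)
    (directory / name).write_text(text)
    return name


# Usage: the second save must not overwrite the first.
with tempfile.TemporaryDirectory() as tmp:
    inputs_dir = Path(tmp)
    first = save_text_sketch(inputs_dir, "content1", "test.txt")
    second = save_text_sketch(inputs_dir, "content2", "test.txt")
    blank = save_text_sketch(inputs_dir, "   ", "test.txt")
    first_content = (inputs_dir / "test.txt").read_text()
    second_content = (inputs_dir / "test_1.txt").read_text()
```

Probing for a free name before writing keeps the sketch short, but it is not safe against concurrent writers; that is an assumption that seems acceptable for a single-user UI utility.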


@ -0,0 +1,4 @@
"""
Drop all tests for now. The Gradio event system is too complex to test properly.
We'll need to find a better way to test the UI functionality.
"""
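One possible way around the event-system problem noted above is to keep handler logic in a plain function that receives its dependencies as arguments, so it can be tested without building a `gr.Blocks` app. The sketch below is an assumption, not the project's code: `build_status_updates` is a hypothetical name, and the status strings are borrowed from the assertions in `ui/tests/test_interface.py`.

```python
from typing import Any, Callable, Dict, List, Tuple

# Hypothetical signature: returns (is_available, voices), like the mocked
# check_api_status in the interface tests.
StatusCheck = Callable[[], Tuple[bool, List[str]]]


def build_status_updates(check_status: StatusCheck) -> List[Dict[str, Any]]:
    """Return [status text, voice dropdown, timer] updates as plain dicts.

    The function does no Gradio work itself; a UI handler could convert
    these dicts into component updates.
    """
    unavailable_voices = {"choices": [], "value": None}
    try:
        available, voices = check_status()
    except Exception:
        # Keep polling when the status check itself fails.
        return [{"value": "Connection Error"}, unavailable_voices, {"active": True}]

    if available and voices:
        # Service is up: fill the voice list and stop the polling timer.
        return [
            {"value": "Available"},
            {"choices": voices, "value": voices[0]},
            {"active": False},
        ]
    return [{"value": "Waiting for Service"}, unavailable_voices, {"active": True}]


def failing_check() -> Tuple[bool, List[str]]:
    """Stand-in for a status check that cannot reach the API."""
    raise RuntimeError("Test error")


# Usage: exercise all three branches with simple stand-in callables.
ok_updates = build_status_updates(lambda: (True, ["voice1", "voice2"]))
waiting_updates = build_status_updates(lambda: (False, []))
error_updates = build_status_updates(failing_check)
```

Passing the status check in as a parameter avoids patching module attributes in tests; the trade-off is one extra layer between the handler and the Gradio components.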

75
ui/tests/test_input.py Normal file

@ -0,0 +1,75 @@
import gradio as gr
import pytest

from ui.lib.components.input import create_input_column


def test_create_input_column_structure():
    """Test that create_input_column returns the expected structure"""
    column, components = create_input_column()

    # Test the return types
    assert isinstance(column, gr.Column)
    assert isinstance(components, dict)

    # Test that all expected components are present
    expected_components = {
        "tabs",
        "text_input",
        "file_select",
        "file_upload",
        "file_preview",
        "text_submit",
        "file_submit",
        "clear_files",
    }
    assert set(components.keys()) == expected_components

    # Test component types
    assert isinstance(components["tabs"], gr.Tabs)
    assert isinstance(components["text_input"], gr.Textbox)
    assert isinstance(components["file_select"], gr.Dropdown)
    assert isinstance(components["file_upload"], gr.File)
    assert isinstance(components["file_preview"], gr.Textbox)
    assert isinstance(components["text_submit"], gr.Button)
    assert isinstance(components["file_submit"], gr.Button)
    assert isinstance(components["clear_files"], gr.Button)
def test_text_input_configuration():
    """Test the text input component configuration"""
    _, components = create_input_column()
    text_input = components["text_input"]

    assert text_input.label == "Text to speak"
    assert text_input.placeholder == "Enter text here..."
    assert text_input.lines == 4


def test_file_upload_configuration():
    """Test the file upload component configuration"""
    _, components = create_input_column()
    file_upload = components["file_upload"]

    assert file_upload.label == "Upload Text File (.txt)"
    assert file_upload.file_types == [".txt"]


def test_button_configurations():
    """Test the button configurations"""
    _, components = create_input_column()

    # Test text submit button
    assert components["text_submit"].value == "Generate Speech"
    assert components["text_submit"].variant == "primary"
    assert components["text_submit"].size == "lg"

    # Test file submit button
    assert components["file_submit"].value == "Generate Speech"
    assert components["file_submit"].variant == "primary"
    assert components["file_submit"].size == "lg"

    # Test clear files button
    assert components["clear_files"].value == "Clear Files"
    assert components["clear_files"].variant == "secondary"
    assert components["clear_files"].size == "lg"

146
ui/tests/test_interface.py Normal file

@ -0,0 +1,146 @@
from unittest.mock import MagicMock, PropertyMock, patch

import gradio as gr
import pytest

from ui.lib.interface import create_interface


@pytest.fixture
def mock_timer():
    """Create a mock timer with events property"""

    class MockEvent:
        def __init__(self, fn):
            self.fn = fn

    class MockTimer:
        def __init__(self):
            self._fn = None
            self.value = 5

        @property
        def events(self):
            return [MockEvent(self._fn)] if self._fn else []

        def tick(self, fn, outputs):
            self._fn = fn

    return MockTimer()
def test_create_interface_structure():
    """Test the basic structure of the created interface"""
    with patch("ui.lib.api.check_api_status", return_value=(False, [])):
        demo = create_interface()

        # Test interface type and theme
        assert isinstance(demo, gr.Blocks)
        assert demo.title == "Kokoro TTS Demo"
        assert isinstance(demo.theme, gr.themes.Monochrome)


def test_interface_html_links():
    """Test that HTML links are properly configured"""
    with patch("ui.lib.api.check_api_status", return_value=(False, [])):
        demo = create_interface()

        # Find HTML component
        html_components = [
            comp for comp in demo.blocks.values() if isinstance(comp, gr.HTML)
        ]
        assert len(html_components) > 0
        html = html_components[0]

        # Check for required links
        assert 'href="https://huggingface.co/hexgrad/Kokoro-82M"' in html.value
        assert 'href="https://github.com/remsky/Kokoro-FastAPI"' in html.value
        assert "Kokoro-82M HF Repo" in html.value
        assert "Kokoro-FastAPI Repo" in html.value
def test_update_status_available(mock_timer):
    """Test status update when service is available"""
    voices = ["voice1", "voice2"]
    with patch("ui.lib.api.check_api_status", return_value=(True, voices)), patch(
        "gradio.Timer", return_value=mock_timer
    ):
        demo = create_interface()

        # Get the update function
        update_fn = mock_timer.events[0].fn

        # Test update with available service
        updates = update_fn()
        assert "Available" in updates[0]["value"]
        assert updates[1]["choices"] == voices
        assert updates[1]["value"] == voices[0]
        assert updates[2]["active"] is False  # Timer should stop


def test_update_status_unavailable(mock_timer):
    """Test status update when service is unavailable"""
    with patch("ui.lib.api.check_api_status", return_value=(False, [])), patch(
        "gradio.Timer", return_value=mock_timer
    ):
        demo = create_interface()
        update_fn = mock_timer.events[0].fn

        updates = update_fn()
        assert "Waiting for Service" in updates[0]["value"]
        assert updates[1]["choices"] == []
        assert updates[1]["value"] is None
        assert updates[2]["active"] is True  # Timer should continue


def test_update_status_error(mock_timer):
    """Test status update when an error occurs"""
    with patch(
        "ui.lib.api.check_api_status", side_effect=Exception("Test error")
    ), patch("gradio.Timer", return_value=mock_timer):
        demo = create_interface()
        update_fn = mock_timer.events[0].fn

        updates = update_fn()
        assert "Connection Error" in updates[0]["value"]
        assert updates[1]["choices"] == []
        assert updates[1]["value"] is None
        assert updates[2]["active"] is True  # Timer should continue


def test_timer_configuration(mock_timer):
    """Test timer configuration"""
    with patch("ui.lib.api.check_api_status", return_value=(False, [])), patch(
        "gradio.Timer", return_value=mock_timer
    ):
        demo = create_interface()

        assert mock_timer.value == 5  # Check interval is 5 seconds
        assert len(mock_timer.events) == 1  # Should have one event handler
def test_interface_components_presence():
    """Test that all required components are present"""
    with patch("ui.lib.api.check_api_status", return_value=(False, [])):
        demo = create_interface()

        # Check for main component sections
        components = {
            comp.label
            for comp in demo.blocks.values()
            if hasattr(comp, "label") and comp.label
        }

        required_components = {
            "Text to speak",
            "Voice",
            "Audio Format",
            "Speed",
            "Generated Speech",
            "Previous Outputs",
        }
        assert required_components.issubset(components)