From 2e2b3e9e0e3ce120418a7676ce04a987ad4d69d3 Mon Sep 17 00:00:00 2001 From: fondoger Date: Fri, 4 Apr 2025 02:50:41 +0800 Subject: [PATCH] Feature: Support silence tags. eg:`[silent](0.5s)` --- api/src/services/audio.py | 4 +- .../text_processing/text_processor.py | 43 ++++++++++++++++++- api/src/services/tts_service.py | 21 ++++++++- api/tests/test_text_processor.py | 28 ++++++++++++ 4 files changed, 93 insertions(+), 3 deletions(-) diff --git a/api/src/services/audio.py b/api/src/services/audio.py index 5e344ec..f0f1230 100644 --- a/api/src/services/audio.py +++ b/api/src/services/audio.py @@ -116,6 +116,7 @@ class AudioService: speed: float = 1, chunk_text: str = "", is_last_chunk: bool = False, + is_silent_chunk: bool = False, trim_audio: bool = True, normalizer: AudioNormalizer = None, ) -> AudioChunk: @@ -128,6 +129,7 @@ class AudioService: speed: The speaking speed of the voice chunk_text: The text sent to the model to generate the resulting speech is_last_chunk: Whether this is the last chunk + is_silent_chunk: Whether this chunk is a silent tag (e.g., [Silent](0.5s)) trim_audio: Whether audio should be trimmed normalizer: Optional AudioNormalizer instance for consistent normalization @@ -146,7 +148,7 @@ class AudioService: audio_chunk.audio = normalizer.normalize(audio_chunk.audio) - if trim_audio == True: + if trim_audio == True and not is_silent_chunk: audio_chunk = AudioService.trim_audio(audio_chunk,chunk_text,speed,is_last_chunk,normalizer) # Write audio data first diff --git a/api/src/services/text_processing/text_processor.py b/api/src/services/text_processing/text_processor.py index 0bd4658..0fa711d 100644 --- a/api/src/services/text_processing/text_processor.py +++ b/api/src/services/text_processing/text_processor.py @@ -14,6 +14,8 @@ from ...structures.schemas import NormalizationOptions # Pre-compiled regex patterns for performance CUSTOM_PHONEMES = re.compile(r"(\[([^\]]|\n)*?\])(\(\/([^\/)]|\n)*?\/\))") +# Matching: [silent](/1s/), 
[silent](/0.5s/), [silent](/.5s/) +CUSTOM_PHONEME_SILENCE_TAG = re.compile(r"\[silent\]\(\/(\d*\.?\d+)s\/\)") def process_text_chunk( text: str, language: str = "a", skip_phonemize: bool = False @@ -89,7 +91,14 @@ def process_text(text: str, language: str = "a") -> List[int]: def get_sentence_info(text: str, custom_phenomes_list: Dict[str, str]) -> List[Tuple[str, List[int], int]]: - """Process all sentences and return info.""" + """ + Process all sentences and return info. + + Possible List Values: + - (sentence, tokens, token_count) + - (silence_tag, [], 0) + """ + sentences = re.split(r"([.!?;:])(?=\s|$)", text) phoneme_length, min_value = len(custom_phenomes_list), 0 @@ -102,6 +111,19 @@ def get_sentence_info(text: str, custom_phenomes_list: Dict[str, str]) -> List[T sentence = sentence.replace(current_id, custom_phenomes_list.pop(current_id)) min_value += 1 + # Handle silence tags + # Eg: "This is a test sentence, [silent](/1s/) with silence for one second." + while match := CUSTOM_PHONEME_SILENCE_TAG.search(sentence): + match_prefix = sentence[:match.start()] # `This is a test sentence, ` + match_text = match.group(0) # `[silent](/1s/)` + match_suffix = sentence[match.end():] # ` with silence for one second.` + if match_prefix.strip(): + tokens = process_text_chunk(match_prefix.strip()) + results.append((match_prefix, tokens, len(tokens))) + + # Insert silence tag with empty tokens + results.append((match_text, [], 0)) + sentence = match_suffix punct = sentences[i + 1] if i + 1 < len(sentences) else "" @@ -149,6 +171,25 @@ async def smart_split( current_count = 0 for sentence, tokens, count in sentences: + # Handle silence tags + if CUSTOM_PHONEME_SILENCE_TAG.match(sentence): + # Yield any existing chunk if present. + if current_chunk: + chunk_text = " ".join(current_chunk) + chunk_count += 1 + logger.debug( + f"Yielding chunk {chunk_count} before silence tag: '{chunk_text[:50]}{'...' 
if len(chunk_text) > 50 else ''}' ({current_count} tokens)"
+                )
+                yield chunk_text, current_tokens
+                current_chunk = []
+                current_tokens = []
+                current_count = 0
+
+            # Silent tags are not sent to Kokoro, so we don't increment `chunk_count`
+            logger.debug(f"Yielding silence tag: '{sentence}'")
+            yield sentence, []
+            continue
+
         # Handle sentences that exceed max tokens
         if count > max_tokens:
             # Yield current chunk if any
diff --git a/api/src/services/tts_service.py b/api/src/services/tts_service.py
index 8a6bb42..4195952 100644
--- a/api/src/services/tts_service.py
+++ b/api/src/services/tts_service.py
@@ -21,7 +21,7 @@ from ..inference.voice_manager import get_manager as get_voice_manager
 from ..structures.schemas import NormalizationOptions
 from .audio import AudioNormalizer, AudioService
 from .text_processing import tokenize
-from .text_processing.text_processor import process_text_chunk, smart_split
+from .text_processing.text_processor import CUSTOM_PHONEME_SILENCE_TAG, smart_split
 
 
 class TTSService:
@@ -62,6 +62,25 @@ class TTSService:
         """Process tokens into audio."""
         async with self._chunk_semaphore:
             try:
+                # Handle silence tags, eg: `[silent](/0.5s/)`
+                if match := CUSTOM_PHONEME_SILENCE_TAG.match(chunk_text):
+                    silence_duration = float(match.group(1))
+                    silence_audio = np.zeros(int(silence_duration * 24000), dtype=np.float32)
+                    if not output_format:
+                        yield AudioChunk(silence_audio, output=b"")
+                        return
+                    chunk_data = await AudioService.convert_audio(
+                        AudioChunk(silence_audio),
+                        output_format,
+                        writer,
+                        speed,
+                        chunk_text,
+                        normalizer=normalizer,
+                        is_silent_chunk=True,
+                    )
+                    yield chunk_data
+                    return
+
                 # Handle stream finalization
                 if is_last:
                     # Skip format conversion for raw audio mode
diff --git a/api/tests/test_text_processor.py b/api/tests/test_text_processor.py
index 7e5fb0f..1470631 100644
--- a/api/tests/test_text_processor.py
+++ b/api/tests/test_text_processor.py
@@ -58,6 +58,20 @@ def test_get_sentence_info_phenomoes():
         assert count == len(tokens) 
assert count > 0 +def test_get_sentence_info_silence_tags(): + """Test sentence splitting and info extraction with silence tags.""" + text = "This is a test sentence, [silent](/1s/) with silence for one second." + results = get_sentence_info(text, {}) + + assert len(results) == 3 + assert results[1][0] == "[silent](/1s/)" + for sentence, tokens, count in results: + assert isinstance(sentence, str) + assert isinstance(tokens, list) + assert isinstance(count, int) + assert count == len(tokens) + assert count >= 0 + @pytest.mark.asyncio async def test_smart_split_short_text(): """Test smart splitting with text under max tokens.""" @@ -99,3 +113,17 @@ async def test_smart_split_with_punctuation(): # Verify punctuation is preserved assert all(any(p in chunk for p in "!?;:.") for chunk in chunks) + +@pytest.mark.asyncio +async def test_smart_split_with_silence_tags(): + """Test smart splitting handles silence tags correctly.""" + text = "This is a test sentence, [silent](/1s/) with silence for one second." + + chunks = [] + async for chunk_text, chunk_tokens in smart_split(text): + chunks.append(chunk_text) + + assert len(chunks) == 3 + assert chunks[0] == "This is a test sentence, " + assert chunks[1] == "[silent](/1s/)" + assert chunks[2] == " with silence for one second."