Feature: Support silence tags, e.g. [silent](/0.5s/)

Author: fondoger
Date: 2025-04-04 02:50:41 +08:00
Parent: e2313abe72
Commit: 2e2b3e9e0e
4 changed files with 93 additions and 3 deletions
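
The tag grammar accepted by the new CUSTOM_PHONEME_SILENCE_TAG regex (added in the text-processing changes below) requires slashes around the duration. A quick standalone check:

import re

# Copy of CUSTOM_PHONEME_SILENCE_TAG from this commit
SILENCE_TAG = re.compile(r"\[silent\]\(\/(\d*\.?\d+)s\/\)")

for tag in ["[silent](/1s/)", "[silent](/0.5s/)", "[silent](/.5s/)", "[silent](0.5s)"]:
    m = SILENCE_TAG.match(tag)
    print(tag, "->", f"{float(m.group(1))}s" if m else "no match")
# The slash-less spelling [silent](0.5s) does not match: the slashes are required.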

File 1 of 4

@@ -116,6 +116,7 @@ class AudioService:
         speed: float = 1,
         chunk_text: str = "",
         is_last_chunk: bool = False,
+        is_silent_chunk: bool = False,
         trim_audio: bool = True,
         normalizer: AudioNormalizer = None,
     ) -> AudioChunk:
@@ -128,6 +129,7 @@ class AudioService:
             speed: The speaking speed of the voice
             chunk_text: The text sent to the model to generate the resulting speech
             is_last_chunk: Whether this is the last chunk
+            is_silent_chunk: Whether this chunk is a silence tag (e.g., [silent](/0.5s/))
             trim_audio: Whether audio should be trimmed
             normalizer: Optional AudioNormalizer instance for consistent normalization
@@ -146,7 +148,7 @@ class AudioService:
             audio_chunk.audio = normalizer.normalize(audio_chunk.audio)

-        if trim_audio == True:
+        if trim_audio == True and not is_silent_chunk:
             audio_chunk = AudioService.trim_audio(audio_chunk, chunk_text, speed, is_last_chunk, normalizer)

         # Write audio data first
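
The `not is_silent_chunk` guard above matters because trimming strips leading and trailing silence, and a silent chunk consists of nothing else. A rough standalone illustration (a naive threshold trim, not the service's actual trim_audio logic):

import numpy as np

SAMPLE_RATE = 24000  # the service's output rate

silence = np.zeros(int(0.5 * SAMPLE_RATE), dtype=np.float32)

# A naive trim keeps only the span between the first and last "loud" samples;
# pure silence has none, so the whole chunk would be dropped.
loud = np.flatnonzero(np.abs(silence) > 1e-4)
trimmed = silence[loud[0]:loud[-1] + 1] if loud.size else silence[:0]
print(len(silence), len(trimmed))  # 12000 0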

File 2 of 4

@@ -14,6 +14,8 @@ from ...structures.schemas import NormalizationOptions
 # Pre-compiled regex patterns for performance
 CUSTOM_PHONEMES = re.compile(r"(\[([^\]]|\n)*?\])(\(\/([^\/)]|\n)*?\/\))")
+# Matching: [silent](/1s/), [silent](/0.5s/), [silent](/.5s/)
+CUSTOM_PHONEME_SILENCE_TAG = re.compile(r"\[silent\]\(\/(\d*\.?\d+)s\/\)")


 def process_text_chunk(
     text: str, language: str = "a", skip_phonemize: bool = False
@@ -89,7 +91,14 @@ def process_text(text: str, language: str = "a") -> List[int]:
 def get_sentence_info(text: str, custom_phenomes_list: Dict[str, str]) -> List[Tuple[str, List[int], int]]:
-    """Process all sentences and return info."""
+    """
+    Process all sentences and return info.
+
+    Possible list values:
+    - (sentence, tokens, token_count)
+    - (silence_tag, [], 0)
+    """
     sentences = re.split(r"([.!?;:])(?=\s|$)", text)
     phoneme_length, min_value = len(custom_phenomes_list), 0
@@ -102,6 +111,19 @@ def get_sentence_info(text: str, custom_phenomes_list: Dict[str, str]) -> List[Tuple[str, List[int], int]]:
                 sentence = sentence.replace(current_id, custom_phenomes_list.pop(current_id))
                 min_value += 1

+        # Handle silence tags
+        # E.g. "This is a test sentence, [silent](/1s/) with silence for one second."
+        while match := CUSTOM_PHONEME_SILENCE_TAG.search(sentence):
+            match_prefix = sentence[:match.start()]  # `This is a test sentence, `
+            match_text = match.group(0)              # `[silent](/1s/)`
+            match_suffix = sentence[match.end():]    # ` with silence for one second.`
+            if match_prefix.strip():
+                tokens = process_text_chunk(match_prefix.strip())
+                results.append((match_prefix, tokens, len(tokens)))
+            # Insert silence tag with empty tokens
+            results.append((match_text, [], 0))
+            sentence = match_suffix
+
         punct = sentences[i + 1] if i + 1 < len(sentences) else ""
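
With the loop above, a sentence containing a tag is split into prefix, tag, and suffix entries. The expected shape for the example sentence (token values here are invented placeholders; real ones come from process_text_chunk):

# get_sentence_info("This is a test sentence, [silent](/1s/) with silence for one second.", {})
expected_shape = [
    ("This is a test sentence, ", [12, 34, 56], 3),  # spoken prefix (placeholder tokens)
    ("[silent](/1s/)", [], 0),                       # silence tag: empty tokens
    (" with silence for one second.", [78, 90], 2),  # spoken suffix (placeholder tokens)
]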
@@ -149,6 +171,25 @@ async def smart_split(
     current_count = 0

     for sentence, tokens, count in sentences:
+        # Handle silence tags
+        if CUSTOM_PHONEME_SILENCE_TAG.match(sentence):
+            # Yield any existing chunk if present.
+            if current_chunk:
+                chunk_text = " ".join(current_chunk)
+                chunk_count += 1
+                logger.debug(
+                    f"Yielding chunk {chunk_count} before silence tag: '{chunk_text[:50]}{'...' if len(chunk_text) > 50 else ''}' ({current_count} tokens)"
+                )
+                yield chunk_text, current_tokens
+                current_chunk = []
+                current_tokens = []
+                current_count = 0
+
+            # Silence tags are not sent to Kokoro, so we don't increment `chunk_count`
+            logger.debug(f"Yielding silence tag: '{sentence}'")
+            yield sentence, []
+            continue
+
         # Handle sentences that exceed max tokens
         if count > max_tokens:
             # Yield current chunk if any
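
Downstream consumers therefore see a silence tag as an ordinary chunk with an empty token list. A sketch of consumption, mirroring the test at the end of this commit (assumes smart_split and CUSTOM_PHONEME_SILENCE_TAG are imported from the text processor module):

import asyncio

async def demo():
    text = "This is a test sentence, [silent](/1s/) with silence for one second."
    async for chunk_text, chunk_tokens in smart_split(text):
        if CUSTOM_PHONEME_SILENCE_TAG.match(chunk_text):
            print("silence chunk:", chunk_text)  # chunk_tokens == []
        else:
            print("speech chunk:", repr(chunk_text))

# asyncio.run(demo())  # requires the service's text_processing package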

File 3 of 4

@@ -21,7 +21,7 @@ from ..inference.voice_manager import get_manager as get_voice_manager
 from ..structures.schemas import NormalizationOptions
 from .audio import AudioNormalizer, AudioService
 from .text_processing import tokenize
-from .text_processing.text_processor import process_text_chunk, smart_split
+from .text_processing.text_processor import CUSTOM_PHONEME_SILENCE_TAG, smart_split


 class TTSService:
@@ -62,6 +62,25 @@ class TTSService:
         """Process tokens into audio."""
         async with self._chunk_semaphore:
             try:
+                # Handle silence tags, e.g. `[silent](/0.5s/)`
+                if match := CUSTOM_PHONEME_SILENCE_TAG.match(chunk_text):
+                    silence_duration = float(match.group(1))
+                    silence_audio = np.zeros(int(silence_duration * 24000), dtype=np.float32)
+                    if not output_format:
+                        yield AudioChunk(silence_audio, output=b"")
+                        return
+                    chunk_data = await AudioService.convert_audio(
+                        AudioChunk(silence_audio),
+                        output_format,
+                        writer,
+                        speed,
+                        chunk_text,
+                        normalizer=normalizer,
+                        is_silent_chunk=True,
+                    )
+                    yield chunk_data
+                    return
+
                 # Handle stream finalization
                 if is_last:
                     # Skip format conversion for raw audio mode
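
The zero-fill above is just duration times the model's hard-coded 24 kHz output rate. A standalone check of the sample-count math:

import numpy as np

SAMPLE_RATE = 24000  # matches the constant used above

def make_silence(duration_s: float) -> np.ndarray:
    """Zero-filled float32 buffer, the same shape the service yields."""
    return np.zeros(int(duration_s * SAMPLE_RATE), dtype=np.float32)

print(make_silence(0.5).shape)  # (12000,)
print(make_silence(1.0).shape)  # (24000,)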

File 4 of 4

@@ -58,6 +58,20 @@ def test_get_sentence_info_phenomoes():
         assert count == len(tokens)
         assert count > 0

+
+def test_get_sentence_info_silence_tags():
+    """Test sentence splitting and info extraction with silence tags."""
+    text = "This is a test sentence, [silent](/1s/) with silence for one second."
+    results = get_sentence_info(text, {})
+
+    assert len(results) == 3
+    assert results[1][0] == "[silent](/1s/)"
+    for sentence, tokens, count in results:
+        assert isinstance(sentence, str)
+        assert isinstance(tokens, list)
+        assert isinstance(count, int)
+        assert count == len(tokens)
+        assert count >= 0
+

 @pytest.mark.asyncio
 async def test_smart_split_short_text():
     """Test smart splitting with text under max tokens."""
@@ -99,3 +113,17 @@ async def test_smart_split_with_punctuation():
     # Verify punctuation is preserved
     assert all(any(p in chunk for p in "!?;:.") for chunk in chunks)
+
+
+@pytest.mark.asyncio
+async def test_smart_split_with_silence_tags():
+    """Test smart splitting handles silence tags correctly."""
+    text = "This is a test sentence, [silent](/1s/) with silence for one second."
+
+    chunks = []
+    async for chunk_text, chunk_tokens in smart_split(text):
+        chunks.append(chunk_text)
+
+    assert len(chunks) == 3
+    assert chunks[0] == "This is a test sentence, "
+    assert chunks[1] == "[silent](/1s/)"
+    assert chunks[2] == " with silence for one second."
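
Not part of this commit, but since the slash-less spelling is an easy mistake, a cheap follow-up test could pin down the slash requirement (assumes CUSTOM_PHONEME_SILENCE_TAG is importable from the text processor module):

def test_silence_tag_requires_slashes():
    """The pattern only matches durations wrapped in slashes."""
    assert CUSTOM_PHONEME_SILENCE_TAG.match("[silent](/0.5s/)")
    assert CUSTOM_PHONEME_SILENCE_TAG.match("[silent](/.5s/)")
    assert not CUSTOM_PHONEME_SILENCE_TAG.match("[silent](0.5s)")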