Merge branch 'remsky:master' into master

This commit is contained in:
Fireblade2534 2025-05-09 09:08:55 -04:00 committed by GitHub
commit 243d98e339
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 281 additions and 49 deletions

View file

@ -104,7 +104,7 @@ async def generate_from_phonemes(
if chunk_audio is not None:
# Normalize audio before writing
normalized_audio = await normalizer.normalize(chunk_audio)
normalized_audio = normalizer.normalize(chunk_audio)
# Write chunk and yield bytes
chunk_bytes = writer.write_chunk(normalized_audio)
if chunk_bytes:
@ -114,6 +114,7 @@ async def generate_from_phonemes(
final_bytes = writer.write_chunk(finalize=True)
if final_bytes:
yield final_bytes
writer.close()
else:
raise ValueError("Failed to generate audio data")

View file

@ -4,8 +4,10 @@ Handles various text formats including URLs, emails, numbers, money, and special
Converts them into a format suitable for text-to-speech processing.
"""
import math
import re
from functools import lru_cache
from typing import List, Optional, Union
import inflect
from numpy import number
@ -132,6 +134,7 @@ VALID_UNITS = {
"px": "pixel", # CSS units
}
MONEY_UNITS = {"$": ("dollar", "cent"), "£": ("pound", "pence"), "": ("euro", "cent")}
# Pre-compiled regex patterns for performance
EMAIL_PATTERN = re.compile(
@ -152,37 +155,24 @@ UNIT_PATTERN = re.compile(
)
TIME_PATTERN = re.compile(
r"([0-9]{2} ?: ?[0-9]{2}( ?: ?[0-9]{2})?)( ?(pm|am)\b)?", re.IGNORECASE
r"([0-9]{1,2} ?: ?[0-9]{2}( ?: ?[0-9]{2})?)( ?(pm|am)\b)?", re.IGNORECASE
)
MONEY_PATTERN = re.compile(
r"(-?)(["
+ "".join(MONEY_UNITS.keys())
+ r"])(\d+(?:\.\d+)?)((?: hundred| thousand| (?:[bm]|tr|quadr)illion|k|m|b|t)*)\b",
re.IGNORECASE,
)
NUMBER_PATTERN = re.compile(
r"(-?)(\d+(?:\.\d+)?)((?: hundred| thousand| (?:[bm]|tr|quadr)illion|k|m|b)*)\b",
re.IGNORECASE,
)
INFLECT_ENGINE = inflect.engine()
def split_num(num: re.Match[str]) -> str:
"""Handle number splitting for various formats"""
num = num.group()
if "." in num:
return num
elif ":" in num:
h, m = [int(n) for n in num.split(":")]
if m == 0:
return f"{h} o'clock"
elif m < 10:
return f"{h} oh {m}"
return f"{h} {m}"
year = int(num[:4])
if year < 1100 or year % 1000 < 10:
return num
left, right = num[:2], int(num[2:4])
s = "s" if num.endswith("s") else ""
if 100 <= year % 1000 <= 999:
if right == 0:
return f"{left} hundred{s}"
elif right < 10:
return f"{left} oh {right}{s}"
return f"{left} {right}{s}"
def handle_units(u: re.Match[str]) -> str:
"""Converts units to their full form"""
unit_string = u.group(6).strip()
@ -208,14 +198,61 @@ def conditional_int(number: float, threshold: float = 0.00001):
return number
def translate_multiplier(multiplier: str) -> str:
"""Translate multiplier abrevations to words"""
multiplier_translation = {
"k": "thousand",
"m": "million",
"b": "billion",
"t": "trillion",
}
if multiplier.lower() in multiplier_translation:
return multiplier_translation[multiplier.lower()]
return multiplier.strip()
def split_four_digit(number: float):
part1 = str(conditional_int(number))[:2]
part2 = str(conditional_int(number))[2:]
return f"{INFLECT_ENGINE.number_to_words(part1)} {INFLECT_ENGINE.number_to_words(part2)}"
def handle_numbers(n: re.Match[str]) -> str:
number = n.group(2)
try:
number = float(number)
except:
return n.group()
if n.group(1) == "-":
number *= -1
multiplier = translate_multiplier(n.group(3))
number = conditional_int(number)
if multiplier != "":
multiplier = f" {multiplier}"
else:
if (
number % 1 == 0
and len(str(number)) == 4
and number > 1500
and number % 1000 > 9
):
return split_four_digit(number)
return f"{INFLECT_ENGINE.number_to_words(number)}{multiplier}"
def handle_money(m: re.Match[str]) -> str:
"""Convert money expressions to spoken form"""
bill = "dollar" if m.group(2) == "$" else "pound"
coin = "cent" if m.group(2) == "$" else "pence"
bill, coin = MONEY_UNITS[m.group(2)]
number = m.group(3)
multiplier = m.group(4)
try:
number = float(number)
except:
@ -224,12 +261,17 @@ def handle_money(m: re.Match[str]) -> str:
if m.group(1) == "-":
number *= -1
multiplier = translate_multiplier(m.group(4))
if multiplier != "":
multiplier = f" {multiplier}"
if number % 1 == 0 or multiplier != "":
text_number = f"{INFLECT_ENGINE.number_to_words(conditional_int(number))}{multiplier} {INFLECT_ENGINE.plural(bill, count=number)}"
else:
sub_number = int(str(number).split(".")[-1].ljust(2, "0"))
text_number = f"{INFLECT_ENGINE.number_to_words(int(round(number)))} {INFLECT_ENGINE.plural(bill, count=number)} and {INFLECT_ENGINE.number_to_words(sub_number)} {INFLECT_ENGINE.plural(coin, count=sub_number)}"
text_number = f"{INFLECT_ENGINE.number_to_words(int(math.floor(number)))} {INFLECT_ENGINE.plural(bill, count=number)} and {INFLECT_ENGINE.number_to_words(sub_number)} {INFLECT_ENGINE.plural(coin, count=sub_number)}"
return text_number
@ -320,15 +362,31 @@ def handle_phone_number(p: re.Match[str]) -> str:
def handle_time(t: re.Match[str]) -> str:
t = t.groups()
numbers = " ".join(
[INFLECT_ENGINE.number_to_words(X.strip()) for X in t[0].split(":")]
)
time_parts = t[0].split(":")
numbers = []
numbers.append(INFLECT_ENGINE.number_to_words(time_parts[0].strip()))
minute_number = INFLECT_ENGINE.number_to_words(time_parts[1].strip())
if int(time_parts[1]) < 10:
if int(time_parts[1]) != 0:
numbers.append(f"oh {minute_number}")
else:
numbers.append(minute_number)
half = ""
if t[2] is not None:
half = t[2].strip()
if len(time_parts) > 2:
seconds_number = INFLECT_ENGINE.number_to_words(time_parts[2].strip())
second_word = INFLECT_ENGINE.plural("second", int(time_parts[2].strip()))
numbers.append(f"and {seconds_number} {second_word}")
else:
if t[2] is not None:
half = " " + t[2].strip()
else:
if int(time_parts[1]) == 0:
numbers.append("o'clock")
return numbers + half
return " ".join(numbers) + half
def normalize_text(text: str, normalization_options: NormalizationOptions) -> str:
@ -366,7 +424,7 @@ def normalize_text(text: str, normalization_options: NormalizationOptions) -> st
for a, b in zip("、。!,:;?–", ",.!,:;?-"):
text = text.replace(a, b + " ")
# Handle simple time in the format of HH:MM:SS
# Handle simple time in the format of HH:MM:SS (am/pm)
text = TIME_PATTERN.sub(
handle_time,
text,
@ -390,15 +448,12 @@ def normalize_text(text: str, normalization_options: NormalizationOptions) -> st
# Handle numbers and money
text = re.sub(r"(?<=\d),(?=\d)", "", text)
text = re.sub(
r"(?i)(-?)([$£])(\d+(?:\.\d+)?)((?: hundred| thousand| (?:[bm]|tr|quadr)illion)*)\b",
text = MONEY_PATTERN.sub(
handle_money,
text,
)
text = re.sub(
r"\d*\.\d+|\b\d{4}s?\b|(?<!:)\b(?:[1-9]|1[0-2]):[0-5]\d\b(?!:)", split_num, text
)
text = NUMBER_PATTERN.sub(handle_numbers, text)
text = re.sub(r"\d*\.\d+", handle_decimal, text)

View file

@ -57,19 +57,19 @@ def test_url_localhost():
normalize_text(
"Running on localhost:7860", normalization_options=NormalizationOptions()
)
== "Running on localhost colon 78 60"
== "Running on localhost colon seventy-eight sixty"
)
assert (
normalize_text(
"Server at localhost:8080/api", normalization_options=NormalizationOptions()
)
== "Server at localhost colon 80 80 slash api"
== "Server at localhost colon eighty eighty slash api"
)
assert (
normalize_text(
"Test localhost:3000/test?v=1", normalization_options=NormalizationOptions()
)
== "Test localhost colon 3000 slash test question-mark v equals 1"
== "Test localhost colon three thousand slash test question-mark v equals one"
)
@ -79,17 +79,17 @@ def test_url_ip_addresses():
normalize_text(
"Access 0.0.0.0:9090/test", normalization_options=NormalizationOptions()
)
== "Access 0 dot 0 dot 0 dot 0 colon 90 90 slash test"
== "Access zero dot zero dot zero dot zero colon ninety ninety slash test"
)
assert (
normalize_text(
"API at 192.168.1.1:8000", normalization_options=NormalizationOptions()
)
== "API at 192 dot 168 dot 1 dot 1 colon 8000"
== "API at one hundred and ninety-two dot one hundred and sixty-eight dot one dot one colon eight thousand"
)
assert (
normalize_text("Server 127.0.0.1", normalization_options=NormalizationOptions())
== "Server 127 dot 0 dot 0 dot 1"
== "Server one hundred and twenty-seven dot zero dot zero dot one"
)
@ -146,6 +146,15 @@ def test_money():
)
== "He lost five point three thousand dollars."
)
assert (
normalize_text(
"He went gambling and lost about $25.05k.",
normalization_options=NormalizationOptions(),
)
== "He went gambling and lost about twenty-five point zero five thousand dollars."
)
assert (
normalize_text(
"To put it weirdly -$6.9 million",
@ -153,11 +162,140 @@ def test_money():
)
== "To put it weirdly minus six point nine million dollars"
)
assert (
normalize_text("It costs $50.3.", normalization_options=NormalizationOptions())
== "It costs fifty dollars and thirty cents."
)
assert (
normalize_text(
"The plant cost $200,000.8.", normalization_options=NormalizationOptions()
)
== "The plant cost two hundred thousand dollars and eighty cents."
)
assert (
normalize_text(
"€30.2 is in euros", normalization_options=NormalizationOptions()
)
== "thirty euros and twenty cents is in euros"
)
def test_time():
"""Test time normalization"""
assert (
normalize_text(
"Your flight leaves at 10:35 pm",
normalization_options=NormalizationOptions(),
)
== "Your flight leaves at ten thirty-five pm"
)
assert (
normalize_text(
"He departed for london around 5:03 am.",
normalization_options=NormalizationOptions(),
)
== "He departed for london around five oh three am."
)
assert (
normalize_text(
"Only the 13:42 and 15:12 slots are available.",
normalization_options=NormalizationOptions(),
)
== "Only the thirteen forty-two and fifteen twelve slots are available."
)
assert (
normalize_text(
"It is currently 1:00 pm", normalization_options=NormalizationOptions()
)
== "It is currently one pm"
)
assert (
normalize_text(
"It is currently 3:00", normalization_options=NormalizationOptions()
)
== "It is currently three o'clock"
)
assert (
normalize_text(
"12:00 am is midnight", normalization_options=NormalizationOptions()
)
== "twelve am is midnight"
)
def test_number():
"""Test number normalization"""
assert (
normalize_text(
"I bought 1035 cans of soda", normalization_options=NormalizationOptions()
)
== "I bought one thousand and thirty-five cans of soda"
)
assert (
normalize_text(
"The bus has a maximum capacity of 62 people",
normalization_options=NormalizationOptions(),
)
== "The bus has a maximum capacity of sixty-two people"
)
assert (
normalize_text(
"There are 1300 products left in stock",
normalization_options=NormalizationOptions(),
)
== "There are one thousand, three hundred products left in stock"
)
assert (
normalize_text(
"The population is 7,890,000 people.",
normalization_options=NormalizationOptions(),
)
== "The population is seven million, eight hundred and ninety thousand people."
)
assert (
normalize_text(
"He looked around but only found 1.6k of the 10k bricks",
normalization_options=NormalizationOptions(),
)
== "He looked around but only found one point six thousand of the ten thousand bricks"
)
assert (
normalize_text(
"The book has 342 pages.", normalization_options=NormalizationOptions()
)
== "The book has three hundred and forty-two pages."
)
assert (
normalize_text(
"He made -50 sales today.", normalization_options=NormalizationOptions()
)
== "He made minus fifty sales today."
)
assert (
normalize_text(
"56.789 to the power of 1.35 million",
normalization_options=NormalizationOptions(),
)
== "fifty-six point seven eight nine to the power of one point three five million"
)
def test_non_url_text():
"""Test that non-URL text is unaffected"""

38
dev/Test copy 2.py Normal file
View file

@ -0,0 +1,38 @@
import base64
import json
import pydub
import requests
text = """Running on localhost:7860"""
Type = "wav"
response = requests.post(
"http://localhost:8880/dev/captioned_speech",
json={
"model": "kokoro",
"input": text,
"voice": "af_heart+af_sky",
"speed": 1.0,
"response_format": Type,
"stream": True,
},
stream=True,
)
f = open(f"outputstream.{Type}", "wb")
for chunk in response.iter_lines(decode_unicode=True):
if chunk:
temp_json = json.loads(chunk)
if temp_json["timestamps"] != []:
chunk_json = temp_json
# Decode base 64 stream to bytes
chunk_audio = base64.b64decode(temp_json["audio"].encode("utf-8"))
# Process streaming chunks
f.write(chunk_audio)
# Print word level timestamps
print(chunk_json["timestamps"])