mirror of
https://github.com/santinic/audiblez.git
synced 2025-09-18 21:40:39 +00:00
core<->ui events!
This commit is contained in:
parent
e6ad9ac412
commit
372ff5cbbc
3 changed files with 123 additions and 22 deletions
|
@ -2,6 +2,8 @@
|
|||
# audiblez - A program to convert e-books into audiobooks using
|
||||
# Kokoro-82M model for high-quality text-to-speech synthesis.
|
||||
# by Claudio Santini 2025 - https://claudio.uk
|
||||
from io import StringIO
|
||||
|
||||
import torch.cuda
|
||||
import spacy
|
||||
import ebooklib
|
||||
|
@ -11,6 +13,8 @@ import time
|
|||
import shutil
|
||||
import subprocess
|
||||
import re
|
||||
|
||||
from markdown import Markdown
|
||||
from tabulate import tabulate
|
||||
from pathlib import Path
|
||||
from string import Formatter
|
||||
|
@ -31,12 +35,15 @@ def load_spacy():
|
|||
|
||||
|
||||
def main(file_path, voice, pick_manually, speed, output_folder='.',
|
||||
max_chapters=None, max_sentences=None, selected_chapters=None):
|
||||
max_chapters=None, max_sentences=None, selected_chapters=None, post_event=None):
|
||||
if post_event: post_event('CORE_STARTED')
|
||||
load_spacy()
|
||||
if output_folder != '.':
|
||||
Path(output_folder).mkdir(parents=True, exist_ok=True)
|
||||
|
||||
filename = Path(file_path).name
|
||||
|
||||
extension = '.epub'
|
||||
book = epub.read_epub(file_path)
|
||||
meta_title = book.get_metadata('DC', 'title')
|
||||
title = meta_title[0][0] if meta_title else ''
|
||||
|
@ -49,6 +56,7 @@ def main(file_path, voice, pick_manually, speed, output_folder='.',
|
|||
print(f'Found cover image {cover_maybe.file_name} in {cover_maybe.media_type} format')
|
||||
|
||||
document_chapters = find_document_chapters_and_extract_texts(book)
|
||||
|
||||
if not selected_chapters:
|
||||
if pick_manually is True:
|
||||
selected_chapters = pick_chapters(document_chapters)
|
||||
|
@ -74,10 +82,14 @@ def main(file_path, voice, pick_manually, speed, output_folder='.',
|
|||
if max_chapters and i > max_chapters: break
|
||||
text = chapter.extracted_text
|
||||
xhtml_file_name = chapter.get_name().replace(' ', '_').replace('/', '_').replace('\\', '_')
|
||||
chapter_wav_path = Path(output_folder) / filename.replace('.epub', f'_chapter_{i}_{voice}_{xhtml_file_name}.wav')
|
||||
chapter_wav_path = Path(output_folder) / filename.replace(extension, f'_chapter_{i}_{voice}_{xhtml_file_name}.wav')
|
||||
chapter_wav_files.append(chapter_wav_path)
|
||||
if Path(chapter_wav_path).exists():
|
||||
print(f'File for chapter {i} already exists. Skipping')
|
||||
processed_chars += len(text)
|
||||
if post_event:
|
||||
post_event('CORE_CHAPTER_FINISHED', chapter_index=chapter.chapter_index)
|
||||
post_event('CORE_PROGRESS', progress=processed_chars * 100 // total_chars)
|
||||
continue
|
||||
if len(text.strip()) < 10:
|
||||
print(f'Skipping empty chapter {i}')
|
||||
|
@ -90,6 +102,7 @@ def main(file_path, voice, pick_manually, speed, output_folder='.',
|
|||
pipeline = KPipeline(lang_code=voice[0]) # a for american or b for british etc.
|
||||
|
||||
with yaspin(text=f'Reading chapter {i} ({len(text):,} characters)...', color="yellow") as spinner:
|
||||
if post_event: post_event('CORE_CHAPTER_STARTED', chapter_index=chapter.chapter_index)
|
||||
audio_segments = gen_audio_segments(pipeline, text, voice, speed, max_sentences=max_sentences)
|
||||
if audio_segments:
|
||||
final_audio = np.concatenate(audio_segments)
|
||||
|
@ -101,9 +114,11 @@ def main(file_path, voice, pick_manually, speed, output_folder='.',
|
|||
spinner.ok("✅")
|
||||
print(f'Estimated time remaining: {strfdelta((total_chars - processed_chars) / chars_per_sec)}')
|
||||
print('Chapter written to', chapter_wav_path)
|
||||
if post_event: post_event('CORE_CHAPTER_FINISHED', chapter_index=chapter.chapter_index)
|
||||
print(f'Chapter {i} read in {delta_seconds:.2f} seconds ({chars_per_sec:.0f} characters per second)')
|
||||
progress = processed_chars * 100 // total_chars
|
||||
print('Progress:', f'{progress}%\n')
|
||||
if post_event: post_event('CORE_PROGRESS', progress=progress)
|
||||
else:
|
||||
spinner.fail("❌")
|
||||
print(f'Warning: No audio generated for chapter {i}')
|
||||
|
@ -112,6 +127,7 @@ def main(file_path, voice, pick_manually, speed, output_folder='.',
|
|||
if has_ffmpeg:
|
||||
create_index_file(title, creator, chapter_wav_files, output_folder)
|
||||
create_m4b(chapter_wav_files, filename, cover_image, output_folder)
|
||||
if post_event: post_event('CORE_FINISHED')
|
||||
|
||||
|
||||
def find_cover(book):
|
||||
|
@ -172,6 +188,8 @@ def find_document_chapters_and_extract_texts(book):
|
|||
text += '.'
|
||||
chapter.extracted_text += text + '\n'
|
||||
document_chapters.append(chapter)
|
||||
for i, c in enumerate(document_chapters):
|
||||
c.chapter_index = i # this is used in the UI to identify chapters
|
||||
return document_chapters
|
||||
|
||||
|
||||
|
@ -284,3 +302,24 @@ def create_index_file(title, creator, chapter_mp3_files, output_folder):
|
|||
f.write(f"[CHAPTER]\nTIMEBASE=1/1000\nSTART={start}\nEND={end}\ntitle=Chapter {i}\n\n")
|
||||
i += 1
|
||||
start = end
|
||||
|
||||
|
||||
def unmark_element(element, stream=None):
|
||||
"""auxiliarry function to unmark markdown text"""
|
||||
if stream is None:
|
||||
stream = StringIO()
|
||||
if element.text:
|
||||
stream.write(element.text)
|
||||
for sub in element:
|
||||
unmark_element(sub, stream)
|
||||
if element.tail:
|
||||
stream.write(element.tail)
|
||||
return stream.getvalue()
|
||||
|
||||
|
||||
def unmark(text):
|
||||
"""Unmark markdown text"""
|
||||
Markdown.output_formats["plain"] = unmark_element # patching Markdown
|
||||
__md = Markdown(output_format="plain")
|
||||
__md.stripTopLevelTags = False
|
||||
return __md.convert(text)
|
||||
|
|
|
@ -8,16 +8,20 @@ import torch.cuda
|
|||
import numpy as np
|
||||
import soundfile
|
||||
import wx
|
||||
import wx.lib.newevent
|
||||
from wx.lib.newevent import NewEvent
|
||||
from wx.lib.scrolledpanel import ScrolledPanel
|
||||
from PIL import Image
|
||||
import threading
|
||||
|
||||
from voices import voices, flags
|
||||
|
||||
EventCoreStarted, EVENT_CORE_STARTED = wx.lib.newevent.NewEvent()
|
||||
EventCoreProgress, EVENT_CORE_PROGRESS = wx.lib.newevent.NewEvent()
|
||||
EventCoreChapterFinished, EVENT_CORE_CHAPTER_FINISHED = wx.lib.newevent.NewEvent()
|
||||
EVENTS = {
|
||||
'CORE_STARTED': NewEvent(),
|
||||
'CORE_PROGRESS': NewEvent(),
|
||||
'CORE_CHAPTER_STARTED': NewEvent(),
|
||||
'CORE_CHAPTER_FINISHED': NewEvent(),
|
||||
'CORE_FINISHED': NewEvent()
|
||||
}
|
||||
|
||||
|
||||
class MainWindow(wx.Frame):
|
||||
|
@ -34,7 +38,7 @@ class MainWindow(wx.Frame):
|
|||
self.create_layout()
|
||||
self.Centre()
|
||||
self.Show(True)
|
||||
self.open_epub('../epub/gene.epub')
|
||||
self.open_epub('../epub/mini.epub')
|
||||
|
||||
def create_menu(self):
|
||||
menubar = wx.MenuBar()
|
||||
|
@ -50,10 +54,41 @@ class MainWindow(wx.Frame):
|
|||
menubar.Append(file_menu, "&File")
|
||||
self.SetMenuBar(menubar)
|
||||
|
||||
self.Bind(EVENT_CORE_STARTED, self.event_core_started)
|
||||
self.Bind(EVENTS['CORE_STARTED'][1], self.on_core_started)
|
||||
self.Bind(EVENTS['CORE_CHAPTER_STARTED'][1], self.on_core_chapter_started)
|
||||
self.Bind(EVENTS['CORE_CHAPTER_FINISHED'][1], self.on_core_chapter_finished)
|
||||
self.Bind(EVENTS['CORE_PROGRESS'][1], self.on_core_progress)
|
||||
|
||||
def event_core_started(self, event):
|
||||
print('EVENT_CORE_STARTED')
|
||||
def on_core_started(self, event):
|
||||
print('CORE_STARTED')
|
||||
self.start_button.Hide()
|
||||
self.progress_bar_label.Show()
|
||||
self.progress_bar.Show()
|
||||
self.progress_bar.SetValue(0)
|
||||
self.param_panel.Disable()
|
||||
|
||||
for chapter_index, chapter in enumerate(self.document_chapters):
|
||||
if chapter in self.good_chapters:
|
||||
self.set_table_chapter_status(chapter.chapter_index, "Planned")
|
||||
|
||||
def on_core_chapter_started(self, event):
|
||||
print('CORE_CHAPTER_STARTED', event.chapter_index)
|
||||
self.set_table_chapter_status(event.chapter_index, "⏳ In Progress")
|
||||
|
||||
def on_core_chapter_finished(self, event):
|
||||
print('CORE_CHAPTER_FINISHED', event.chapter_index)
|
||||
self.set_table_chapter_status(event.chapter_index, "✅ Done")
|
||||
self.start_button.Show()
|
||||
|
||||
def on_core_progress(self, event):
|
||||
print('CORE_PROGRESS', event.progress)
|
||||
self.progress_bar.SetValue(event.progress)
|
||||
|
||||
def on_core_finished(self, event):
|
||||
print('CORE_FINISHED', event.progress)
|
||||
|
||||
def set_table_chapter_status(self, chapter_index, status):
|
||||
self.table.SetStringItem(chapter_index, 3, status)
|
||||
|
||||
def create_layout(self):
|
||||
# Panels layout looks like this:
|
||||
|
@ -278,9 +313,18 @@ class MainWindow(wx.Frame):
|
|||
self.param_sizer.Add(output_folder_button, pos=(4, 1), flag=wx.ALL, border=border)
|
||||
|
||||
# Add Start button
|
||||
start_button = wx.Button(self.param_panel, label="🚀 Start Audiobook Synthesis")
|
||||
start_button.Bind(wx.EVT_BUTTON, self.on_start)
|
||||
self.param_sizer.Add(start_button, pos=(6, 0), span=(1, 3), flag=wx.ALL, border=border)
|
||||
self.start_button = wx.Button(self.param_panel, label="🚀 Start Audiobook Synthesis")
|
||||
self.start_button.Bind(wx.EVT_BUTTON, self.on_start)
|
||||
self.param_sizer.Add(self.start_button, pos=(6, 0), span=(1, 3), flag=wx.ALL, border=border)
|
||||
|
||||
# Add Progress Bar label:
|
||||
self.progress_bar_label = wx.StaticText(self.param_panel, label="Synthesis Progress:")
|
||||
self.param_sizer.Add(self.progress_bar_label, pos=(7, 0), flag=wx.ALL, border=border)
|
||||
self.progress_bar = wx.Gauge(self.param_panel, range=100, style=wx.GA_PROGRESS) # vs GA_HORIZONTAL
|
||||
self.param_sizer.Add(self.progress_bar, pos=(8, 0), span=(1, 3), flag=wx.ALL | wx.EXPAND, border=border)
|
||||
self.progress_bar_label.Hide()
|
||||
self.progress_bar.Hide()
|
||||
|
||||
return self.param_panel
|
||||
|
||||
def open_output_folder_dialog(self, event):
|
||||
|
@ -325,11 +369,11 @@ class MainWindow(wx.Frame):
|
|||
self.selected_book = book
|
||||
|
||||
self.document_chapters = find_document_chapters_and_extract_texts(book)
|
||||
for chapter in self.document_chapters:
|
||||
chapter.short_name = chapter.get_name().replace('.xhtml', '').replace('xhtml/', '').replace('.html', '').replace('Text/', '')
|
||||
good_chapters = find_good_chapters(self.document_chapters)
|
||||
self.selected_chapter = good_chapters[0]
|
||||
# self.on_selected_chapter(good_chapters[0])
|
||||
for chapter in self.document_chapters:
|
||||
chapter.short_name = chapter.get_name().replace('.xhtml', '').replace('xhtml/', '').replace('.html', '').replace('Text/', '')
|
||||
chapter.is_selected = chapter in good_chapters
|
||||
|
||||
self.create_layout_for_ebook(self.splitter)
|
||||
|
||||
|
@ -377,13 +421,15 @@ class MainWindow(wx.Frame):
|
|||
sizer = wx.BoxSizer(wx.VERTICAL)
|
||||
panel.SetSizer(sizer)
|
||||
|
||||
table = wx.ListCtrl(panel, style=wx.LC_REPORT | wx.BORDER_SUNKEN)
|
||||
self.table = table = wx.ListCtrl(panel, style=wx.LC_REPORT | wx.BORDER_SUNKEN)
|
||||
table.InsertColumn(0, "Included")
|
||||
table.InsertColumn(1, "Chapter Name")
|
||||
table.InsertColumn(2, "Chapter Length")
|
||||
table.InsertColumn(3, "Status")
|
||||
table.SetColumnWidth(0, 80)
|
||||
table.SetColumnWidth(1, 150)
|
||||
table.SetColumnWidth(2, 150)
|
||||
table.SetColumnWidth(3, 100)
|
||||
table.SetSize((250, -1))
|
||||
table.EnableCheckBoxes()
|
||||
table.Bind(wx.EVT_LIST_ITEM_CHECKED, self.on_table_checked)
|
||||
|
@ -475,9 +521,17 @@ class CoreThread(threading.Thread):
|
|||
self.params = params
|
||||
|
||||
def run(self):
|
||||
wx.PostEvent(wx.GetApp().GetTopWindow(), EventCoreStarted())
|
||||
# wx.PostEvent(wx.GetApp().GetTopWindow(), EventCoreStarted())
|
||||
import core
|
||||
core.main(**self.params)
|
||||
core.main(**self.params, post_event=self.post_event)
|
||||
|
||||
def post_event(self, event_name, **kwargs):
|
||||
# eg. 'EVENT_CORE_PROGRESS' -> EventCoreProgress, EVENT_CORE_PROGRESS
|
||||
EventObject, EVENT_CODE = EVENTS[event_name]
|
||||
event_object = EventObject()
|
||||
for k, v in kwargs.items():
|
||||
setattr(event_object, k, v)
|
||||
wx.PostEvent(wx.GetApp().GetTopWindow(), event_object)
|
||||
|
||||
|
||||
def main():
|
||||
|
|
|
@ -29,10 +29,18 @@ class CliTest(unittest.TestCase):
|
|||
self.assertTrue(Path('./prova/mini.m4b').stat().st_size > 256 * 1024)
|
||||
|
||||
def test_md(self):
|
||||
out = self.cli('markdown.md')
|
||||
content = (
|
||||
'## Italy\n'
|
||||
'Italy, officially the Italian Republic, is a country in '
|
||||
'(Southern)[https://en.wikipedia.org/wiki/Southern_Europe] and Western Europe. '
|
||||
'It consists of a peninsula that extends into the Mediterranean Sea, '
|
||||
'with the Alps on its northern land border, '
|
||||
'as well as nearly 800 islands, notably Sicily and Sardinia.')
|
||||
file_name = NamedTemporaryFile('w', suffix='.txt', delete=False).write(content)
|
||||
out = self.cli(file_name)
|
||||
self.assertIn('Creating M4B file', out)
|
||||
self.assertTrue(Path('markdown.mp4').exists())
|
||||
self.assertTrue(Path('markdown.mp4').stat().st_size > 256 * 1024)
|
||||
self.assertTrue(Path(file_name).exists())
|
||||
self.assertTrue(Path('file_name').stat().st_size > 256 * 1024)
|
||||
|
||||
def test_txt(self):
|
||||
content = (
|
||||
|
|
Loading…
Add table
Reference in a new issue