mirror of
https://github.com/3b1b/manim.git
synced 2025-04-13 09:47:07 +00:00

* Comment tweak * Directly print traceback Since the shell.showtraceback is giving some issues * Make InteractiveSceneEmbed into a class This way it can keep track of its internal shell; use of get_ipython has a finicky relationship with reloading. * Move remaining checkpoint_paste logic into scene_embed.py This involved making a few context managers for Scene: temp_record, temp_skip, temp_progress_bar, which seem useful in and of themselves. * Change null key to be the empty string * Ensure temporary svg paths for Text are deleted * Remove unused dict_ops.py functions * Remove break_into_partial_movies from file_writer configuration * Rewrite guarantee_existence using Path * Clean up SceneFileWriter It had a number of vestigial functions no longer used, and some setup that could be made more organized. * Remove --save_pngs CLI arg (which did nothing) * Add --subdivide CLI arg * Remove add_extension_if_not_present * Remove get_sorted_integer_files * Have find_file return Path * Minor clean up * Clean up num_tex_symbols * Fix find_file * Minor cleanup for extract_scene.py * Add preview_frame_while_skipping option to scene config * Use shell.showtraceback function * Move keybindings to config, instead of in-place constants * Replace DEGREES -> DEG * Add arg to clear the cache * Separate out full_tex_to_svg from tex_to_svg And only cache to disk the results of full_tex_to_svg. Otherwise, making edits to the tex_templates would not show up without clearing the cache. * Bug fix in handling BlankScene * Make checkpoint_states an instance variable of CheckpointManager As per https://github.com/3b1b/manim/issues/2272 * Move resizing out of Window.focus, and into Window.init_for_scene * Make default output directory "." 
instead of "" To address https://github.com/3b1b/manim/issues/2261 * Remove input_file_path arg from SceneFileWriter * Use Dict syntax in place of dict for config more consistently across config.py * Simplify get_output_directory * Swap order of preamble and additional preamble
385 lines
13 KiB
Python
385 lines
13 KiB
Python
from __future__ import annotations
|
|
|
|
import os
|
|
import platform
|
|
import shutil
|
|
import subprocess as sp
|
|
import sys
|
|
|
|
import numpy as np
|
|
from pydub import AudioSegment
|
|
from tqdm.auto import tqdm as ProgressDisplay
|
|
from pathlib import Path
|
|
|
|
from manimlib.logger import log
|
|
from manimlib.mobject.mobject import Mobject
|
|
from manimlib.utils.file_ops import guarantee_existence
|
|
from manimlib.utils.sounds import get_full_sound_file_path
|
|
|
|
from typing import TYPE_CHECKING
|
|
|
|
if TYPE_CHECKING:
|
|
from PIL.Image import Image
|
|
|
|
from manimlib.camera.camera import Camera
|
|
from manimlib.scene.scene import Scene
|
|
|
|
|
|
class SceneFileWriter(object):
    """
    Handles the files a scene produces.

    Raw frames are piped to an ffmpeg subprocess to build a movie (either one
    file for the whole scene or, with ``subdivide_output``, one file per
    animation), the last frame can be saved as a still image, and sound
    segments accumulated during the scene are muxed into the finished movie.
    """

    def __init__(
        self,
        scene: Scene,
        write_to_movie: bool = False,
        subdivide_output: bool = False,
        png_mode: str = "RGBA",
        save_last_frame: bool = False,
        movie_file_extension: str = ".mp4",
        # Where should this be written
        output_directory: str = ".",
        file_name: str | None = None,
        open_file_upon_completion: bool = False,
        show_file_location_upon_completion: bool = False,
        quiet: bool = False,
        total_frames: int = 0,
        progress_description_len: int = 40,
        # Name of the binary used for ffmpeg
        ffmpeg_bin: str = "ffmpeg",
        video_codec: str = "libx264",
        pixel_format: str = "yuv420p",
        saturation: float = 1.0,
        gamma: float = 1.0,
    ):
        """
        Store the configuration, then compute the output paths the given
        flags call for and reset the audio state.
        """
        self.scene: Scene = scene
        self.write_to_movie = write_to_movie
        self.subdivide_output = subdivide_output
        self.png_mode = png_mode
        self.save_last_frame = save_last_frame
        self.movie_file_extension = movie_file_extension
        self.output_directory = output_directory
        self.file_name = file_name
        self.open_file_upon_completion = open_file_upon_completion
        self.show_file_location_upon_completion = show_file_location_upon_completion
        self.quiet = quiet
        self.total_frames = total_frames
        self.progress_description_len = progress_description_len
        self.ffmpeg_bin = ffmpeg_bin
        self.video_codec = video_codec
        self.pixel_format = pixel_format
        self.saturation = saturation
        self.gamma = gamma

        # State during file writing
        self.writing_process: sp.Popen | None = None
        self.progress_display: ProgressDisplay | None = None
        self.ended_with_interrupt: bool = False

        self.init_output_directories()
        self.init_audio()

    # Output directories and files
    def init_output_directories(self) -> None:
        """
        Compute and store each output path whose corresponding flag is set.

        Attributes for disabled outputs are left unset, so the matching
        getters raise AttributeError until the flag is enabled and this
        method is run again (as ``begin_insert`` does).
        """
        if self.save_last_frame:
            self.image_file_path = self.init_image_file_path()
        if self.write_to_movie:
            self.movie_file_path = self.init_movie_file_path()
        if self.subdivide_output:
            self.partial_movie_directory = self.init_partial_movie_directory()

    def init_image_file_path(self) -> Path:
        """Return the output root name with a ``.png`` suffix."""
        return self.get_output_file_rootname().with_suffix(".png")

    def init_movie_file_path(self) -> Path:
        """Return the output root name with the movie file extension."""
        return self.get_output_file_rootname().with_suffix(self.movie_file_extension)

    def init_partial_movie_directory(self):
        """
        Return the directory holding per-animation movies, named after the
        output root name.

        NOTE(review): relies on ``guarantee_existence`` to create the
        directory and hand back its path -- confirm against file_ops.
        """
        return guarantee_existence(self.get_output_file_rootname())

    def get_output_file_rootname(self) -> Path:
        """Return ``<output_directory>/<output file name>`` without a suffix."""
        return Path(
            guarantee_existence(self.output_directory),
            self.get_output_file_name()
        )

    def get_output_file_name(self) -> str:
        """
        Return the base name used for output files.

        An explicit ``file_name`` wins. Otherwise the name is ``str(scene)``
        with the scene's start and end animation numbers appended (each
        preceded by an underscore) when they are not None.
        """
        if self.file_name:
            return self.file_name
        # Otherwise, use the name of the scene, potentially
        # appending animation numbers
        name = str(self.scene)
        saan = self.scene.start_at_animation_number
        eaan = self.scene.end_at_animation_number
        if saan is not None:
            name += f"_{saan}"
        if eaan is not None:
            name += f"_{eaan}"
        return name

    # Directory getters
    def get_image_file_path(self) -> Path:
        """Return the still-image path set by ``init_output_directories``."""
        return self.image_file_path

    def get_next_partial_movie_path(self) -> Path:
        """
        Return the path for the current animation's partial movie: the
        scene's ``num_plays`` zero-padded to five digits, inside the partial
        movie directory, with the movie file extension.
        """
        result = Path(self.partial_movie_directory, f"{self.scene.num_plays:05}")
        return result.with_suffix(self.movie_file_extension)

    def get_movie_file_path(self) -> Path | str:
        """
        Return the movie path.

        This is a ``Path`` as initialised, but becomes the temporary file's
        string path if ``close_movie_pipe`` ran after an interrupt.
        """
        return self.movie_file_path

    # Sound
    def init_audio(self) -> None:
        """Mark the writer as having no sound yet."""
        self.includes_sound: bool = False

    def create_audio_segment(self) -> None:
        """Start the audio track from ``AudioSegment.silent()``."""
        self.audio_segment = AudioSegment.silent()

    def add_audio_segment(
        self,
        new_segment: AudioSegment,
        time: float | None = None,
        gain_to_background: float | None = None
    ) -> None:
        """
        Overlay ``new_segment`` onto the audio track.

        Args:
            new_segment: The audio to mix in.
            time: Offset in seconds at which it starts; None means the
                current end of the track.
            gain_to_background: Passed to pydub's ``overlay`` as
                ``gain_during_overlay``.

        Raises:
            ValueError: If ``time`` is negative.
        """
        if not self.includes_sound:
            self.includes_sound = True
            self.create_audio_segment()
        segment = self.audio_segment
        curr_end = segment.duration_seconds
        if time is None:
            time = curr_end
        if time < 0:
            raise ValueError("Adding sound at timestamp < 0")

        # Pad the track with silence so the new segment fits entirely
        new_end = time + new_segment.duration_seconds
        diff = new_end - curr_end
        if diff > 0:
            segment = segment.append(
                AudioSegment.silent(int(np.ceil(diff * 1000))),
                crossfade=0,
            )
        self.audio_segment = segment.overlay(
            new_segment,
            position=int(1000 * time),
            gain_during_overlay=gain_to_background,
        )

    def add_sound(
        self,
        sound_file: str,
        time: float | None = None,
        gain: float | None = None,
        gain_to_background: float | None = None
    ) -> None:
        """
        Load ``sound_file``, optionally apply ``gain`` to it, and add it to
        the audio track via ``add_audio_segment``.
        """
        file_path = get_full_sound_file_path(sound_file)
        new_segment = AudioSegment.from_file(file_path)
        if gain:
            new_segment = new_segment.apply_gain(gain)
        self.add_audio_segment(new_segment, time, gain_to_background)

    # Writers
    def begin(self) -> None:
        """Open the single movie pipe when writing one undivided movie."""
        if not self.subdivide_output and self.write_to_movie:
            self.open_movie_pipe(self.get_movie_file_path())

    def begin_animation(self) -> None:
        """Open a pipe for the next partial movie when subdividing output."""
        if self.subdivide_output and self.write_to_movie:
            self.open_movie_pipe(self.get_next_partial_movie_path())

    def end_animation(self) -> None:
        """Close the current partial movie's pipe when subdividing output."""
        if self.subdivide_output and self.write_to_movie:
            self.close_movie_pipe()

    def finish(self) -> None:
        """
        Finalise all outputs: close the undivided movie (adding sound if any
        was recorded), save the last frame if requested, and open the
        results if configured to.
        """
        if not self.subdivide_output and self.write_to_movie:
            self.close_movie_pipe()
            if self.includes_sound:
                self.add_sound_to_video()
            self.print_file_ready_message(self.get_movie_file_path())
        if self.save_last_frame:
            self.scene.update_frame(force_draw=True)
            self.save_final_image(self.scene.get_image())
        if self.should_open_file():
            self.open_file()

    def open_movie_pipe(self, file_path: str | Path) -> None:
        """
        Start an ffmpeg subprocess that reads raw RGBA frames from stdin.

        ffmpeg writes to ``<stem>_temp<ext>``; ``close_movie_pipe`` later
        moves that file to ``file_path``. Unless ``quiet`` is set, a progress
        display over ``total_frames`` is created as well.
        """
        stem, ext = os.path.splitext(file_path)
        self.final_file_path = file_path
        self.temp_file_path = stem + "_temp" + ext

        fps = self.scene.camera.fps
        width, height = self.scene.camera.get_pixel_shape()

        vf_arg = 'vflip'
        vf_arg += f',eq=saturation={self.saturation}:gamma={self.gamma}'

        command = [
            self.ffmpeg_bin,
            '-y',  # overwrite output file if it exists
            '-f', 'rawvideo',
            '-s', f'{width}x{height}',  # size of one frame
            '-pix_fmt', 'rgba',
            '-r', str(fps),  # frames per second
            '-i', '-',  # The input comes from a pipe
            '-vf', vf_arg,
            '-an',  # Tells ffmpeg not to expect any audio
            '-loglevel', 'error',
        ]
        if self.video_codec:
            command += ['-vcodec', self.video_codec]
        if self.pixel_format:
            command += ['-pix_fmt', self.pixel_format]
        command += [self.temp_file_path]
        self.writing_process = sp.Popen(command, stdin=sp.PIPE)

        if not self.quiet:
            self.progress_display = ProgressDisplay(
                range(self.total_frames),
                leave=False,
                ascii=True if platform.system() == 'Windows' else None,
                dynamic_ncols=True,
            )
            self.set_progress_display_description()

    def use_fast_encoding(self):
        """Switch to the ``libx264rgb`` codec with the ``rgb32`` pixel format."""
        self.video_codec = "libx264rgb"
        self.pixel_format = "rgb32"

    def get_insert_file_path(self, index: int) -> Path:
        """
        Return ``<movie dir>/inserts/<movie stem>_<index><ext>``, making sure
        the ``inserts`` directory exists.
        """
        movie_path = Path(self.get_movie_file_path())
        scene_name = movie_path.stem
        insert_dir = Path(movie_path.parent, "inserts")
        guarantee_existence(insert_dir)
        return Path(insert_dir, f"{scene_name}_{index}").with_suffix(self.movie_file_extension)

    def begin_insert(self):
        """
        Turn movie writing on and open a pipe to the first insert file path
        (counting up from index 0) that does not exist yet.
        """
        # Begin writing process
        self.write_to_movie = True
        self.init_output_directories()
        index = 0
        while (insert_path := self.get_insert_file_path(index)).exists():
            index += 1
        self.inserted_file_path = insert_path
        self.open_movie_pipe(self.inserted_file_path)

    def end_insert(self):
        """Close the insert's pipe, turn movie writing off, and report the file."""
        self.close_movie_pipe()
        self.write_to_movie = False
        self.print_file_ready_message(self.inserted_file_path)

    def has_progress_display(self):
        """Return whether a progress display has been created."""
        return self.progress_display is not None

    def set_progress_display_description(self, file: str = "", sub_desc: str = "") -> None:
        """
        Set the progress display's description to ``"<file> <sub_desc>"``,
        truncated with ``...`` or right-padded with spaces so that it is
        exactly ``progress_description_len`` characters long.

        ``file`` defaults to the movie file's base name. Does nothing when
        there is no progress display.
        """
        if self.progress_display is None:
            return

        desc_len = self.progress_description_len
        if not file:
            file = os.path.split(self.get_movie_file_path())[1]
        full_desc = f"{file} {sub_desc}"
        if len(full_desc) > desc_len:
            full_desc = full_desc[:desc_len - 3] + "..."
        else:
            full_desc += " " * (desc_len - len(full_desc))
        self.progress_display.set_description(full_desc)

    def write_frame(self, camera: Camera) -> None:
        """
        When writing a movie, send the camera's raw framebuffer bytes to
        ffmpeg and advance the progress display if there is one.
        """
        if self.write_to_movie:
            raw_bytes = camera.get_raw_fbo_data()
            self.writing_process.stdin.write(raw_bytes)
            if self.progress_display is not None:
                self.progress_display.update()

    def close_movie_pipe(self) -> None:
        """
        Close ffmpeg's stdin, wait for it to exit, and put the result in
        place.

        Normally the temporary file is moved to the final path. If the scene
        ended with an interrupt, the temporary file is kept and becomes the
        movie file path instead.
        """
        self.writing_process.stdin.close()
        # wait() only returns once ffmpeg has exited, so no terminate() is needed
        self.writing_process.wait()
        if self.progress_display is not None:
            self.progress_display.close()

        if not self.ended_with_interrupt:
            shutil.move(self.temp_file_path, self.final_file_path)
        else:
            self.movie_file_path = self.temp_file_path

    def add_sound_to_video(self) -> None:
        """
        Mux the accumulated audio track into the movie file with ffmpeg.

        The audio is exported to a ``.wav`` next to the movie and combined
        into a temporary file that replaces the movie only when ffmpeg exits
        successfully. On failure an error is logged and the silent movie is
        left untouched. The ``.wav`` is removed either way.
        """
        movie_file_path = self.get_movie_file_path()
        stem, ext = os.path.splitext(movie_file_path)
        sound_file_path = stem + ".wav"
        temp_file_path = stem + "_temp" + ext
        # Makes sure sound file length will match video file
        self.add_audio_segment(AudioSegment.silent(0))
        try:
            self.audio_segment.export(
                sound_file_path,
                bitrate='312k',
            )
            commands = [
                self.ffmpeg_bin,
                "-i", movie_file_path,
                "-i", sound_file_path,
                '-y',  # overwrite output file if it exists
                "-c:v", "copy",
                "-c:a", "aac",
                "-b:a", "320k",
                # select video stream from first file
                "-map", "0:v:0",
                # select audio stream from second file
                "-map", "1:a:0",
                '-loglevel', 'error',
                # "-shortest",
                temp_file_path,
            ]
            return_code = sp.call(commands)
            if return_code == 0:
                shutil.move(temp_file_path, movie_file_path)
            else:
                # Never replace a good movie with a partial or missing mux result
                log.error(
                    f"ffmpeg exited with code {return_code} while adding sound; "
                    f"{movie_file_path} was left without audio"
                )
                if os.path.exists(temp_file_path):
                    os.remove(temp_file_path)
        finally:
            if os.path.exists(sound_file_path):
                os.remove(sound_file_path)

    def save_final_image(self, image: Image) -> None:
        """Save ``image`` to the still-image path and report the file."""
        file_path = self.get_image_file_path()
        image.save(file_path)
        self.print_file_ready_message(file_path)

    def print_file_ready_message(self, file_path: str | Path) -> None:
        """Log where the file was written, unless ``quiet`` is set."""
        if not self.quiet:
            log.info(f"File ready at {file_path}")

    def should_open_file(self) -> bool:
        """Return True if either the open-file or show-location flag is set."""
        return any([
            self.show_file_location_upon_completion,
            self.open_file_upon_completion,
        ])

    def _get_open_command(self, current_os: str, file_path: str | Path) -> list:
        """
        Build the command that opens or reveals ``file_path`` on a
        non-Windows platform.

        Only macOS ``open`` understands ``-R`` (reveal in Finder). The other
        launchers reject that flag, so for them "show file location" opens
        the containing directory instead.
        """
        if current_os == "Linux":
            launcher = "xdg-open"
        elif current_os.startswith("CYGWIN"):
            launcher = "cygstart"
        else:  # Assume macOS
            launcher = "open"

        if not self.show_file_location_upon_completion:
            return [launcher, file_path]
        if launcher == "open":
            return [launcher, "-R", file_path]
        return [launcher, Path(file_path).parent]

    def open_file(self) -> None:
        """
        Open the saved image and/or movie with the platform's default
        handler.

        When ``quiet`` is set, ``sys.stdout`` is pointed at the null device
        for the duration and is always restored, even if launching fails.
        """
        devnull = open(os.devnull, "w") if self.quiet else None
        saved_stdout = sys.stdout
        if devnull is not None:
            sys.stdout = devnull
        try:
            current_os = platform.system()
            file_paths = []

            if self.save_last_frame:
                file_paths.append(self.get_image_file_path())
            if self.write_to_movie:
                file_paths.append(self.get_movie_file_path())

            for file_path in file_paths:
                if current_os == "Windows":
                    os.startfile(file_path)
                else:
                    sp.call(
                        self._get_open_command(current_os, file_path),
                        stdout=sp.DEVNULL,
                        stderr=sp.STDOUT,
                    )
        finally:
            if devnull is not None:
                sys.stdout = saved_stdout
                devnull.close()
|