mirror of
https://github.com/3b1b/manim.git
synced 2025-04-13 09:47:07 +00:00

This simply didn't work, and had no resilience to changes to the library. For cases where this might be useful, it's likely much better to deliberately save specific data which is time-consuming to generate on the fly.
458 lines
16 KiB
Python
458 lines
16 KiB
Python
from __future__ import annotations
|
|
|
|
import os
|
|
import platform
|
|
import shutil
|
|
import subprocess as sp
|
|
import sys
|
|
|
|
import numpy as np
|
|
from pydub import AudioSegment
|
|
from tqdm.auto import tqdm as ProgressDisplay
|
|
from pathlib import Path
|
|
|
|
from manimlib.constants import FFMPEG_BIN
|
|
from manimlib.logger import log
|
|
from manimlib.mobject.mobject import Mobject
|
|
from manimlib.utils.file_ops import add_extension_if_not_present
|
|
from manimlib.utils.file_ops import get_sorted_integer_files
|
|
from manimlib.utils.file_ops import guarantee_existence
|
|
from manimlib.utils.sounds import get_full_sound_file_path
|
|
|
|
from typing import TYPE_CHECKING
|
|
|
|
if TYPE_CHECKING:
|
|
from PIL.Image import Image
|
|
|
|
from manimlib.camera.camera import Camera
|
|
from manimlib.scene.scene import Scene
|
|
|
|
|
|
class SceneFileWriter:
    """
    Write the rendered output of a scene to disk.

    Frames are read from the camera as raw bytes and piped into an ffmpeg
    subprocess (declared to ffmpeg as ``rgba`` raw video).  Depending on the
    flags given at construction, the writer produces one movie file, one
    partial movie per animation (concatenated in ``finish``), and/or a PNG of
    the last frame.  Audio added through ``add_sound`` / ``add_audio_segment``
    is muxed into the movie when the scene finishes.

    Parameters
    ----------
    scene
        The scene being rendered; its camera, ``num_plays`` and animation
        number bounds are read by several methods.
    write_to_movie
        Whether frames are piped to ffmpeg at all.
    break_into_partial_movies
        Write one movie per animation and concatenate them in ``finish``.
    save_pngs
        Stored only; the original TODO notes that it currently does nothing.
    png_mode
        Stored only; not read anywhere in this class.
    save_last_frame
        Save an image of the final frame in ``finish``.
    movie_file_extension
        Extension used for the movie and the partial movie files.
    input_file_path
        Path of the python file generating the scene; only read by
        ``get_default_module_directory``.
    output_directory, file_name, subdirectory_for_videos
        Control where output is written (see ``init_output_directories``).
    open_file_upon_completion, show_file_location_upon_completion
        If either is truthy, ``finish`` calls ``open_file``.
    quiet
        Suppress the progress display and the "File ready" log message.
    total_frames
        Length of the range handed to the progress display.
    progress_description_len
        Fixed width the progress description is truncated or padded to.
    video_codec, pixel_format
        Passed to ffmpeg as ``-vcodec`` / output ``-pix_fmt`` when non-empty.
    saturation, gamma
        Values interpolated into ffmpeg's ``eq`` video filter.
    """

    def __init__(
        self,
        scene: Scene,
        write_to_movie: bool = False,
        break_into_partial_movies: bool = False,
        save_pngs: bool = False,  # TODO, this currently does nothing
        png_mode: str = "RGBA",
        save_last_frame: bool = False,
        movie_file_extension: str = ".mp4",
        # What python file is generating this scene
        input_file_path: str = "",
        # Where should this be written
        output_directory: str | None = None,
        file_name: str | None = None,
        subdirectory_for_videos: bool = False,
        open_file_upon_completion: bool = False,
        show_file_location_upon_completion: bool = False,
        quiet: bool = False,
        total_frames: int = 0,
        progress_description_len: int = 40,
        video_codec: str = "libx264",
        pixel_format: str = "yuv420p",
        saturation: float = 1.0,
        gamma: float = 1.0,
    ):
        self.scene: Scene = scene
        self.write_to_movie = write_to_movie
        self.break_into_partial_movies = break_into_partial_movies
        self.save_pngs = save_pngs
        self.png_mode = png_mode
        self.save_last_frame = save_last_frame
        self.movie_file_extension = movie_file_extension
        self.input_file_path = input_file_path
        self.output_directory = output_directory
        self.file_name = file_name
        self.open_file_upon_completion = open_file_upon_completion
        self.subdirectory_for_videos = subdirectory_for_videos
        self.show_file_location_upon_completion = show_file_location_upon_completion
        self.quiet = quiet
        self.total_frames = total_frames
        self.progress_description_len = progress_description_len
        self.video_codec = video_codec
        self.pixel_format = pixel_format
        self.saturation = saturation
        self.gamma = gamma

        # State during file writing
        self.writing_process: sp.Popen | None = None
        self.progress_display: ProgressDisplay | None = None
        self.ended_with_interrupt: bool = False
        self.init_output_directories()
        self.init_audio()

    # Output directories and files
    def init_output_directories(self) -> None:
        """
        Compute (and create where needed) the output paths.

        Sets ``image_file_path`` when ``save_last_frame`` is on, and
        ``movie_file_path`` (plus ``partial_movie_directory`` when partial
        movies are requested) when ``write_to_movie`` is on.  Attributes for
        disabled outputs are left unset.
        """
        out_dir = self.output_directory or ""
        scene_name = self.file_name or self.get_default_scene_name()
        if self.save_last_frame:
            image_dir = guarantee_existence(os.path.join(out_dir, "images"))
            image_file = add_extension_if_not_present(scene_name, ".png")
            self.image_file_path = os.path.join(image_dir, image_file)
        if self.write_to_movie:
            if self.subdirectory_for_videos:
                movie_dir = guarantee_existence(os.path.join(out_dir, "videos"))
            else:
                movie_dir = guarantee_existence(out_dir)
            movie_file = add_extension_if_not_present(scene_name, self.movie_file_extension)
            self.movie_file_path = os.path.join(movie_dir, movie_file)
            if self.break_into_partial_movies:
                self.partial_movie_directory = guarantee_existence(os.path.join(
                    movie_dir, "partial_movie_files", scene_name,
                ))
        # A place to save mobjects
        self.saved_mobject_directory = os.path.join(
            out_dir, "mobjects", str(self.scene)
        )

    def get_default_module_directory(self) -> str:
        """Return ``input_file_path`` without its extension and without one leading underscore."""
        path, _ = os.path.splitext(self.input_file_path)
        if path.startswith("_"):
            path = path[1:]
        return path

    def get_default_scene_name(self) -> str:
        """
        Return ``str(scene)``, suffixed with the start and/or end animation
        numbers when the scene defines them.
        """
        name = str(self.scene)
        saan = self.scene.start_at_animation_number
        eaan = self.scene.end_at_animation_number
        if saan is not None:
            name += f"_{saan}"
        if eaan is not None:
            name += f"_{eaan}"
        return name

    def get_resolution_directory(self) -> str:
        """Return a ``"<pixel_height>p<fps>"`` label built from the scene camera."""
        camera = self.scene.camera
        return f"{camera.pixel_height}p{camera.fps}"

    # Directory getters
    def get_image_file_path(self) -> str:
        """Return the image path set by ``init_output_directories``."""
        return self.image_file_path

    def get_next_partial_movie_path(self) -> str:
        """Return the partial movie path for the scene's current ``num_plays`` (zero-padded to 5 digits)."""
        file_name = "{:05}{}".format(
            self.scene.num_plays,
            self.movie_file_extension,
        )
        return os.path.join(self.partial_movie_directory, file_name)

    def get_movie_file_path(self) -> str:
        """Return the movie path set by ``init_output_directories`` (or by an interrupted ``close_movie_pipe``)."""
        return self.movie_file_path

    # Sound
    def init_audio(self) -> None:
        """Mark the writer as having no sound yet."""
        self.includes_sound: bool = False

    def create_audio_segment(self) -> None:
        """Start the audio track from ``AudioSegment.silent()``."""
        self.audio_segment = AudioSegment.silent()

    def add_audio_segment(
        self,
        new_segment: AudioSegment,
        time: float | None = None,
        gain_to_background: float | None = None
    ) -> None:
        """
        Overlay ``new_segment`` onto the audio track.

        Parameters
        ----------
        new_segment
            The audio to add.
        time
            Start position in seconds; ``None`` means the current end of the
            track.
        gain_to_background
            Forwarded to the overlay call as ``gain_during_overlay``.

        Raises
        ------
        ValueError
            If ``time`` is negative.
        """
        if not self.includes_sound:
            self.includes_sound = True
            self.create_audio_segment()
        segment = self.audio_segment
        curr_end = segment.duration_seconds
        if time is None:
            time = curr_end
        if time < 0:
            # ValueError is still caught by callers handling Exception
            raise ValueError("Adding sound at timestamp < 0")

        new_end = time + new_segment.duration_seconds
        diff = new_end - curr_end
        if diff > 0:
            # Pad the track with silence so the new segment fits entirely
            segment = segment.append(
                AudioSegment.silent(int(np.ceil(diff * 1000))),
                crossfade=0,
            )
        self.audio_segment = segment.overlay(
            new_segment,
            position=int(1000 * time),
            gain_during_overlay=gain_to_background,
        )

    def add_sound(
        self,
        sound_file: str,
        time: float | None = None,
        gain: float | None = None,
        gain_to_background: float | None = None
    ) -> None:
        """
        Load ``sound_file``, apply ``gain`` when it is truthy, and add the
        result to the audio track via ``add_audio_segment``.
        """
        file_path = get_full_sound_file_path(sound_file)
        new_segment = AudioSegment.from_file(file_path)
        if gain:
            new_segment = new_segment.apply_gain(gain)
        self.add_audio_segment(new_segment, time, gain_to_background)

    # Writers
    def begin(self) -> None:
        """Open the movie pipe for a single, non-partial movie."""
        if not self.break_into_partial_movies and self.write_to_movie:
            self.open_movie_pipe(self.get_movie_file_path())

    def begin_animation(self) -> None:
        """Open a pipe for the next partial movie, when partial movies are enabled."""
        if self.break_into_partial_movies and self.write_to_movie:
            self.open_movie_pipe(self.get_next_partial_movie_path())

    def end_animation(self) -> None:
        """Close the current partial movie pipe, when partial movies are enabled."""
        if self.break_into_partial_movies and self.write_to_movie:
            self.close_movie_pipe()

    def finish(self) -> None:
        """
        Finalize all requested outputs: combine or close the movie, mux in
        sound, save the last frame, and open the results if requested.
        """
        if self.write_to_movie:
            if self.break_into_partial_movies:
                self.combine_movie_files()
            else:
                self.close_movie_pipe()
            if self.includes_sound:
                self.add_sound_to_video()
            self.print_file_ready_message(self.get_movie_file_path())
        if self.save_last_frame:
            self.scene.update_frame(force_draw=True)
            self.save_final_image(self.scene.get_image())
        if self.should_open_file():
            self.open_file()

    def open_movie_pipe(self, file_path: str) -> None:
        """
        Start an ffmpeg subprocess that encodes frames written to its stdin.

        Output goes to ``<stem>_temp<ext>`` first; ``close_movie_pipe`` moves
        it to ``file_path``.  A progress display is created unless ``quiet``.
        """
        stem, ext = os.path.splitext(file_path)
        self.final_file_path = file_path
        self.temp_file_path = stem + "_temp" + ext

        fps = self.scene.camera.fps
        width, height = self.scene.camera.get_pixel_shape()

        vf_arg = 'vflip'
        # if self.pixel_format.startswith("yuv"):
        vf_arg += f',eq=saturation={self.saturation}:gamma={self.gamma}'

        command = [
            FFMPEG_BIN,
            '-y',  # overwrite output file if it exists
            '-f', 'rawvideo',
            '-s', f'{width}x{height}',  # size of one frame
            '-pix_fmt', 'rgba',
            '-r', str(fps),  # frames per second
            '-i', '-',  # The input comes from a pipe
            '-vf', vf_arg,
            '-an',  # Tells FFMPEG not to expect any audio
            '-loglevel', 'error',
        ]
        if self.video_codec:
            command += ['-vcodec', self.video_codec]
        if self.pixel_format:
            command += ['-pix_fmt', self.pixel_format]
        command += [self.temp_file_path]
        self.writing_process = sp.Popen(command, stdin=sp.PIPE)

        if not self.quiet:
            self.progress_display = ProgressDisplay(
                range(self.total_frames),
                leave=False,
                ascii=True if platform.system() == 'Windows' else None,
                dynamic_ncols=True,
            )
            self.set_progress_display_description()

    def use_fast_encoding(self):
        """Switch the codec to ``libx264rgb`` and the pixel format to ``rgb32``."""
        self.video_codec = "libx264rgb"
        self.pixel_format = "rgb32"

    def get_insert_file_path(self, index: int) -> Path:
        """
        Return ``<movie dir>/inserts/<movie stem>_<index><suffix>``, creating
        the ``inserts`` directory if necessary.
        """
        movie_path = Path(self.get_movie_file_path())
        scene_name = movie_path.stem
        insert_dir = Path(movie_path.parent, "inserts")
        guarantee_existence(str(insert_dir))
        return Path(insert_dir, f"{scene_name}_{index}{movie_path.suffix}")

    def begin_insert(self):
        """
        Turn movie writing on and open a pipe to the first insert file path
        (by increasing index) that does not exist yet.
        """
        # Begin writing process
        self.write_to_movie = True
        self.init_output_directories()
        index = 0
        while (insert_path := self.get_insert_file_path(index)).exists():
            index += 1
        self.inserted_file_path = str(insert_path)
        self.open_movie_pipe(self.inserted_file_path)

    def end_insert(self):
        """Close the insert's pipe, turn movie writing off, and report the file."""
        self.close_movie_pipe()
        self.write_to_movie = False
        self.print_file_ready_message(self.inserted_file_path)

    def has_progress_display(self):
        """Return whether a progress display has been created."""
        return self.progress_display is not None

    def set_progress_display_description(self, file: str = "", sub_desc: str = "") -> None:
        """
        Set the progress description to ``"<file> <sub_desc>"``, truncated
        with ``...`` or right-padded with spaces to exactly
        ``progress_description_len`` characters.  ``file`` defaults to the
        movie file's base name.  Does nothing without a progress display.
        """
        if self.progress_display is None:
            return

        desc_len = self.progress_description_len
        if not file:
            file = os.path.split(self.get_movie_file_path())[1]
        full_desc = f"{file} {sub_desc}"
        if len(full_desc) > desc_len:
            full_desc = full_desc[:desc_len - 3] + "..."
        else:
            full_desc += " " * (desc_len - len(full_desc))
        self.progress_display.set_description(full_desc)

    def write_frame(self, camera: Camera) -> None:
        """Pipe the camera's raw frame bytes to ffmpeg and advance the progress display."""
        if self.write_to_movie:
            raw_bytes = camera.get_raw_fbo_data()
            self.writing_process.stdin.write(raw_bytes)
            if self.progress_display is not None:
                self.progress_display.update()

    def close_movie_pipe(self) -> None:
        """
        Close ffmpeg's stdin, wait for it to exit, and move the temp file to
        its final path.  If the run ended with an interrupt, the temp file is
        kept and becomes ``movie_file_path`` instead.
        """
        self.writing_process.stdin.close()
        # wait() only returns once ffmpeg has exited, so no terminate() is needed
        self.writing_process.wait()
        if self.progress_display is not None:
            self.progress_display.close()

        if not self.ended_with_interrupt:
            shutil.move(self.temp_file_path, self.final_file_path)
        else:
            self.movie_file_path = self.temp_file_path

    @staticmethod
    def _concat_file_entry(path: str) -> str:
        """
        Return one ``file '<path>'`` line for an ffmpeg concat list.

        Backslashes become forward slashes on Windows, and single quotes in
        the path are escaped as ``'\\''`` so they cannot end the quoted string.
        """
        path = str(path)
        if os.name == 'nt':
            path = path.replace('\\', '/')
        escaped = path.replace("'", "'\\''")
        return f"file '{escaped}'\n"

    def combine_movie_files(self) -> None:
        """
        Concatenate the partial movie files into the final movie with
        ffmpeg's concat demuxer (stream copy, no re-encode).  Logs a warning
        and returns if there are no partial files.
        """
        kwargs = {
            "remove_non_integer_files": True,
            "extension": self.movie_file_extension,
        }
        if self.scene.start_at_animation_number is not None:
            kwargs["min_index"] = self.scene.start_at_animation_number
        if self.scene.end_at_animation_number is not None:
            kwargs["max_index"] = self.scene.end_at_animation_number
        else:
            kwargs["remove_indices_greater_than"] = self.scene.num_plays - 1
        partial_movie_files = get_sorted_integer_files(
            self.partial_movie_directory,
            **kwargs
        )
        if not partial_movie_files:
            log.warning("No animations in this scene")
            return

        # Write a file partial_file_list.txt containing all
        # partial movie files
        file_list = os.path.join(
            self.partial_movie_directory,
            "partial_movie_file_list.txt"
        )
        with open(file_list, 'w', encoding="utf-8") as fp:
            fp.writelines(
                self._concat_file_entry(pf_path)
                for pf_path in partial_movie_files
            )

        commands = [
            FFMPEG_BIN,
            '-y',  # overwrite output file if it exists
            '-f', 'concat',
            '-safe', '0',
            '-i', file_list,
            '-loglevel', 'error',
            '-c', 'copy',
        ]
        if not self.includes_sound:
            commands.append('-an')
        # The output path must be the last argument
        commands.append(self.get_movie_file_path())

        sp.call(commands)

    def add_sound_to_video(self) -> None:
        """
        Export the audio track to a temporary ``.wav`` next to the movie, mux
        it into the movie with ffmpeg (video copied, audio encoded as AAC),
        and replace the movie with the result.  The ``.wav`` is removed even
        if a step fails.
        """
        movie_file_path = self.get_movie_file_path()
        stem, ext = os.path.splitext(movie_file_path)
        sound_file_path = stem + ".wav"
        temp_file_path = stem + "_temp" + ext
        # Original author's note: makes sure sound file length will match video file
        self.add_audio_segment(AudioSegment.silent(0))
        try:
            self.audio_segment.export(
                sound_file_path,
                bitrate='312k',
            )
            commands = [
                FFMPEG_BIN,
                "-i", movie_file_path,
                "-i", sound_file_path,
                '-y',  # overwrite output file if it exists
                "-c:v", "copy",
                "-c:a", "aac",
                "-b:a", "320k",
                # select video stream from first file
                "-map", "0:v:0",
                # select audio stream from second file
                "-map", "1:a:0",
                '-loglevel', 'error',
                # "-shortest",
                temp_file_path,
            ]
            sp.call(commands)
            shutil.move(temp_file_path, movie_file_path)
        finally:
            # Never leave the intermediate wav behind, even on failure
            if os.path.exists(sound_file_path):
                os.remove(sound_file_path)

    def save_final_image(self, image: Image) -> None:
        """Save ``image`` to the image file path and report it."""
        file_path = self.get_image_file_path()
        image.save(file_path)
        self.print_file_ready_message(file_path)

    def print_file_ready_message(self, file_path: str) -> None:
        """Log where the file was written, unless ``quiet``."""
        if not self.quiet:
            log.info(f"File ready at {file_path}")

    def should_open_file(self) -> bool:
        """Return True if either the open-file or the show-location option is set."""
        return bool(
            self.show_file_location_upon_completion
            or self.open_file_upon_completion
        )

    def open_file(self) -> None:
        """
        Open the produced image and/or movie with the platform's opener
        (``os.startfile`` on Windows, ``xdg-open`` on Linux, ``cygstart`` on
        Cygwin, ``open`` otherwise).

        When ``quiet`` is set, ``sys.stdout`` is pointed at the null device
        for the duration and always restored, even if an opener raises.
        """
        saved_stdout = sys.stdout
        if self.quiet:
            sys.stdout = open(os.devnull, "w")
        try:
            current_os = platform.system()
            file_paths = []

            if self.save_last_frame:
                file_paths.append(self.get_image_file_path())
            if self.write_to_movie:
                file_paths.append(self.get_movie_file_path())

            for file_path in file_paths:
                if current_os == "Windows":
                    os.startfile(file_path)
                    continue

                if current_os == "Linux":
                    commands = ["xdg-open"]
                elif current_os.startswith("CYGWIN"):
                    commands = ["cygstart"]
                else:  # Assume macOS
                    commands = ["open"]

                # NOTE(review): "-R" is the macOS `open` reveal flag but is
                # appended for every opener here; confirm the behaviour for
                # xdg-open and cygstart.
                if self.show_file_location_upon_completion:
                    commands.append("-R")

                commands.append(file_path)

                sp.call(commands, stdout=sp.DEVNULL, stderr=sp.STDOUT)
        finally:
            if self.quiet:
                sys.stdout.close()
                sys.stdout = saved_stdout
|