Kokoro-FastAPI/api/src/services/temp_manager.py

140 lines
4.4 KiB
Python
Raw Normal View History

"""Temporary file writer for audio downloads"""
import os
import tempfile
2025-02-09 18:32:17 -07:00
from typing import List, Optional
import aiofiles
from fastapi import HTTPException
from loguru import logger
from ..core.config import settings
async def cleanup_temp_files() -> None:
"""Clean up old temp files"""
try:
if not await aiofiles.os.path.exists(settings.temp_file_dir):
await aiofiles.os.makedirs(settings.temp_file_dir, exist_ok=True)
return
# Get all temp files with stats
files = []
total_size = 0
2025-02-09 18:32:17 -07:00
# Use os.scandir for sync iteration, but aiofiles.os.stat for async stats
for entry in os.scandir(settings.temp_file_dir):
if entry.is_file():
stat = await aiofiles.os.stat(entry.path)
files.append((entry.path, stat.st_mtime, stat.st_size))
total_size += stat.st_size
# Sort by modification time (oldest first)
files.sort(key=lambda x: x[1])
# Remove files if:
# 1. They're too old
# 2. We have too many files
# 3. Directory is too large
current_time = (await aiofiles.os.stat(settings.temp_file_dir)).st_mtime
max_age = settings.max_temp_dir_age_hours * 3600
for path, mtime, size in files:
should_delete = False
2025-02-09 18:32:17 -07:00
# Check age
if current_time - mtime > max_age:
should_delete = True
logger.info(f"Deleting old temp file: {path}")
2025-02-09 18:32:17 -07:00
# Check count limit
elif len(files) > settings.max_temp_dir_count:
should_delete = True
logger.info(f"Deleting excess temp file: {path}")
2025-02-09 18:32:17 -07:00
# Check size limit
elif total_size > settings.max_temp_dir_size_mb * 1024 * 1024:
should_delete = True
logger.info(f"Deleting to reduce directory size: {path}")
2025-02-09 18:32:17 -07:00
if should_delete:
try:
await aiofiles.os.remove(path)
total_size -= size
logger.info(f"Deleted temp file: {path}")
except Exception as e:
logger.warning(f"Failed to delete temp file {path}: {e}")
except Exception as e:
logger.warning(f"Error during temp file cleanup: {e}")
class TempFileWriter:
"""Handles writing audio chunks to a temp file"""
2025-02-09 18:32:17 -07:00
def __init__(self, format: str):
"""Initialize temp file writer
2025-02-09 18:32:17 -07:00
Args:
format: Audio format extension (mp3, wav, etc)
"""
self.format = format
self.temp_file = None
self._finalized = False
2025-02-09 18:32:17 -07:00
async def __aenter__(self):
"""Async context manager entry"""
# Clean up old files first
await cleanup_temp_files()
2025-02-09 18:32:17 -07:00
# Create temp file with proper extension
await aiofiles.os.makedirs(settings.temp_file_dir, exist_ok=True)
temp = tempfile.NamedTemporaryFile(
dir=settings.temp_file_dir,
delete=False,
suffix=f".{self.format}",
2025-02-09 18:32:17 -07:00
mode="wb",
)
2025-02-09 18:32:17 -07:00
self.temp_file = await aiofiles.open(temp.name, mode="wb")
self.temp_path = temp.name
temp.close() # Close sync file, we'll use async version
2025-02-09 18:32:17 -07:00
# Generate download path immediately
self.download_path = f"/download/{os.path.basename(self.temp_path)}"
return self
2025-02-09 18:32:17 -07:00
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Async context manager exit"""
try:
if self.temp_file and not self._finalized:
await self.temp_file.close()
self._finalized = True
except Exception as e:
logger.error(f"Error closing temp file: {e}")
2025-02-09 18:32:17 -07:00
async def write(self, chunk: bytes) -> None:
"""Write a chunk of audio data
2025-02-09 18:32:17 -07:00
Args:
chunk: Audio data bytes to write
"""
if self._finalized:
raise RuntimeError("Cannot write to finalized temp file")
2025-02-09 18:32:17 -07:00
await self.temp_file.write(chunk)
await self.temp_file.flush()
2025-02-09 18:32:17 -07:00
async def finalize(self) -> str:
"""Close temp file and return download path
2025-02-09 18:32:17 -07:00
Returns:
Path to use for downloading the temp file
"""
if self._finalized:
raise RuntimeError("Temp file already finalized")
2025-02-09 18:32:17 -07:00
await self.temp_file.close()
self._finalized = True
2025-02-09 18:32:17 -07:00
return f"/download/{os.path.basename(self.temp_path)}"