mirror of
https://github.com/remsky/Kokoro-FastAPI.git
synced 2025-08-05 16:48:53 +00:00
51 lines
1.7 KiB
Python
51 lines
1.7 KiB
Python
![]() |
from collections.abc import AsyncIterable, Iterable
|
||
|
|
||
|
import json
|
||
|
import typing
|
||
|
from pydantic import BaseModel
|
||
|
from starlette.background import BackgroundTask
|
||
|
from starlette.concurrency import iterate_in_threadpool
|
||
|
from starlette.responses import JSONResponse, StreamingResponse
|
||
|
|
||
|
|
||
|
class JSONStreamingResponse(StreamingResponse, JSONResponse):
    """Streaming response that renders every yielded item as one line of JSON.

    Each item produced by ``content`` is serialized with :meth:`render` and
    emitted as its own chunk terminated by a newline. Pydantic models are
    converted to plain, JSON-compatible data before serialization.
    """

    def __init__(
        self,
        content: Iterable | AsyncIterable,
        status_code: int = 200,
        headers: dict[str, str] | None = None,
        media_type: str | None = None,
        background: BackgroundTask | None = None,
    ) -> None:
        """Set up the response around an iterable of JSON-serializable items.

        Args:
            content: Sync or async iterable yielding the items to stream.
                Items may be ``BaseModel`` instances or anything
                ``json.dumps`` accepts.
            status_code: HTTP status code stored on the response.
            headers: Optional headers passed to ``init_headers``.
            media_type: Optional media type override; when ``None`` the
                media type inherited from the base classes is left untouched.
            background: Optional background task stored on the response.
        """
        if isinstance(content, AsyncIterable):
            self._content_iterable: AsyncIterable = content
        else:
            # Wrap sync iterables so the body iterator below can always use
            # ``async for``.
            self._content_iterable = iterate_in_threadpool(content)

        async def body_iterator() -> AsyncIterable[bytes]:
            async for item in self._content_iterable:
                if isinstance(item, BaseModel):
                    # mode="json" converts non-JSON-native field values
                    # (datetime, UUID, set, ...) to JSON-compatible types;
                    # the default python mode would hand them to json.dumps
                    # unchanged and fail with TypeError mid-stream.
                    item = item.model_dump(mode="json")
                yield self.render(item)

        self.body_iterator = body_iterator()
        self.status_code = status_code
        if media_type is not None:
            self.media_type = media_type
        self.background = background
        self.init_headers(headers)

    def render(self, content: typing.Any) -> bytes:
        """Serialize ``content`` to a single newline-terminated JSON line.

        The output is compact (no indentation, no spaces after separators),
        keeps non-ASCII characters unescaped and is encoded as UTF-8.

        Raises:
            ValueError: If ``content`` contains NaN or infinite floats
                (``allow_nan=False``).
            TypeError: If ``content`` is not JSON serializable.
        """
        serialized = json.dumps(
            content,
            ensure_ascii=False,
            allow_nan=False,
            indent=None,
            separators=(",", ":"),
        )
        # The trailing newline delimits one streamed item from the next.
        return (serialized + "\n").encode("utf-8")
|