Стриминг JSON Lines¶
🌐 Перевод выполнен с помощью ИИ и людей
Этот перевод был сделан ИИ под руководством людей. 🤝
В нем могут быть ошибки из-за неправильного понимания оригинального смысла или неестественности и т. д. 🤖
Вы можете улучшить этот перевод, помогая нам лучше направлять ИИ LLM.
У вас может быть последовательность данных, которую вы хотите отправлять в «потоке». Это можно сделать с помощью JSON Lines.
Информация
Добавлено в FastAPI 0.134.0.
Что такое поток?¶
«Стриминг» данных означает, что ваше приложение начнет отправлять элементы данных клиенту, не дожидаясь готовности всей последовательности.
То есть оно отправит первый элемент, клиент его получит и начнет обрабатывать, а вы в это время можете все еще генерировать следующий элемент.
sequenceDiagram
participant App
participant Client
App->>App: Produce Item 1
App->>Client: Send Item 1
App->>App: Produce Item 2
Client->>Client: Process Item 1
App->>Client: Send Item 2
App->>App: Produce Item 3
Client->>Client: Process Item 2
App->>Client: Send Item 3
Client->>Client: Process Item 3
Note over App: Keeps producing...
Note over Client: Keeps consuming...
Это может быть даже бесконечный поток, когда вы продолжаете отправлять данные.
JSON Lines¶
В таких случаях часто отправляют «JSON Lines», это формат, в котором отправляется по одному JSON-объекту на строку.
Ответ будет иметь тип содержимого application/jsonl (вместо application/json), а тело ответа будет примерно таким:
{"name": "Plumbus", "description": "A multi-purpose household device."}
{"name": "Portal Gun", "description": "A portal opening device."}
{"name": "Meeseeks Box", "description": "A box that summons a Meeseeks."}
Это очень похоже на JSON-массив (эквивалент списка Python), но вместо того чтобы быть обернутым в [] и иметь , между элементами, здесь один JSON-объект на строку, они разделены символом новой строки.
Информация
Важный момент в том, что ваше приложение сможет по очереди производить каждую строку, пока клиент потребляет предыдущие строки.
Технические детали
Так как каждый JSON-объект будет разделен новой строкой, в их содержимом не могут быть буквальные символы новой строки, но могут быть экранированные переводы строк (\n), что входит в стандарт JSON.
Однако обычно об этом не нужно беспокоиться — всё делается автоматически, читайте дальше. 🤓
Варианты использования¶
Вы можете использовать это для стриминга данных из сервиса AI LLM, из логов или телеметрии, или из других типов данных, которые можно структурировать в элементы JSON.
Совет
Если вы хотите стримить бинарные данные, например видео или аудио, посмотрите расширенное руководство: Потоковая передача данных.
Стриминг JSON Lines с FastAPI¶
Чтобы стримить JSON Lines с FastAPI, вместо использования return в вашей функции-обработчике пути используйте yield, чтобы по очереди выдавать каждый элемент.
from collections.abc import AsyncIterable, Iterable
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None
items = [
Item(name="Plumbus", description="A multi-purpose household device."),
Item(name="Portal Gun", description="A portal opening device."),
Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]
@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
for item in items:
yield item
# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None
items = [
Item(name="Plumbus", description="A multi-purpose household device."),
Item(name="Portal Gun", description="A portal opening device."),
Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]
@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-async")
def stream_items_no_async() -> Iterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-annotation")
async def stream_items_no_annotation():
for item in items:
yield item
@app.get("/items/stream-no-async-no-annotation")
def stream_items_no_async_no_annotation():
for item in items:
yield item
Если каждый JSON-элемент, который вы хотите отправить обратно, имеет тип Item (Pydantic-модель), и это асинхронная функция, вы можете объявить тип возвращаемого значения как AsyncIterable[Item]:
from collections.abc import AsyncIterable, Iterable
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None
items = [
Item(name="Plumbus", description="A multi-purpose household device."),
Item(name="Portal Gun", description="A portal opening device."),
Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]
@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
for item in items:
yield item
# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None
items = [
Item(name="Plumbus", description="A multi-purpose household device."),
Item(name="Portal Gun", description="A portal opening device."),
Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]
@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-async")
def stream_items_no_async() -> Iterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-annotation")
async def stream_items_no_annotation():
for item in items:
yield item
@app.get("/items/stream-no-async-no-annotation")
def stream_items_no_async_no_annotation():
for item in items:
yield item
Если вы объявите тип возвращаемого значения, FastAPI будет использовать его, чтобы валидировать данные, документировать их в OpenAPI, фильтровать и сериализовать с помощью Pydantic.
Совет
Так как Pydantic будет сериализовывать это на стороне Rust, вы получите значительно более высокую производительность, чем если бы вы не указывали тип возвращаемого значения.
Неасинхронные функции-обработчики пути¶
Вы также можете использовать обычные функции def (без async) и использовать yield таким же образом.
FastAPI обеспечит корректное выполнение так, чтобы это не блокировало цикл событий.
Поскольку в этом случае функция не асинхронная, подходящим типом возвращаемого значения будет Iterable[Item]:
# Code above omitted 👆
@app.get("/items/stream-no-async")
def stream_items_no_async() -> Iterable[Item]:
for item in items:
yield item
# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None
items = [
Item(name="Plumbus", description="A multi-purpose household device."),
Item(name="Portal Gun", description="A portal opening device."),
Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]
@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-async")
def stream_items_no_async() -> Iterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-annotation")
async def stream_items_no_annotation():
for item in items:
yield item
@app.get("/items/stream-no-async-no-annotation")
def stream_items_no_async_no_annotation():
for item in items:
yield item
Без возвращаемого типа¶
Вы также можете опустить тип возвращаемого значения. Тогда FastAPI использует jsonable_encoder, чтобы преобразовать данные к виду, который можно сериализовать в JSON, и затем отправит их как JSON Lines.
# Code above omitted 👆
@app.get("/items/stream-no-annotation")
async def stream_items_no_annotation():
for item in items:
yield item
# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None
items = [
Item(name="Plumbus", description="A multi-purpose household device."),
Item(name="Portal Gun", description="A portal opening device."),
Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]
@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-async")
def stream_items_no_async() -> Iterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-annotation")
async def stream_items_no_annotation():
for item in items:
yield item
@app.get("/items/stream-no-async-no-annotation")
def stream_items_no_async_no_annotation():
for item in items:
yield item
События, отправляемые сервером (SSE)¶
FastAPI также имеет полноценную поддержку Server-Sent Events (SSE), которые довольно похожи, но с парой дополнительных деталей. Вы можете узнать о них в следующей главе: События, отправляемые сервером (SSE). 🤓