Перейти до змісту

Стрімінг JSON Lines

🌐 Переклад ШІ та людьми

Цей переклад виконано ШІ під керівництвом людей. 🤝

Можливі помилки через неправильне розуміння початкового змісту або неприродні формулювання тощо. 🤖

Ви можете покращити цей переклад, допомігши нам краще спрямовувати AI LLM.

Англійська версія

У вас може бути послідовність даних, яку ви хочете надсилати у «потоці», це можна зробити за допомогою JSON Lines.

Інформація

Додано в FastAPI 0.134.0.

Що таке потік

«Стрімінг» даних означає, що ваш застосунок почне надсилати елементи даних клієнту, не чекаючи, доки буде готова вся послідовність елементів.

Тобто він надішле перший елемент, клієнт його отримає і почне обробляти, а ви в цей час уже можете створювати наступний елемент.

sequenceDiagram
    participant App
    participant Client

    App->>App: Produce Item 1
    App->>Client: Send Item 1
    App->>App: Produce Item 2
    Client->>Client: Process Item 1
    App->>Client: Send Item 2
    App->>App: Produce Item 3
    Client->>Client: Process Item 2
    App->>Client: Send Item 3
    Client->>Client: Process Item 3
    Note over App: Keeps producing...
    Note over Client: Keeps consuming...

Це може бути навіть нескінченний потік, у якому ви постійно надсилаєте дані.

JSON Lines

У таких випадках часто надсилають «JSON Lines» - формат, у якому ви надсилаєте по одному об’єкту JSON на рядок.

Відповідь матиме тип вмісту application/jsonl (замість application/json), а тіло буде приблизно таким:

{"name": "Plumbus", "description": "A multi-purpose household device."}
{"name": "Portal Gun", "description": "A portal opening device."}
{"name": "Meeseeks Box", "description": "A box that summons a Meeseeks."}

Це дуже схоже на масив JSON (еквівалент списку Python), але замість того, щоб бути загорнутим у [] і мати , між елементами, тут є по одному об’єкту JSON на рядок, вони розділені символом нового рядка.

Інформація

Важливо те, що ваш застосунок зможе по черзі створювати кожен рядок, поки клієнт споживає попередні рядки.

Технічні деталі

Оскільки кожен об’єкт JSON буде розділено новим рядком, він не може містити буквальні символи нового рядка у своєму вмісті, але може містити екрановані нові рядки (\n), що є частиною стандарту JSON.

Зазвичай про це не треба турбуватися, усе робиться автоматично, читайте далі. 🤓

Випадки використання

Ви можете використовувати це, щоб стрімити дані зі служби AI LLM, із логів чи телеметрії, або з інших типів даних, які можна структурувати як елементи JSON.

Порада

Якщо ви хочете стрімити бінарні дані, наприклад відео чи аудіо, перегляньте просунутий посібник: Потокова передача даних.

Стрімінг JSON Lines з FastAPI

Щоб стрімити JSON Lines з FastAPI, замість використання return у вашій функції операції шляху використовуйте yield, щоб по черзі створювати кожен елемент.

from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
    for item in items:
        yield item

# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-async")
def stream_items_no_async() -> Iterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-annotation")
async def stream_items_no_annotation():
    for item in items:
        yield item


@app.get("/items/stream-no-async-no-annotation")
def stream_items_no_async_no_annotation():
    for item in items:
        yield item

Якщо кожен елемент JSON, який ви хочете надіслати у відповідь, має тип Item (модель Pydantic) і це async-функція, ви можете оголосити тип повернення як AsyncIterable[Item]:

from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
    for item in items:
        yield item

# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-async")
def stream_items_no_async() -> Iterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-annotation")
async def stream_items_no_annotation():
    for item in items:
        yield item


@app.get("/items/stream-no-async-no-annotation")
def stream_items_no_async_no_annotation():
    for item in items:
        yield item

Якщо ви оголосите тип повернення, FastAPI використає його, щоб перевіряти дані, документувати їх в OpenAPI, фільтрувати їх і серіалізувати за допомогою Pydantic.

Порада

Оскільки Pydantic серіалізуватиме це на боці Rust, ви отримаєте значно вищу продуктивність, ніж якби не оголошували тип повернення.

Не-async функції операцій шляху

Ви також можете використовувати звичайні функції def (без async) і використовувати yield так само.

FastAPI подбає про коректне виконання, щоб це не блокувало цикл подій.

Оскільки в цьому випадку функція не є async, правильним типом повернення буде Iterable[Item]:

# Code above omitted 👆

@app.get("/items/stream-no-async")
def stream_items_no_async() -> Iterable[Item]:
    for item in items:
        yield item

# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-async")
def stream_items_no_async() -> Iterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-annotation")
async def stream_items_no_annotation():
    for item in items:
        yield item


@app.get("/items/stream-no-async-no-annotation")
def stream_items_no_async_no_annotation():
    for item in items:
        yield item

Без типу повернення

Ви також можете опустити тип повернення. Тоді FastAPI використає jsonable_encoder, щоб перетворити дані на щось, що можна серіалізувати в JSON, і потім надішле їх як JSON Lines.

# Code above omitted 👆

@app.get("/items/stream-no-annotation")
async def stream_items_no_annotation():
    for item in items:
        yield item

# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-async")
def stream_items_no_async() -> Iterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-annotation")
async def stream_items_no_annotation():
    for item in items:
        yield item


@app.get("/items/stream-no-async-no-annotation")
def stream_items_no_async_no_annotation():
    for item in items:
        yield item

Події, надіслані сервером (SSE)

FastAPI також має повноцінну підтримку Server-Sent Events (SSE), які досить схожі, але мають кілька додаткових деталей. Ви можете дізнатися про них у наступному розділі: Події, надіслані сервером (SSE). 🤓