コンテンツにスキップ

JSON Lines をストリームする

🌐 AI と人間による翻訳

この翻訳は、人間のガイドに基づいて AI によって作成されました。🤝

原文の意図を取り違えていたり、不自然な表現になっている可能性があります。🤖

AI LLM をより適切に誘導するのを手伝う ことで、この翻訳を改善できます。

英語版

データのシーケンスを「ストリーム」で送りたい場合、JSON Lines を使って実現できます。

情報

FastAPI 0.134.0 で追加されました。

ストリームとは

データをストリーミングするとは、アイテムの全シーケンスが用意できるのを待たずに、アプリがデータアイテムの送信をクライアントに対して開始することを意味します。

つまり、最初のアイテムを送信し、クライアントはそれを受け取って処理を始めます。その間に、次のアイテムをまだ生成しているかもしれません。

sequenceDiagram
    participant App
    participant Client

    App->>App: Produce Item 1
    App->>Client: Send Item 1
    App->>App: Produce Item 2
    Client->>Client: Process Item 1
    App->>Client: Send Item 2
    App->>App: Produce Item 3
    Client->>Client: Process Item 2
    App->>Client: Send Item 3
    Client->>Client: Process Item 3
    Note over App: Keeps producing...
    Note over Client: Keeps consuming...

データを送り続ける無限ストリームにすることもできます。

JSON Lines

このような場合、1 行に 1 つの JSON オブジェクトを送る「JSON Lines」形式を使うのが一般的です。

レスポンスの content type は application/jsonlapplication/json の代わり)となり、ボディは次のようになります:

{"name": "Plumbus", "description": "A multi-purpose household device."}
{"name": "Portal Gun", "description": "A portal opening device."}
{"name": "Meeseeks Box", "description": "A box that summons a Meeseeks."}

これは JSON 配列(Python の list に相当)にとてもよく似ていますが、[] で囲まず、アイテム間の , もありません。その代わりに、1 行に 1 つの JSON オブジェクトで、改行文字で区切られます。

情報

重要な点は、クライアントが前の行を消費している間に、アプリ側は次の行を順次生成して送れることです。

技術詳細

各 JSON オブジェクトは改行で区切られるため、内容にリテラルな改行文字は含められません。ですが、エスケープした改行(\n)は含められます。これは JSON 標準の一部です。

とはいえ、通常は気にする必要はありません。自動で処理されますので、読み進めてください。🤓

ユースケース

これは AI LLM サービス、ログテレメトリ、あるいは JSON アイテムとして構造化できる他の種類のデータをストリームするのに使えます。

豆知識

動画や音声などのバイナリデータをストリームしたい場合は、上級ガイドを参照してください: データのストリーム

FastAPI で JSON Lines をストリームする

FastAPI で JSON Lines をストリームするには、path operation 関数return を使う代わりに、yield を使って各アイテムを順に生成します。

from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
    for item in items:
        yield item

# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-async")
def stream_items_no_async() -> Iterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-annotation")
async def stream_items_no_annotation():
    for item in items:
        yield item


@app.get("/items/stream-no-async-no-annotation")
def stream_items_no_async_no_annotation():
    for item in items:
        yield item

送り返す各 JSON アイテムが Item(Pydantic モデル)型で、関数が async の場合、戻り値の型を AsyncIterable[Item] と宣言できます:

from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
    for item in items:
        yield item

# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-async")
def stream_items_no_async() -> Iterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-annotation")
async def stream_items_no_annotation():
    for item in items:
        yield item


@app.get("/items/stream-no-async-no-annotation")
def stream_items_no_async_no_annotation():
    for item in items:
        yield item

戻り値の型を宣言すると、FastAPI はそれを使ってデータを検証し、OpenAPI にドキュメント化し、フィルターし、Pydantic でシリアライズします。

豆知識

Pydantic は Rust 側でシリアライズを行うため、戻り値の型を宣言しない場合に比べて大幅に高いパフォーマンスが得られます。

非 async の path operation 関数

async を使わない通常の def 関数でも同様に yield を使えます。

FastAPI が適切に実行されるように処理するため、イベントループをブロックしません。

この場合は関数が async ではないので、適切な戻り値の型は Iterable[Item] です:

# Code above omitted 👆

@app.get("/items/stream-no-async")
def stream_items_no_async() -> Iterable[Item]:
    for item in items:
        yield item

# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-async")
def stream_items_no_async() -> Iterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-annotation")
async def stream_items_no_annotation():
    for item in items:
        yield item


@app.get("/items/stream-no-async-no-annotation")
def stream_items_no_async_no_annotation():
    for item in items:
        yield item

戻り値の型なし

戻り値の型を省略することもできます。FastAPI はその場合、データを JSON にシリアライズ可能な形に変換するために jsonable_encoder を使い、JSON Lines として送信します。

# Code above omitted 👆

@app.get("/items/stream-no-annotation")
async def stream_items_no_annotation():
    for item in items:
        yield item

# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-async")
def stream_items_no_async() -> Iterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-annotation")
async def stream_items_no_annotation():
    for item in items:
        yield item


@app.get("/items/stream-no-async-no-annotation")
def stream_items_no_async_no_annotation():
    for item in items:
        yield item

Server-Sent Events (SSE)

FastAPI は Server-Sent Events (SSE) にもファーストクラスで対応しています。とても似ていますが、いくつか追加の詳細があります。次の章で学べます: Server-Sent Events (SSE)。🤓