コンテンツにスキップ

Server-Sent Events (SSE)

🌐 AI と人間による翻訳

この翻訳は、人間のガイドに基づいて AI によって作成されました。🤝

原文の意図を取り違えていたり、不自然な表現になっている可能性があります。🤖

AI LLM をより適切に誘導するのを手伝う ことで、この翻訳を改善できます。

英語版

Server-Sent Events (SSE) を使うと、クライアントへデータをストリーミングできます。

これはJSON Lines のストリーミングに似ていますが、text/event-stream フォーマットを使用します。これはブラウザがネイティブに EventSource API でサポートしています。

情報

FastAPI 0.135.0 で追加されました。

Server-Sent Events とは

SSE は、HTTP 経由でサーバーからクライアントへデータをストリーミングするための標準です。

各イベントは、dataeventidretry などの「フィールド」を含む小さなテキストブロックで、空行で区切られます。

次のようになります:

data: {"name": "Portal Gun", "price": 999.99}

data: {"name": "Plumbus", "price": 32.99}

SSE は、AI チャットのストリーミング、ライブ通知、ログやオブザビリティなど、サーバーがクライアントへ更新をプッシュする用途で一般的に使われます。

豆知識

バイナリデータ(例: 動画や音声)をストリーミングしたい場合は、上級ガイド データのストリーミング を参照してください。

FastAPI で SSE をストリーミング

FastAPI で SSE をストリーミングするには、path operation 関数yield を使い、response_class=EventSourceResponse を設定します。

EventSourceResponsefastapi.sse からインポートします:

from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
    for item in items:
        yield item

# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-async", response_class=EventSourceResponse)
def sse_items_no_async() -> Iterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-annotation", response_class=EventSourceResponse)
async def sse_items_no_annotation():
    for item in items:
        yield item


@app.get("/items/stream-no-async-no-annotation", response_class=EventSourceResponse)
def sse_items_no_async_no_annotation():
    for item in items:
        yield item

yield された各アイテムは JSON にエンコードされ、SSE イベントの data: フィールドで送信されます。

戻り値の型を AsyncIterable[Item] と宣言すると、FastAPI は Pydantic を用いてデータを検証ドキュメント化シリアライズします。

from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
    for item in items:
        yield item

# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-async", response_class=EventSourceResponse)
def sse_items_no_async() -> Iterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-annotation", response_class=EventSourceResponse)
async def sse_items_no_annotation():
    for item in items:
        yield item


@app.get("/items/stream-no-async-no-annotation", response_class=EventSourceResponse)
def sse_items_no_async_no_annotation():
    for item in items:
        yield item

豆知識

Pydantic がRust 側でシリアライズを行うため、戻り値の型を宣言しない場合に比べて大幅に高性能になります。

非 async の path operation 関数

通常の def 関数(async なし)も使用でき、同様に yield を使えます。

イベントループをブロックしないよう、FastAPI が正しく実行されるように調整します。

この場合は関数が async ではないため、適切な戻り値の型は Iterable[Item] です:

# Code above omitted 👆

@app.get("/items/stream-no-async", response_class=EventSourceResponse)
def sse_items_no_async() -> Iterable[Item]:
    for item in items:
        yield item

# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-async", response_class=EventSourceResponse)
def sse_items_no_async() -> Iterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-annotation", response_class=EventSourceResponse)
async def sse_items_no_annotation():
    for item in items:
        yield item


@app.get("/items/stream-no-async-no-annotation", response_class=EventSourceResponse)
def sse_items_no_async_no_annotation():
    for item in items:
        yield item

戻り値の型なし

戻り値の型を省略することもできます。FastAPI は jsonable_encoder を使ってデータを変換し、送信します。

# Code above omitted 👆

@app.get("/items/stream-no-annotation", response_class=EventSourceResponse)
async def sse_items_no_annotation():
    for item in items:
        yield item

# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-async", response_class=EventSourceResponse)
def sse_items_no_async() -> Iterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-annotation", response_class=EventSourceResponse)
async def sse_items_no_annotation():
    for item in items:
        yield item


@app.get("/items/stream-no-async-no-annotation", response_class=EventSourceResponse)
def sse_items_no_async_no_annotation():
    for item in items:
        yield item

ServerSentEvent

eventidretrycomment などの SSE フィールドを設定する必要がある場合は、生データの代わりに ServerSentEvent オブジェクトを yield できます。

ServerSentEventfastapi.sse からインポートします:

from collections.abc import AsyncIterable

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse, ServerSentEvent
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    price: float


items = [
    Item(name="Plumbus", price=32.99),
    Item(name="Portal Gun", price=999.99),
    Item(name="Meeseeks Box", price=49.99),
]


@app.get("/items/stream", response_class=EventSourceResponse)
async def stream_items() -> AsyncIterable[ServerSentEvent]:
    yield ServerSentEvent(comment="stream of item updates")
    for i, item in enumerate(items):
        yield ServerSentEvent(data=item, event="item_update", id=str(i + 1), retry=5000)

data フィールドは常に JSON にエンコードされます。Pydantic モデルを含む、JSON にシリアライズ可能な任意の値を渡せます。

生データ

JSON エンコードせずにデータを送る必要がある場合は、data の代わりに raw_data を使用します。

これは、整形済みテキスト、ログ行、または [DONE] のような特別な "センチネル" 値を送るのに有用です。

from collections.abc import AsyncIterable

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse, ServerSentEvent

app = FastAPI()


@app.get("/logs/stream", response_class=EventSourceResponse)
async def stream_logs() -> AsyncIterable[ServerSentEvent]:
    logs = [
        "2025-01-01 INFO  Application started",
        "2025-01-01 DEBUG Connected to database",
        "2025-01-01 WARN  High memory usage detected",
    ]
    for log_line in logs:
        yield ServerSentEvent(raw_data=log_line)

備考

dataraw_data は相互排他的です。各 ServerSentEvent ではどちらか一方しか設定できません。

Last-Event-ID での再開

接続が途切れた後にブラウザが再接続すると、最後に受信した idLast-Event-ID ヘッダーで送信します。

これをヘッダーパラメータとして受け取り、クライアントが離脱した位置からストリームを再開できます:

from collections.abc import AsyncIterable
from typing import Annotated

from fastapi import FastAPI, Header
from fastapi.sse import EventSourceResponse, ServerSentEvent
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    price: float


items = [
    Item(name="Plumbus", price=32.99),
    Item(name="Portal Gun", price=999.99),
    Item(name="Meeseeks Box", price=49.99),
]


@app.get("/items/stream", response_class=EventSourceResponse)
async def stream_items(
    last_event_id: Annotated[int | None, Header()] = None,
) -> AsyncIterable[ServerSentEvent]:
    start = last_event_id + 1 if last_event_id is not None else 0
    for i, item in enumerate(items):
        if i < start:
            continue
        yield ServerSentEvent(data=item, id=str(i))

POST での SSE

SSE は GET だけでなく、任意の HTTP メソッドで動作します。

これは、POST 上で SSE をストリーミングする MCP のようなプロトコルで有用です:

from collections.abc import AsyncIterable

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse, ServerSentEvent
from pydantic import BaseModel

app = FastAPI()


class Prompt(BaseModel):
    text: str


@app.post("/chat/stream", response_class=EventSourceResponse)
async def stream_chat(prompt: Prompt) -> AsyncIterable[ServerSentEvent]:
    words = prompt.text.split()
    for word in words:
        yield ServerSentEvent(data=word, event="token")
    yield ServerSentEvent(raw_data="[DONE]", event="done")

技術詳細

FastAPI は SSE のいくつかのベストプラクティスを標準で実装しています。

  • メッセージがない場合は 15 秒ごとに「キープアライブ」用の ping コメントを送信し、一部のプロキシが接続を閉じるのを防ぎます(HTML 仕様: Server-Sent Events の推奨に従います)。
  • ストリームのキャッシュを防止するため、Cache-Control: no-cache ヘッダーを設定します。
  • Nginx など一部のプロキシでのバッファリングを防ぐため、特別なヘッダー X-Accel-Buffering: no を設定します。

追加の設定は不要で、そのまま動作します。🤓