Server-Sent Events (SSE)¶
🌐 AI と人間による翻訳
この翻訳は、人間のガイドに基づいて AI によって作成されました。🤝
原文の意図を取り違えていたり、不自然な表現になっている可能性があります。🤖
AI LLM をより適切に誘導するのを手伝う ことで、この翻訳を改善できます。
Server-Sent Events (SSE) を使うと、クライアントへデータをストリーミングできます。
これはJSON Lines のストリーミングに似ていますが、text/event-stream フォーマットを使用します。これはブラウザがネイティブに EventSource API でサポートしています。
情報
FastAPI 0.135.0 で追加されました。
Server-Sent Events とは¶
SSE は、HTTP 経由でサーバーからクライアントへデータをストリーミングするための標準です。
各イベントは、data、event、id、retry などの「フィールド」を含む小さなテキストブロックで、空行で区切られます。
次のようになります:
data: {"name": "Portal Gun", "price": 999.99}
data: {"name": "Plumbus", "price": 32.99}
SSE は、AI チャットのストリーミング、ライブ通知、ログやオブザビリティなど、サーバーがクライアントへ更新をプッシュする用途で一般的に使われます。
豆知識
バイナリデータ(例: 動画や音声)をストリーミングしたい場合は、上級ガイド データのストリーミング を参照してください。
FastAPI で SSE をストリーミング¶
FastAPI で SSE をストリーミングするには、path operation 関数で yield を使い、response_class=EventSourceResponse を設定します。
EventSourceResponse は fastapi.sse からインポートします:
from collections.abc import AsyncIterable, Iterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None
items = [
Item(name="Plumbus", description="A multi-purpose household device."),
Item(name="Portal Gun", description="A portal opening device."),
Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]
@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
for item in items:
yield item
# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None
items = [
Item(name="Plumbus", description="A multi-purpose household device."),
Item(name="Portal Gun", description="A portal opening device."),
Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]
@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-async", response_class=EventSourceResponse)
def sse_items_no_async() -> Iterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-annotation", response_class=EventSourceResponse)
async def sse_items_no_annotation():
for item in items:
yield item
@app.get("/items/stream-no-async-no-annotation", response_class=EventSourceResponse)
def sse_items_no_async_no_annotation():
for item in items:
yield item
yield された各アイテムは JSON にエンコードされ、SSE イベントの data: フィールドで送信されます。
戻り値の型を AsyncIterable[Item] と宣言すると、FastAPI は Pydantic を用いてデータを検証、ドキュメント化、シリアライズします。
from collections.abc import AsyncIterable, Iterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None
items = [
Item(name="Plumbus", description="A multi-purpose household device."),
Item(name="Portal Gun", description="A portal opening device."),
Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]
@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
for item in items:
yield item
# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None
items = [
Item(name="Plumbus", description="A multi-purpose household device."),
Item(name="Portal Gun", description="A portal opening device."),
Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]
@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-async", response_class=EventSourceResponse)
def sse_items_no_async() -> Iterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-annotation", response_class=EventSourceResponse)
async def sse_items_no_annotation():
for item in items:
yield item
@app.get("/items/stream-no-async-no-annotation", response_class=EventSourceResponse)
def sse_items_no_async_no_annotation():
for item in items:
yield item
豆知識
Pydantic がRust 側でシリアライズを行うため、戻り値の型を宣言しない場合に比べて大幅に高性能になります。
非 async の path operation 関数¶
通常の def 関数(async なし)も使用でき、同様に yield を使えます。
イベントループをブロックしないよう、FastAPI が正しく実行されるように調整します。
この場合は関数が async ではないため、適切な戻り値の型は Iterable[Item] です:
# Code above omitted 👆
@app.get("/items/stream-no-async", response_class=EventSourceResponse)
def sse_items_no_async() -> Iterable[Item]:
for item in items:
yield item
# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None
items = [
Item(name="Plumbus", description="A multi-purpose household device."),
Item(name="Portal Gun", description="A portal opening device."),
Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]
@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-async", response_class=EventSourceResponse)
def sse_items_no_async() -> Iterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-annotation", response_class=EventSourceResponse)
async def sse_items_no_annotation():
for item in items:
yield item
@app.get("/items/stream-no-async-no-annotation", response_class=EventSourceResponse)
def sse_items_no_async_no_annotation():
for item in items:
yield item
戻り値の型なし¶
戻り値の型を省略することもできます。FastAPI は jsonable_encoder を使ってデータを変換し、送信します。
# Code above omitted 👆
@app.get("/items/stream-no-annotation", response_class=EventSourceResponse)
async def sse_items_no_annotation():
for item in items:
yield item
# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None
items = [
Item(name="Plumbus", description="A multi-purpose household device."),
Item(name="Portal Gun", description="A portal opening device."),
Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]
@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-async", response_class=EventSourceResponse)
def sse_items_no_async() -> Iterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-annotation", response_class=EventSourceResponse)
async def sse_items_no_annotation():
for item in items:
yield item
@app.get("/items/stream-no-async-no-annotation", response_class=EventSourceResponse)
def sse_items_no_async_no_annotation():
for item in items:
yield item
ServerSentEvent¶
event、id、retry、comment などの SSE フィールドを設定する必要がある場合は、生データの代わりに ServerSentEvent オブジェクトを yield できます。
ServerSentEvent は fastapi.sse からインポートします:
from collections.abc import AsyncIterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse, ServerSentEvent
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
price: float
items = [
Item(name="Plumbus", price=32.99),
Item(name="Portal Gun", price=999.99),
Item(name="Meeseeks Box", price=49.99),
]
@app.get("/items/stream", response_class=EventSourceResponse)
async def stream_items() -> AsyncIterable[ServerSentEvent]:
yield ServerSentEvent(comment="stream of item updates")
for i, item in enumerate(items):
yield ServerSentEvent(data=item, event="item_update", id=str(i + 1), retry=5000)
data フィールドは常に JSON にエンコードされます。Pydantic モデルを含む、JSON にシリアライズ可能な任意の値を渡せます。
生データ¶
JSON エンコードせずにデータを送る必要がある場合は、data の代わりに raw_data を使用します。
これは、整形済みテキスト、ログ行、または [DONE] のような特別な "センチネル" 値を送るのに有用です。
from collections.abc import AsyncIterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse, ServerSentEvent
app = FastAPI()
@app.get("/logs/stream", response_class=EventSourceResponse)
async def stream_logs() -> AsyncIterable[ServerSentEvent]:
logs = [
"2025-01-01 INFO Application started",
"2025-01-01 DEBUG Connected to database",
"2025-01-01 WARN High memory usage detected",
]
for log_line in logs:
yield ServerSentEvent(raw_data=log_line)
備考
data と raw_data は相互排他的です。各 ServerSentEvent ではどちらか一方しか設定できません。
Last-Event-ID での再開¶
接続が途切れた後にブラウザが再接続すると、最後に受信した id を Last-Event-ID ヘッダーで送信します。
これをヘッダーパラメータとして受け取り、クライアントが離脱した位置からストリームを再開できます:
from collections.abc import AsyncIterable
from typing import Annotated
from fastapi import FastAPI, Header
from fastapi.sse import EventSourceResponse, ServerSentEvent
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
price: float
items = [
Item(name="Plumbus", price=32.99),
Item(name="Portal Gun", price=999.99),
Item(name="Meeseeks Box", price=49.99),
]
@app.get("/items/stream", response_class=EventSourceResponse)
async def stream_items(
last_event_id: Annotated[int | None, Header()] = None,
) -> AsyncIterable[ServerSentEvent]:
start = last_event_id + 1 if last_event_id is not None else 0
for i, item in enumerate(items):
if i < start:
continue
yield ServerSentEvent(data=item, id=str(i))
POST での SSE¶
SSE は GET だけでなく、任意の HTTP メソッドで動作します。
これは、POST 上で SSE をストリーミングする MCP のようなプロトコルで有用です:
from collections.abc import AsyncIterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse, ServerSentEvent
from pydantic import BaseModel
app = FastAPI()
class Prompt(BaseModel):
text: str
@app.post("/chat/stream", response_class=EventSourceResponse)
async def stream_chat(prompt: Prompt) -> AsyncIterable[ServerSentEvent]:
words = prompt.text.split()
for word in words:
yield ServerSentEvent(data=word, event="token")
yield ServerSentEvent(raw_data="[DONE]", event="done")
技術詳細¶
FastAPI は SSE のいくつかのベストプラクティスを標準で実装しています。
- メッセージがない場合は 15 秒ごとに「キープアライブ」用の
pingコメントを送信し、一部のプロキシが接続を閉じるのを防ぎます(HTML 仕様: Server-Sent Events の推奨に従います)。 - ストリームのキャッシュを防止するため、
Cache-Control: no-cacheヘッダーを設定します。 - Nginx など一部のプロキシでのバッファリングを防ぐため、特別なヘッダー
X-Accel-Buffering: noを設定します。
追加の設定は不要で、そのまま動作します。🤓