Skip to content

Server-Sent Events(SSE)

🌐 AI 與人類共同完成的翻譯

此翻譯由人類指導的 AI 完成。🤝

可能會有對原意的誤解,或讀起來不自然等問題。🤖

你可以透過協助我們更好地引導 AI LLM來改進此翻譯。

英文版

你可以使用 Server-Sent Events(SSE)把資料串流傳送給用戶端。

這與串流 JSON Lines類似,但使用瀏覽器原生支援、透過 EventSource APItext/event-stream 格式。

Info

在 FastAPI 0.135.0 新增。

什麼是 Server-Sent Events?

SSE 是一種透過 HTTP 從伺服器向用戶端串流傳送資料的標準。

每個事件都是一個小型文字區塊,包含 dataeventidretry 等「欄位」,並以空白行分隔。

看起來像這樣:

data: {"name": "Portal Gun", "price": 999.99}

data: {"name": "Plumbus", "price": 32.99}

SSE 常用於 AI 聊天串流、即時通知、日誌與可觀察性,以及其他由伺服器主動推送更新給用戶端的情境。

Tip

如果你要串流二進位資料,例如影片或音訊,請參考進階指南:串流資料

使用 FastAPI 串流 SSE

要在 FastAPI 中串流 SSE,請在你的路徑操作函式(path operation function)中使用 yield,並設定 response_class=EventSourceResponse

fastapi.sse 匯入 EventSourceResponse

from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
    for item in items:
        yield item

# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-async", response_class=EventSourceResponse)
def sse_items_no_async() -> Iterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-annotation", response_class=EventSourceResponse)
async def sse_items_no_annotation():
    for item in items:
        yield item


@app.get("/items/stream-no-async-no-annotation", response_class=EventSourceResponse)
def sse_items_no_async_no_annotation():
    for item in items:
        yield item

每個 yield 的項目都會以 JSON 編碼並放在 SSE 事件的 data: 欄位中送出。

如果你把回傳型別宣告為 AsyncIterable[Item],FastAPI 會用它來透過 Pydantic 進行驗證文件化序列化

from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
    for item in items:
        yield item

# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-async", response_class=EventSourceResponse)
def sse_items_no_async() -> Iterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-annotation", response_class=EventSourceResponse)
async def sse_items_no_annotation():
    for item in items:
        yield item


@app.get("/items/stream-no-async-no-annotation", response_class=EventSourceResponse)
def sse_items_no_async_no_annotation():
    for item in items:
        yield item

Tip

因為 Pydantic 會在 Rust 端進行序列化,如果你有宣告回傳型別,效能會比未宣告時高很多。

非 async 的路徑操作函式

你也可以使用一般的 def 函式(沒有 async),並同樣使用 yield

FastAPI 會確保正確執行,不會阻塞事件迴圈。

由於此函式不是 async,正確的回傳型別是 Iterable[Item]

# Code above omitted 👆

@app.get("/items/stream-no-async", response_class=EventSourceResponse)
def sse_items_no_async() -> Iterable[Item]:
    for item in items:
        yield item

# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-async", response_class=EventSourceResponse)
def sse_items_no_async() -> Iterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-annotation", response_class=EventSourceResponse)
async def sse_items_no_annotation():
    for item in items:
        yield item


@app.get("/items/stream-no-async-no-annotation", response_class=EventSourceResponse)
def sse_items_no_async_no_annotation():
    for item in items:
        yield item

無回傳型別

你也可以省略回傳型別。FastAPI 會使用 jsonable_encoder 轉換資料並送出。

# Code above omitted 👆

@app.get("/items/stream-no-annotation", response_class=EventSourceResponse)
async def sse_items_no_annotation():
    for item in items:
        yield item

# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-async", response_class=EventSourceResponse)
def sse_items_no_async() -> Iterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-annotation", response_class=EventSourceResponse)
async def sse_items_no_annotation():
    for item in items:
        yield item


@app.get("/items/stream-no-async-no-annotation", response_class=EventSourceResponse)
def sse_items_no_async_no_annotation():
    for item in items:
        yield item

ServerSentEvent

如果你需要設定 eventidretrycomment 等 SSE 欄位,你可以改為 yieldServerSentEvent 物件,而不是單純的資料。

fastapi.sse 匯入 ServerSentEvent

from collections.abc import AsyncIterable

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse, ServerSentEvent
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    price: float


items = [
    Item(name="Plumbus", price=32.99),
    Item(name="Portal Gun", price=999.99),
    Item(name="Meeseeks Box", price=49.99),
]


@app.get("/items/stream", response_class=EventSourceResponse)
async def stream_items() -> AsyncIterable[ServerSentEvent]:
    yield ServerSentEvent(comment="stream of item updates")
    for i, item in enumerate(items):
        yield ServerSentEvent(data=item, event="item_update", id=str(i + 1), retry=5000)

data 欄位一律會以 JSON 編碼。你可以傳入任何可序列化為 JSON 的值,包括 Pydantic 模型。

原始資料

如果你需要在進行 JSON 編碼的情況下傳送資料,請使用 raw_data 取代 data

這對於傳送已格式化的文字、日誌行或特殊的 "哨兵" 值(例如 [DONE])很有用。

from collections.abc import AsyncIterable

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse, ServerSentEvent

app = FastAPI()


@app.get("/logs/stream", response_class=EventSourceResponse)
async def stream_logs() -> AsyncIterable[ServerSentEvent]:
    logs = [
        "2025-01-01 INFO  Application started",
        "2025-01-01 DEBUG Connected to database",
        "2025-01-01 WARN  High memory usage detected",
    ]
    for log_line in logs:
        yield ServerSentEvent(raw_data=log_line)

Note

dataraw_data 互斥。每個 ServerSentEvent 只能設定其中一個。

使用 Last-Event-ID 繼續

當瀏覽器在連線中斷後重新連線時,會在 Last-Event-ID 標頭中傳送最後接收到的 id

你可以將它作為標頭參數讀取,並用來從用戶端中斷處繼續串流:

from collections.abc import AsyncIterable
from typing import Annotated

from fastapi import FastAPI, Header
from fastapi.sse import EventSourceResponse, ServerSentEvent
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    price: float


items = [
    Item(name="Plumbus", price=32.99),
    Item(name="Portal Gun", price=999.99),
    Item(name="Meeseeks Box", price=49.99),
]


@app.get("/items/stream", response_class=EventSourceResponse)
async def stream_items(
    last_event_id: Annotated[int | None, Header()] = None,
) -> AsyncIterable[ServerSentEvent]:
    start = last_event_id + 1 if last_event_id is not None else 0
    for i, item in enumerate(items):
        if i < start:
            continue
        yield ServerSentEvent(data=item, id=str(i))

使用 POST 的 SSE

SSE 可搭配任何 HTTP 方法,不僅限於 GET

這對於像是透過 POST 串流 SSE 的協定(例如 MCP)很有用:

from collections.abc import AsyncIterable

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse, ServerSentEvent
from pydantic import BaseModel

app = FastAPI()


class Prompt(BaseModel):
    text: str


@app.post("/chat/stream", response_class=EventSourceResponse)
async def stream_chat(prompt: Prompt) -> AsyncIterable[ServerSentEvent]:
    words = prompt.text.split()
    for word in words:
        yield ServerSentEvent(data=word, event="token")
    yield ServerSentEvent(raw_data="[DONE]", event="done")

技術細節

FastAPI 內建實作了一些 SSE 的最佳實務。

  • 當 15 秒內沒有任何訊息時,傳送一次「保活」ping 註解,以避免某些代理伺服器關閉連線;此作法源自於HTML 規範:Server-Sent Events中的建議。
  • 設定 Cache-Control: no-cache 標頭,以防止快取串流內容。
  • 設定特殊標頭 X-Accel-Buffering: no,以避免在像 Nginx 這類代理中被緩衝

你不需要做任何事,開箱即用。🤓