服务器发送事件(SSE)¶
你可以使用服务器发送事件(SSE)向客户端流式发送数据。
这类似于流式传输 JSON Lines,但使用 text/event-stream 格式,浏览器原生通过 EventSource API 支持。
信息
新增于 FastAPI 0.135.0。
什么是服务器发送事件?¶
SSE 是一种通过 HTTP 从服务器向客户端流式传输数据的标准。
每个事件是一个带有 data、event、id 和 retry 等“字段”的小文本块,以空行分隔。
看起来像这样:
data: {"name": "Portal Gun", "price": 999.99}
data: {"name": "Plumbus", "price": 32.99}
SSE 常用于 AI 聊天流式输出、实时通知、日志与可观测性,以及其他服务器向客户端推送更新的场景。
提示
如果你想流式传输二进制数据(例如视频或音频),请查看高级指南:流式传输数据。
使用 FastAPI 流式传输 SSE¶
要在 FastAPI 中流式传输 SSE,在你的路径操作函数中使用 yield,并设置 response_class=EventSourceResponse。
从 fastapi.sse 导入 EventSourceResponse:
from collections.abc import AsyncIterable, Iterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None
items = [
Item(name="Plumbus", description="A multi-purpose household device."),
Item(name="Portal Gun", description="A portal opening device."),
Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]
@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
for item in items:
yield item
# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None
items = [
Item(name="Plumbus", description="A multi-purpose household device."),
Item(name="Portal Gun", description="A portal opening device."),
Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]
@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-async", response_class=EventSourceResponse)
def sse_items_no_async() -> Iterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-annotation", response_class=EventSourceResponse)
async def sse_items_no_annotation():
for item in items:
yield item
@app.get("/items/stream-no-async-no-annotation", response_class=EventSourceResponse)
def sse_items_no_async_no_annotation():
for item in items:
yield item
每个被 yield 的项会被编码为 JSON,并放入 SSE 事件的 data: 字段发送。
如果你将返回类型声明为 AsyncIterable[Item],FastAPI 将使用它通过 Pydantic对数据进行校验、文档化和序列化。
from collections.abc import AsyncIterable, Iterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None
items = [
Item(name="Plumbus", description="A multi-purpose household device."),
Item(name="Portal Gun", description="A portal opening device."),
Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]
@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
for item in items:
yield item
# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None
items = [
Item(name="Plumbus", description="A multi-purpose household device."),
Item(name="Portal Gun", description="A portal opening device."),
Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]
@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-async", response_class=EventSourceResponse)
def sse_items_no_async() -> Iterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-annotation", response_class=EventSourceResponse)
async def sse_items_no_annotation():
for item in items:
yield item
@app.get("/items/stream-no-async-no-annotation", response_class=EventSourceResponse)
def sse_items_no_async_no_annotation():
for item in items:
yield item
提示
由于 Pydantic 会在Rust 端序列化它,相比未声明返回类型,你将获得更高的性能。
非 async 的路径操作函数¶
你也可以使用常规的 def 函数(没有 async),并以同样的方式使用 yield。
FastAPI 会确保其正确运行,从而不阻塞事件循环。
由于此时函数不是 async,正确的返回类型应为 Iterable[Item]:
# Code above omitted 👆
@app.get("/items/stream-no-async", response_class=EventSourceResponse)
def sse_items_no_async() -> Iterable[Item]:
for item in items:
yield item
# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None
items = [
Item(name="Plumbus", description="A multi-purpose household device."),
Item(name="Portal Gun", description="A portal opening device."),
Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]
@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-async", response_class=EventSourceResponse)
def sse_items_no_async() -> Iterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-annotation", response_class=EventSourceResponse)
async def sse_items_no_annotation():
for item in items:
yield item
@app.get("/items/stream-no-async-no-annotation", response_class=EventSourceResponse)
def sse_items_no_async_no_annotation():
for item in items:
yield item
无返回类型¶
你也可以省略返回类型。FastAPI 将使用 jsonable_encoder 转换数据并发送。
# Code above omitted 👆
@app.get("/items/stream-no-annotation", response_class=EventSourceResponse)
async def sse_items_no_annotation():
for item in items:
yield item
# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None
items = [
Item(name="Plumbus", description="A multi-purpose household device."),
Item(name="Portal Gun", description="A portal opening device."),
Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]
@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-async", response_class=EventSourceResponse)
def sse_items_no_async() -> Iterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-annotation", response_class=EventSourceResponse)
async def sse_items_no_annotation():
for item in items:
yield item
@app.get("/items/stream-no-async-no-annotation", response_class=EventSourceResponse)
def sse_items_no_async_no_annotation():
for item in items:
yield item
ServerSentEvent¶
如果你需要设置 event、id、retry 或 comment 等 SSE 字段,你可以 yield ServerSentEvent 对象,而不是直接返回数据。
从 fastapi.sse 导入 ServerSentEvent:
from collections.abc import AsyncIterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse, ServerSentEvent
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
price: float
items = [
Item(name="Plumbus", price=32.99),
Item(name="Portal Gun", price=999.99),
Item(name="Meeseeks Box", price=49.99),
]
@app.get("/items/stream", response_class=EventSourceResponse)
async def stream_items() -> AsyncIterable[ServerSentEvent]:
yield ServerSentEvent(comment="stream of item updates")
for i, item in enumerate(items):
yield ServerSentEvent(data=item, event="item_update", id=str(i + 1), retry=5000)
data 字段始终会被编码为 JSON。你可以传入任何可被序列化为 JSON 的值,包括 Pydantic 模型。
原始数据¶
如果你需要发送不进行 JSON 编码的数据,请使用 raw_data 而不是 data。
这对于发送预格式化文本、日志行或特殊的 "哨兵" 值(例如 [DONE])很有用。
from collections.abc import AsyncIterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse, ServerSentEvent
app = FastAPI()
@app.get("/logs/stream", response_class=EventSourceResponse)
async def stream_logs() -> AsyncIterable[ServerSentEvent]:
logs = [
"2025-01-01 INFO Application started",
"2025-01-01 DEBUG Connected to database",
"2025-01-01 WARN High memory usage detected",
]
for log_line in logs:
yield ServerSentEvent(raw_data=log_line)
注意
data 和 raw_data 是互斥的。每个 ServerSentEvent 上只能设置其中一个。
使用 Last-Event-ID 恢复¶
当连接中断后浏览器重新连接时,会在 Last-Event-ID 头中发送上次收到的 id。
你可以将其读取为一个请求头参数,并据此从客户端离开的地方恢复流:
from collections.abc import AsyncIterable
from typing import Annotated
from fastapi import FastAPI, Header
from fastapi.sse import EventSourceResponse, ServerSentEvent
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
price: float
items = [
Item(name="Plumbus", price=32.99),
Item(name="Portal Gun", price=999.99),
Item(name="Meeseeks Box", price=49.99),
]
@app.get("/items/stream", response_class=EventSourceResponse)
async def stream_items(
last_event_id: Annotated[int | None, Header()] = None,
) -> AsyncIterable[ServerSentEvent]:
start = last_event_id + 1 if last_event_id is not None else 0
for i, item in enumerate(items):
if i < start:
continue
yield ServerSentEvent(data=item, id=str(i))
使用 POST 的 SSE¶
SSE 适用于任意 HTTP 方法,不仅仅是 GET。
这对像 MCP 这样通过 POST 传输 SSE 的协议很有用:
from collections.abc import AsyncIterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse, ServerSentEvent
from pydantic import BaseModel
app = FastAPI()
class Prompt(BaseModel):
text: str
@app.post("/chat/stream", response_class=EventSourceResponse)
async def stream_chat(prompt: Prompt) -> AsyncIterable[ServerSentEvent]:
words = prompt.text.split()
for word in words:
yield ServerSentEvent(data=word, event="token")
yield ServerSentEvent(raw_data="[DONE]", event="done")
技术细节¶
FastAPI 开箱即用地实现了一些 SSE 的最佳实践。
- 当 15 秒内没有任何消息时,发送一个保活
ping注释,以防某些代理关闭连接,正如 HTML 规范:Server-Sent Events 中建议的那样。 - 设置
Cache-Control: no-cache响应头,防止缓存流。 - 设置特殊响应头
X-Accel-Buffering: no,以防止某些代理(如 Nginx)缓冲。
你无需做任何事,它开箱即用。🤓