События, отправляемые сервером (SSE)¶
🌐 Перевод выполнен с помощью ИИ и людей
Этот перевод был сделан ИИ под руководством людей. 🤝
В нем могут быть ошибки из-за неправильного понимания оригинального смысла или неестественности и т. д. 🤖
Вы можете улучшить этот перевод, помогая нам лучше направлять ИИ LLM.
Вы можете передавать данные потоком клиенту, используя Server-Sent Events (SSE).
Это похоже на Стриминг JSON Lines, но использует формат text/event-stream, который нативно поддерживается браузерами через EventSource API.
Информация
Добавлено в FastAPI 0.135.0.
Что такое Server-Sent Events?¶
SSE — это стандарт для потоковой передачи данных с сервера на клиента по HTTP.
Каждое событие — это небольшой текстовый блок с «полями», такими как data, event, id и retry, разделёнными пустыми строками.
Это выглядит так:
data: {"name": "Portal Gun", "price": 999.99}
data: {"name": "Plumbus", "price": 32.99}
SSE часто используют для стриминга ответов ИИ в чатах, живых уведомлений, логов и наблюдаемости, а также в других случаях, когда сервер «проталкивает» обновления клиенту.
Совет
Если вам нужно стримить бинарные данные, например видео или аудио, посмотрите расширенное руководство: Stream Data.
Стриминг SSE с FastAPI¶
Чтобы стримить SSE с FastAPI, используйте yield в своей функции-обработчике пути и укажите response_class=EventSourceResponse.
Импортируйте EventSourceResponse из fastapi.sse:
from collections.abc import AsyncIterable, Iterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None
items = [
Item(name="Plumbus", description="A multi-purpose household device."),
Item(name="Portal Gun", description="A portal opening device."),
Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]
@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
for item in items:
yield item
# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None
items = [
Item(name="Plumbus", description="A multi-purpose household device."),
Item(name="Portal Gun", description="A portal opening device."),
Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]
@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-async", response_class=EventSourceResponse)
def sse_items_no_async() -> Iterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-annotation", response_class=EventSourceResponse)
async def sse_items_no_annotation():
for item in items:
yield item
@app.get("/items/stream-no-async-no-annotation", response_class=EventSourceResponse)
def sse_items_no_async_no_annotation():
for item in items:
yield item
Каждый возвращаемый через yield элемент кодируется как JSON и отправляется в поле data: события SSE.
Если вы объявите тип возврата как AsyncIterable[Item], FastAPI будет использовать его, чтобы выполнить валидацию, добавить документацию и сериализовать данные с помощью Pydantic.
from collections.abc import AsyncIterable, Iterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None
items = [
Item(name="Plumbus", description="A multi-purpose household device."),
Item(name="Portal Gun", description="A portal opening device."),
Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]
@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
for item in items:
yield item
# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None
items = [
Item(name="Plumbus", description="A multi-purpose household device."),
Item(name="Portal Gun", description="A portal opening device."),
Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]
@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-async", response_class=EventSourceResponse)
def sse_items_no_async() -> Iterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-annotation", response_class=EventSourceResponse)
async def sse_items_no_annotation():
for item in items:
yield item
@app.get("/items/stream-no-async-no-annotation", response_class=EventSourceResponse)
def sse_items_no_async_no_annotation():
for item in items:
yield item
Совет
Так как Pydantic будет сериализовать это на стороне Rust, вы получите значительно более высокую производительность, чем если не объявите тип возврата.
Несинхронные функции-обработчики пути¶
Вы также можете использовать обычные функции def (без async) и применять yield тем же образом.
FastAPI проследит, чтобы выполнение прошло корректно и не блокировало цикл событий.
Так как в этом случае функция не async, правильным типом возврата будет Iterable[Item]:
# Code above omitted 👆
@app.get("/items/stream-no-async", response_class=EventSourceResponse)
def sse_items_no_async() -> Iterable[Item]:
for item in items:
yield item
# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None
items = [
Item(name="Plumbus", description="A multi-purpose household device."),
Item(name="Portal Gun", description="A portal opening device."),
Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]
@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-async", response_class=EventSourceResponse)
def sse_items_no_async() -> Iterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-annotation", response_class=EventSourceResponse)
async def sse_items_no_annotation():
for item in items:
yield item
@app.get("/items/stream-no-async-no-annotation", response_class=EventSourceResponse)
def sse_items_no_async_no_annotation():
for item in items:
yield item
Без объявленного типа возврата¶
Вы также можете опустить тип возврата. FastAPI использует jsonable_encoder для преобразования данных и их отправки.
# Code above omitted 👆
@app.get("/items/stream-no-annotation", response_class=EventSourceResponse)
async def sse_items_no_annotation():
for item in items:
yield item
# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None
items = [
Item(name="Plumbus", description="A multi-purpose household device."),
Item(name="Portal Gun", description="A portal opening device."),
Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]
@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-async", response_class=EventSourceResponse)
def sse_items_no_async() -> Iterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-annotation", response_class=EventSourceResponse)
async def sse_items_no_annotation():
for item in items:
yield item
@app.get("/items/stream-no-async-no-annotation", response_class=EventSourceResponse)
def sse_items_no_async_no_annotation():
for item in items:
yield item
ServerSentEvent¶
Если вам нужно задать поля SSE, такие как event, id, retry или comment, вы можете возвращать через yield объекты ServerSentEvent вместо обычных данных.
Импортируйте ServerSentEvent из fastapi.sse:
from collections.abc import AsyncIterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse, ServerSentEvent
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
price: float
items = [
Item(name="Plumbus", price=32.99),
Item(name="Portal Gun", price=999.99),
Item(name="Meeseeks Box", price=49.99),
]
@app.get("/items/stream", response_class=EventSourceResponse)
async def stream_items() -> AsyncIterable[ServerSentEvent]:
yield ServerSentEvent(comment="stream of item updates")
for i, item in enumerate(items):
yield ServerSentEvent(data=item, event="item_update", id=str(i + 1), retry=5000)
Поле data всегда кодируется как JSON. Вы можете передавать любое значение, сериализуемое в JSON, включая Pydantic-модели.
Необработанные данные¶
Если нужно отправлять данные без JSON-кодирования, используйте raw_data вместо data.
Это полезно для отправки заранее отформатированного текста, строк логов или специальных значений «сентинель», например [DONE].
from collections.abc import AsyncIterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse, ServerSentEvent
app = FastAPI()
@app.get("/logs/stream", response_class=EventSourceResponse)
async def stream_logs() -> AsyncIterable[ServerSentEvent]:
logs = [
"2025-01-01 INFO Application started",
"2025-01-01 DEBUG Connected to database",
"2025-01-01 WARN High memory usage detected",
]
for log_line in logs:
yield ServerSentEvent(raw_data=log_line)
Примечание
data и raw_data взаимно исключают друг друга. В каждом ServerSentEvent можно задать только одно из них.
Возобновление с Last-Event-ID¶
Когда браузер переподключается после обрыва соединения, он отправляет последний полученный id в HTTP-заголовке Last-Event-ID.
Вы можете прочитать его как параметр заголовка и использовать, чтобы возобновить поток с того места, где клиент остановился:
from collections.abc import AsyncIterable
from typing import Annotated
from fastapi import FastAPI, Header
from fastapi.sse import EventSourceResponse, ServerSentEvent
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
price: float
items = [
Item(name="Plumbus", price=32.99),
Item(name="Portal Gun", price=999.99),
Item(name="Meeseeks Box", price=49.99),
]
@app.get("/items/stream", response_class=EventSourceResponse)
async def stream_items(
last_event_id: Annotated[int | None, Header()] = None,
) -> AsyncIterable[ServerSentEvent]:
start = last_event_id + 1 if last_event_id is not None else 0
for i, item in enumerate(items):
if i < start:
continue
yield ServerSentEvent(data=item, id=str(i))
SSE с POST¶
SSE работает с любым HTTP-методом, не только с GET.
Это полезно для таких протоколов, как MCP, которые стримят SSE по POST:
from collections.abc import AsyncIterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse, ServerSentEvent
from pydantic import BaseModel
app = FastAPI()
class Prompt(BaseModel):
text: str
@app.post("/chat/stream", response_class=EventSourceResponse)
async def stream_chat(prompt: Prompt) -> AsyncIterable[ServerSentEvent]:
words = prompt.text.split()
for word in words:
yield ServerSentEvent(data=word, event="token")
yield ServerSentEvent(raw_data="[DONE]", event="done")
Технические детали¶
FastAPI из коробки реализует некоторые лучшие практики для SSE.
- Отправлять комментарий «ping» для поддержания соединения («keep alive») каждые 15 секунд, когда нет сообщений, чтобы предотвратить закрытие соединения некоторыми прокси, как рекомендовано в HTML specification: Server-Sent Events.
- Устанавливать заголовок
Cache-Control: no-cache, чтобы предотвратить кэширование потока. - Устанавливать специальный заголовок
X-Accel-Buffering: no, чтобы предотвратить буферизацию в некоторых прокси, например Nginx.
Вам не нужно ничего настраивать, это работает из коробки. 🤓