Події, надіслані сервером (SSE)¶
🌐 Переклад ШІ та людьми
Цей переклад виконано ШІ під керівництвом людей. 🤝
Можливі помилки через неправильне розуміння початкового змісту або неприродні формулювання тощо. 🤖
Ви можете покращити цей переклад, допомігши нам краще спрямовувати AI LLM.
Ви можете транслювати дані клієнту за допомогою Server-Sent Events (SSE).
Це подібно до Потік JSON Lines, але використовує формат text/event-stream, який нативно підтримується браузерами через API EventSource.
Інформація
Додано у FastAPI 0.135.0.
Що таке Server-Sent Events¶
SSE - це стандарт для трансляції даних із сервера до клієнта по HTTP.
Кожна подія - це невеликий текстовий блок із «полями» на кшталт data, event, id та retry, розділений порожніми рядками.
Виглядає так:
data: {"name": "Portal Gun", "price": 999.99}
data: {"name": "Plumbus", "price": 32.99}
SSE часто використовують для стрімінгу чатів ШІ, живих сповіщень, логів і спостережуваності, а також інших випадків, коли сервер надсилає оновлення клієнту.
Порада
Якщо ви хочете транслювати бінарні дані, наприклад відео чи аудіо, перегляньте просунутий посібник: Потік даних.
Стрімінг SSE у FastAPI¶
Щоб транслювати SSE з FastAPI, використовуйте yield у вашій функції операції шляху і встановіть response_class=EventSourceResponse.
Імпортуйте EventSourceResponse з fastapi.sse:
from collections.abc import AsyncIterable, Iterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None
items = [
Item(name="Plumbus", description="A multi-purpose household device."),
Item(name="Portal Gun", description="A portal opening device."),
Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]
@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
for item in items:
yield item
# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None
items = [
Item(name="Plumbus", description="A multi-purpose household device."),
Item(name="Portal Gun", description="A portal opening device."),
Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]
@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-async", response_class=EventSourceResponse)
def sse_items_no_async() -> Iterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-annotation", response_class=EventSourceResponse)
async def sse_items_no_annotation():
for item in items:
yield item
@app.get("/items/stream-no-async-no-annotation", response_class=EventSourceResponse)
def sse_items_no_async_no_annotation():
for item in items:
yield item
Кожен елемент, повернений через yield, кодується як JSON і надсилається в полі data: події SSE.
Якщо ви оголосите тип повернення як AsyncIterable[Item], FastAPI використає його, щоб перевіряти, документувати і серіалізувати дані за допомогою Pydantic.
from collections.abc import AsyncIterable, Iterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None
items = [
Item(name="Plumbus", description="A multi-purpose household device."),
Item(name="Portal Gun", description="A portal opening device."),
Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]
@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
for item in items:
yield item
# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None
items = [
Item(name="Plumbus", description="A multi-purpose household device."),
Item(name="Portal Gun", description="A portal opening device."),
Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]
@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-async", response_class=EventSourceResponse)
def sse_items_no_async() -> Iterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-annotation", response_class=EventSourceResponse)
async def sse_items_no_annotation():
for item in items:
yield item
@app.get("/items/stream-no-async-no-annotation", response_class=EventSourceResponse)
def sse_items_no_async_no_annotation():
for item in items:
yield item
Порада
Оскільки Pydantic серіалізує це на боці Rust, ви отримаєте значно вищу продуктивність, ніж якби не оголошували тип повернення.
Не-async функції операцій шляху¶
Ви також можете використовувати звичайні функції def (без async) і використовувати yield так само.
FastAPI подбає про коректне виконання, щоб воно не блокувало цикл подій.
Оскільки в цьому випадку функція не async, коректним типом повернення буде Iterable[Item]:
# Code above omitted 👆
@app.get("/items/stream-no-async", response_class=EventSourceResponse)
def sse_items_no_async() -> Iterable[Item]:
for item in items:
yield item
# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None
items = [
Item(name="Plumbus", description="A multi-purpose household device."),
Item(name="Portal Gun", description="A portal opening device."),
Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]
@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-async", response_class=EventSourceResponse)
def sse_items_no_async() -> Iterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-annotation", response_class=EventSourceResponse)
async def sse_items_no_annotation():
for item in items:
yield item
@app.get("/items/stream-no-async-no-annotation", response_class=EventSourceResponse)
def sse_items_no_async_no_annotation():
for item in items:
yield item
Без типу повернення¶
Можна також опустити тип повернення. FastAPI використає jsonable_encoder, щоб конвертувати дані і надіслати їх.
# Code above omitted 👆
@app.get("/items/stream-no-annotation", response_class=EventSourceResponse)
async def sse_items_no_annotation():
for item in items:
yield item
# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None
items = [
Item(name="Plumbus", description="A multi-purpose household device."),
Item(name="Portal Gun", description="A portal opening device."),
Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]
@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-async", response_class=EventSourceResponse)
def sse_items_no_async() -> Iterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-annotation", response_class=EventSourceResponse)
async def sse_items_no_annotation():
for item in items:
yield item
@app.get("/items/stream-no-async-no-annotation", response_class=EventSourceResponse)
def sse_items_no_async_no_annotation():
for item in items:
yield item
ServerSentEvent¶
Якщо вам потрібно встановити поля SSE, такі як event, id, retry або comment, ви можете повертати через yield об'єкти ServerSentEvent замість звичайних даних.
Імпортуйте ServerSentEvent з fastapi.sse:
from collections.abc import AsyncIterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse, ServerSentEvent
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
price: float
items = [
Item(name="Plumbus", price=32.99),
Item(name="Portal Gun", price=999.99),
Item(name="Meeseeks Box", price=49.99),
]
@app.get("/items/stream", response_class=EventSourceResponse)
async def stream_items() -> AsyncIterable[ServerSentEvent]:
yield ServerSentEvent(comment="stream of item updates")
for i, item in enumerate(items):
yield ServerSentEvent(data=item, event="item_update", id=str(i + 1), retry=5000)
Поле data завжди кодується як JSON. Ви можете передати будь-яке значення, яке можна серіалізувати в JSON, включно з моделями Pydantic.
Сирі дані¶
Якщо потрібно надіслати дані без кодування в JSON, використовуйте raw_data замість data.
Це корисно для надсилання попередньо відформатованого тексту, рядків логів або спеціальних значень «значення-сторож», як-от [DONE].
from collections.abc import AsyncIterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse, ServerSentEvent
app = FastAPI()
@app.get("/logs/stream", response_class=EventSourceResponse)
async def stream_logs() -> AsyncIterable[ServerSentEvent]:
logs = [
"2025-01-01 INFO Application started",
"2025-01-01 DEBUG Connected to database",
"2025-01-01 WARN High memory usage detected",
]
for log_line in logs:
yield ServerSentEvent(raw_data=log_line)
Примітка
data і raw_data взаємовиключні. У кожному ServerSentEvent ви можете встановити лише одне з них.
Відновлення з Last-Event-ID¶
Коли браузер перепідключається після розриву з'єднання, він надсилає останній отриманий id у заголовку Last-Event-ID.
Ви можете прочитати його як параметр заголовка і використати, щоб відновити потік із місця, де клієнт зупинився:
from collections.abc import AsyncIterable
from typing import Annotated
from fastapi import FastAPI, Header
from fastapi.sse import EventSourceResponse, ServerSentEvent
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
price: float
items = [
Item(name="Plumbus", price=32.99),
Item(name="Portal Gun", price=999.99),
Item(name="Meeseeks Box", price=49.99),
]
@app.get("/items/stream", response_class=EventSourceResponse)
async def stream_items(
last_event_id: Annotated[int | None, Header()] = None,
) -> AsyncIterable[ServerSentEvent]:
start = last_event_id + 1 if last_event_id is not None else 0
for i, item in enumerate(items):
if i < start:
continue
yield ServerSentEvent(data=item, id=str(i))
SSE з POST¶
SSE працює з будь-яким HTTP-методом, не лише з GET.
Це корисно для протоколів на кшталт MCP, які транслюють SSE через POST:
from collections.abc import AsyncIterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse, ServerSentEvent
from pydantic import BaseModel
app = FastAPI()
class Prompt(BaseModel):
text: str
@app.post("/chat/stream", response_class=EventSourceResponse)
async def stream_chat(prompt: Prompt) -> AsyncIterable[ServerSentEvent]:
words = prompt.text.split()
for word in words:
yield ServerSentEvent(data=word, event="token")
yield ServerSentEvent(raw_data="[DONE]", event="done")
Технічні деталі¶
FastAPI реалізує деякі найкращі практики SSE «з коробки».
- Надсилати коментар «keep alive»
pingкожні 15 секунд, коли не було жодного повідомлення, щоб запобігти закриттю з'єднання деякими проксі, як рекомендовано у Специфікації HTML: Події, надіслані сервером. - Встановити заголовок
Cache-Control: no-cache, щоб запобігти кешуванню потоку. - Встановити спеціальний заголовок
X-Accel-Buffering: no, щоб запобігти буферизації у деяких проксі, наприклад Nginx.
Вам не потрібно нічого з цим робити, воно працює «з коробки». 🤓