Перейти до змісту

Події, надіслані сервером (SSE)

🌐 Переклад ШІ та людьми

Цей переклад виконано ШІ під керівництвом людей. 🤝

Можливі помилки через неправильне розуміння початкового змісту або неприродні формулювання тощо. 🤖

Ви можете покращити цей переклад, допомігши нам краще спрямовувати AI LLM.

Англійська версія

Ви можете транслювати дані клієнту за допомогою Server-Sent Events (SSE).

Це подібно до Потік JSON Lines, але використовує формат text/event-stream, який нативно підтримується браузерами через API EventSource.

Інформація

Додано у FastAPI 0.135.0.

Що таке Server-Sent Events

SSE - це стандарт для трансляції даних із сервера до клієнта по HTTP.

Кожна подія - це невеликий текстовий блок із «полями» на кшталт data, event, id та retry, розділений порожніми рядками.

Виглядає так:

data: {"name": "Portal Gun", "price": 999.99}

data: {"name": "Plumbus", "price": 32.99}

SSE часто використовують для стрімінгу чатів ШІ, живих сповіщень, логів і спостережуваності, а також інших випадків, коли сервер надсилає оновлення клієнту.

Порада

Якщо ви хочете транслювати бінарні дані, наприклад відео чи аудіо, перегляньте просунутий посібник: Потік даних.

Стрімінг SSE у FastAPI

Щоб транслювати SSE з FastAPI, використовуйте yield у вашій функції операції шляху і встановіть response_class=EventSourceResponse.

Імпортуйте EventSourceResponse з fastapi.sse:

from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
    for item in items:
        yield item

# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-async", response_class=EventSourceResponse)
def sse_items_no_async() -> Iterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-annotation", response_class=EventSourceResponse)
async def sse_items_no_annotation():
    for item in items:
        yield item


@app.get("/items/stream-no-async-no-annotation", response_class=EventSourceResponse)
def sse_items_no_async_no_annotation():
    for item in items:
        yield item

Кожен елемент, повернений через yield, кодується як JSON і надсилається в полі data: події SSE.

Якщо ви оголосите тип повернення як AsyncIterable[Item], FastAPI використає його, щоб перевіряти, документувати і серіалізувати дані за допомогою Pydantic.

from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
    for item in items:
        yield item

# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-async", response_class=EventSourceResponse)
def sse_items_no_async() -> Iterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-annotation", response_class=EventSourceResponse)
async def sse_items_no_annotation():
    for item in items:
        yield item


@app.get("/items/stream-no-async-no-annotation", response_class=EventSourceResponse)
def sse_items_no_async_no_annotation():
    for item in items:
        yield item

Порада

Оскільки Pydantic серіалізує це на боці Rust, ви отримаєте значно вищу продуктивність, ніж якби не оголошували тип повернення.

Не-async функції операцій шляху

Ви також можете використовувати звичайні функції def (без async) і використовувати yield так само.

FastAPI подбає про коректне виконання, щоб воно не блокувало цикл подій.

Оскільки в цьому випадку функція не async, коректним типом повернення буде Iterable[Item]:

# Code above omitted 👆

@app.get("/items/stream-no-async", response_class=EventSourceResponse)
def sse_items_no_async() -> Iterable[Item]:
    for item in items:
        yield item

# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-async", response_class=EventSourceResponse)
def sse_items_no_async() -> Iterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-annotation", response_class=EventSourceResponse)
async def sse_items_no_annotation():
    for item in items:
        yield item


@app.get("/items/stream-no-async-no-annotation", response_class=EventSourceResponse)
def sse_items_no_async_no_annotation():
    for item in items:
        yield item

Без типу повернення

Можна також опустити тип повернення. FastAPI використає jsonable_encoder, щоб конвертувати дані і надіслати їх.

# Code above omitted 👆

@app.get("/items/stream-no-annotation", response_class=EventSourceResponse)
async def sse_items_no_annotation():
    for item in items:
        yield item

# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-async", response_class=EventSourceResponse)
def sse_items_no_async() -> Iterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-annotation", response_class=EventSourceResponse)
async def sse_items_no_annotation():
    for item in items:
        yield item


@app.get("/items/stream-no-async-no-annotation", response_class=EventSourceResponse)
def sse_items_no_async_no_annotation():
    for item in items:
        yield item

ServerSentEvent

Якщо вам потрібно встановити поля SSE, такі як event, id, retry або comment, ви можете повертати через yield об'єкти ServerSentEvent замість звичайних даних.

Імпортуйте ServerSentEvent з fastapi.sse:

from collections.abc import AsyncIterable

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse, ServerSentEvent
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    price: float


items = [
    Item(name="Plumbus", price=32.99),
    Item(name="Portal Gun", price=999.99),
    Item(name="Meeseeks Box", price=49.99),
]


@app.get("/items/stream", response_class=EventSourceResponse)
async def stream_items() -> AsyncIterable[ServerSentEvent]:
    yield ServerSentEvent(comment="stream of item updates")
    for i, item in enumerate(items):
        yield ServerSentEvent(data=item, event="item_update", id=str(i + 1), retry=5000)

Поле data завжди кодується як JSON. Ви можете передати будь-яке значення, яке можна серіалізувати в JSON, включно з моделями Pydantic.

Сирі дані

Якщо потрібно надіслати дані без кодування в JSON, використовуйте raw_data замість data.

Це корисно для надсилання попередньо відформатованого тексту, рядків логів або спеціальних значень «значення-сторож», як-от [DONE].

from collections.abc import AsyncIterable

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse, ServerSentEvent

app = FastAPI()


@app.get("/logs/stream", response_class=EventSourceResponse)
async def stream_logs() -> AsyncIterable[ServerSentEvent]:
    logs = [
        "2025-01-01 INFO  Application started",
        "2025-01-01 DEBUG Connected to database",
        "2025-01-01 WARN  High memory usage detected",
    ]
    for log_line in logs:
        yield ServerSentEvent(raw_data=log_line)

Примітка

data і raw_data взаємовиключні. У кожному ServerSentEvent ви можете встановити лише одне з них.

Відновлення з Last-Event-ID

Коли браузер перепідключається після розриву з'єднання, він надсилає останній отриманий id у заголовку Last-Event-ID.

Ви можете прочитати його як параметр заголовка і використати, щоб відновити потік із місця, де клієнт зупинився:

from collections.abc import AsyncIterable
from typing import Annotated

from fastapi import FastAPI, Header
from fastapi.sse import EventSourceResponse, ServerSentEvent
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    price: float


items = [
    Item(name="Plumbus", price=32.99),
    Item(name="Portal Gun", price=999.99),
    Item(name="Meeseeks Box", price=49.99),
]


@app.get("/items/stream", response_class=EventSourceResponse)
async def stream_items(
    last_event_id: Annotated[int | None, Header()] = None,
) -> AsyncIterable[ServerSentEvent]:
    start = last_event_id + 1 if last_event_id is not None else 0
    for i, item in enumerate(items):
        if i < start:
            continue
        yield ServerSentEvent(data=item, id=str(i))

SSE з POST

SSE працює з будь-яким HTTP-методом, не лише з GET.

Це корисно для протоколів на кшталт MCP, які транслюють SSE через POST:

from collections.abc import AsyncIterable

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse, ServerSentEvent
from pydantic import BaseModel

app = FastAPI()


class Prompt(BaseModel):
    text: str


@app.post("/chat/stream", response_class=EventSourceResponse)
async def stream_chat(prompt: Prompt) -> AsyncIterable[ServerSentEvent]:
    words = prompt.text.split()
    for word in words:
        yield ServerSentEvent(data=word, event="token")
    yield ServerSentEvent(raw_data="[DONE]", event="done")

Технічні деталі

FastAPI реалізує деякі найкращі практики SSE «з коробки».

  • Надсилати коментар «keep alive» ping кожні 15 секунд, коли не було жодного повідомлення, щоб запобігти закриттю з'єднання деякими проксі, як рекомендовано у Специфікації HTML: Події, надіслані сервером.
  • Встановити заголовок Cache-Control: no-cache, щоб запобігти кешуванню потоку.
  • Встановити спеціальний заголовок X-Accel-Buffering: no, щоб запобігти буферизації у деяких проксі, наприклад Nginx.

Вам не потрібно нічого з цим робити, воно працює «з коробки». 🤓