Zum Inhalt

Server-Sent Events (SSE)

🌐 Übersetzung durch KI und Menschen

Diese Übersetzung wurde von KI erstellt, angeleitet von Menschen. 🤝

Sie könnte Fehler enthalten, etwa Missverständnisse des ursprünglichen Sinns oder unnatürliche Formulierungen, usw. 🤖

Sie können diese Übersetzung verbessern, indem Sie uns helfen, die KI-LLM besser anzuleiten.

Englische Version

Sie können Daten mithilfe von Server-Sent Events (SSE) an den Client streamen.

Das ist ähnlich wie JSON Lines streamen, verwendet aber das Format text/event-stream, das von Browsern nativ mit der die EventSource-API unterstützt wird.

Info

Hinzugefügt in FastAPI 0.135.0.

Was sind Server-Sent Events?

SSE ist ein Standard zum Streamen von Daten vom Server zum Client über HTTP.

Jedes Event ist ein kleiner Textblock mit „Feldern“ wie data, event, id und retry, getrennt durch Leerzeilen.

Das sieht so aus:

data: {"name": "Portal Gun", "price": 999.99}

data: {"name": "Plumbus", "price": 32.99}

SSE wird häufig für KI-Chat-Streaming, Live-Benachrichtigungen, Logs und Observability sowie andere Fälle verwendet, in denen der Server Updates an den Client pusht.

Tipp

Wenn Sie Binärdaten streamen wollen, z. B. Video oder Audio, sehen Sie im fortgeschrittenen Handbuch nach: Daten streamen.

SSE mit FastAPI streamen

Um SSE mit FastAPI zu streamen, verwenden Sie yield in Ihrer Pfadoperation-Funktion und setzen Sie response_class=EventSourceResponse.

Importieren Sie EventSourceResponse aus fastapi.sse:

from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
    for item in items:
        yield item

# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-async", response_class=EventSourceResponse)
def sse_items_no_async() -> Iterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-annotation", response_class=EventSourceResponse)
async def sse_items_no_annotation():
    for item in items:
        yield item


@app.get("/items/stream-no-async-no-annotation", response_class=EventSourceResponse)
def sse_items_no_async_no_annotation():
    for item in items:
        yield item

Jedes mit yield zurückgegebene Element wird als JSON kodiert und im Feld data: eines SSE-Events gesendet.

Wenn Sie den Rückgabetyp als AsyncIterable[Item] deklarieren, verwendet FastAPI ihn, um die Daten mit Pydantic zu validieren, zu dokumentieren und zu serialisieren.

from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
    for item in items:
        yield item

# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-async", response_class=EventSourceResponse)
def sse_items_no_async() -> Iterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-annotation", response_class=EventSourceResponse)
async def sse_items_no_annotation():
    for item in items:
        yield item


@app.get("/items/stream-no-async-no-annotation", response_class=EventSourceResponse)
def sse_items_no_async_no_annotation():
    for item in items:
        yield item

Tipp

Da Pydantic es auf der Rust-Seite serialisiert, erhalten Sie eine deutlich höhere Leistung, als wenn Sie keinen Rückgabetyp deklarieren.

Nicht-async-Pfadoperation-Funktionen

Sie können auch normale def-Funktionen (ohne async) verwenden und yield genauso einsetzen.

FastAPI stellt sicher, dass sie korrekt ausgeführt wird, sodass sie die Event Loop nicht blockiert.

Da die Funktion in diesem Fall nicht async ist, wäre der passende Rückgabetyp Iterable[Item]:

# Code above omitted 👆

@app.get("/items/stream-no-async", response_class=EventSourceResponse)
def sse_items_no_async() -> Iterable[Item]:
    for item in items:
        yield item

# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-async", response_class=EventSourceResponse)
def sse_items_no_async() -> Iterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-annotation", response_class=EventSourceResponse)
async def sse_items_no_annotation():
    for item in items:
        yield item


@app.get("/items/stream-no-async-no-annotation", response_class=EventSourceResponse)
def sse_items_no_async_no_annotation():
    for item in items:
        yield item

Kein Rückgabetyp

Sie können den Rückgabetyp auch weglassen. FastAPI verwendet dann den jsonable_encoder, um die Daten zu konvertieren und zu senden.

# Code above omitted 👆

@app.get("/items/stream-no-annotation", response_class=EventSourceResponse)
async def sse_items_no_annotation():
    for item in items:
        yield item

# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-async", response_class=EventSourceResponse)
def sse_items_no_async() -> Iterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-annotation", response_class=EventSourceResponse)
async def sse_items_no_annotation():
    for item in items:
        yield item


@app.get("/items/stream-no-async-no-annotation", response_class=EventSourceResponse)
def sse_items_no_async_no_annotation():
    for item in items:
        yield item

ServerSentEvent

Wenn Sie SSE-Felder wie event, id, retry oder comment setzen müssen, können Sie statt reiner Daten ServerSentEvent-Objekte yielden.

Importieren Sie ServerSentEvent aus fastapi.sse:

from collections.abc import AsyncIterable

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse, ServerSentEvent
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    price: float


items = [
    Item(name="Plumbus", price=32.99),
    Item(name="Portal Gun", price=999.99),
    Item(name="Meeseeks Box", price=49.99),
]


@app.get("/items/stream", response_class=EventSourceResponse)
async def stream_items() -> AsyncIterable[ServerSentEvent]:
    yield ServerSentEvent(comment="stream of item updates")
    for i, item in enumerate(items):
        yield ServerSentEvent(data=item, event="item_update", id=str(i + 1), retry=5000)

Das Feld data wird immer als JSON kodiert. Sie können jeden Wert übergeben, der als JSON serialisierbar ist, einschließlich Pydantic-Modellen.

Rohdaten

Wenn Sie Daten ohne JSON-Kodierung senden müssen, verwenden Sie raw_data statt data.

Das ist nützlich zum Senden vorformatierter Texte, Logzeilen oder spezieller „Sentinel“-Werte wie [DONE].

from collections.abc import AsyncIterable

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse, ServerSentEvent

app = FastAPI()


@app.get("/logs/stream", response_class=EventSourceResponse)
async def stream_logs() -> AsyncIterable[ServerSentEvent]:
    logs = [
        "2025-01-01 INFO  Application started",
        "2025-01-01 DEBUG Connected to database",
        "2025-01-01 WARN  High memory usage detected",
    ]
    for log_line in logs:
        yield ServerSentEvent(raw_data=log_line)

Hinweis

data und raw_data schließen sich gegenseitig aus. Sie können pro ServerSentEvent nur eines von beiden setzen.

Mit Last-Event-ID fortsetzen

Wenn ein Browser nach einem Verbindungsabbruch erneut verbindet, sendet er die zuletzt empfangene id im Header Last-Event-ID.

Sie können ihn als Header-Parameter einlesen und verwenden, um den Stream dort fortzusetzen, wo der Client aufgehört hat:

from collections.abc import AsyncIterable
from typing import Annotated

from fastapi import FastAPI, Header
from fastapi.sse import EventSourceResponse, ServerSentEvent
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    price: float


items = [
    Item(name="Plumbus", price=32.99),
    Item(name="Portal Gun", price=999.99),
    Item(name="Meeseeks Box", price=49.99),
]


@app.get("/items/stream", response_class=EventSourceResponse)
async def stream_items(
    last_event_id: Annotated[int | None, Header()] = None,
) -> AsyncIterable[ServerSentEvent]:
    start = last_event_id + 1 if last_event_id is not None else 0
    for i, item in enumerate(items):
        if i < start:
            continue
        yield ServerSentEvent(data=item, id=str(i))

SSE mit POST

SSE funktioniert mit jedem HTTP-Method, nicht nur mit GET.

Das ist nützlich für Protokolle wie MCP, die SSE über POST streamen:

from collections.abc import AsyncIterable

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse, ServerSentEvent
from pydantic import BaseModel

app = FastAPI()


class Prompt(BaseModel):
    text: str


@app.post("/chat/stream", response_class=EventSourceResponse)
async def stream_chat(prompt: Prompt) -> AsyncIterable[ServerSentEvent]:
    words = prompt.text.split()
    for word in words:
        yield ServerSentEvent(data=word, event="token")
    yield ServerSentEvent(raw_data="[DONE]", event="done")

Technische Details

FastAPI implementiert einige bewährte SSE-Praktiken direkt out of the box.

  • Alle 15 Sekunden, wenn keine Nachricht gesendet wurde, einen „keep alive“-ping-Kommentar senden, um zu verhindern, dass einige Proxys die Verbindung schließen, wie in der HTML-Spezifikation: Server-Sent Events vorgeschlagen.
  • Den Header Cache-Control: no-cache setzen, um Caching des Streams zu verhindern.
  • Einen speziellen Header X-Accel-Buffering: no setzen, um Buffering in einigen Proxys wie Nginx zu verhindern.

Sie müssen dafür nichts tun, das funktioniert out of the box. 🤓