Saltar a contenido

Server-Sent Events (SSE)

🌐 Traducción por IA y humanos

Esta traducción fue hecha por IA guiada por humanos. 🤝

Podría tener errores al interpretar el significado original, o sonar poco natural, etc. 🤖

Puedes mejorar esta traducción ayudándonos a guiar mejor al LLM de IA.

Versión en inglés

Puedes enviar datos en streaming al cliente usando Server-Sent Events (SSE).

Esto es similar a Stream JSON Lines, pero usa el formato text/event-stream, que los navegadores soportan de forma nativa con la EventSource API.

Información

Añadido en FastAPI 0.135.0.

¿Qué son los Server-Sent Events?

SSE es un estándar para hacer streaming de datos desde el servidor al cliente sobre HTTP.

Cada evento es un pequeño bloque de texto con “campos” como data, event, id y retry, separados por líneas en blanco.

Se ve así:

data: {"name": "Portal Gun", "price": 999.99}

data: {"name": "Plumbus", "price": 32.99}

SSE se usa comúnmente para streaming de chat de IA, notificaciones en vivo, logs y observabilidad, y otros casos donde el servidor envía actualizaciones al cliente.

Consejo

Si quieres hacer streaming de datos binarios, por ejemplo video o audio, Revisa la guía avanzada: Stream Data.

Streaming de SSE con FastAPI

Para hacer streaming de SSE con FastAPI, usa yield en tu path operation function y establece response_class=EventSourceResponse.

import EventSourceResponse de fastapi.sse:

from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
    for item in items:
        yield item

# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-async", response_class=EventSourceResponse)
def sse_items_no_async() -> Iterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-annotation", response_class=EventSourceResponse)
async def sse_items_no_annotation():
    for item in items:
        yield item


@app.get("/items/stream-no-async-no-annotation", response_class=EventSourceResponse)
def sse_items_no_async_no_annotation():
    for item in items:
        yield item

Cada ítem producido con yield se codifica como JSON y se envía en el campo data: de un evento SSE.

Si declaras el tipo de retorno como AsyncIterable[Item], FastAPI lo usará para validar, documentar y serializar los datos usando Pydantic.

from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
    for item in items:
        yield item

# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-async", response_class=EventSourceResponse)
def sse_items_no_async() -> Iterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-annotation", response_class=EventSourceResponse)
async def sse_items_no_annotation():
    for item in items:
        yield item


@app.get("/items/stream-no-async-no-annotation", response_class=EventSourceResponse)
def sse_items_no_async_no_annotation():
    for item in items:
        yield item

Consejo

Como Pydantic lo serializará en el lado de Rust, obtendrás un rendimiento mucho mayor que si no declaras un tipo de retorno.

No async path operation functions

También puedes usar funciones def normales (sin async), y usar yield de la misma manera.

FastAPI se asegurará de ejecutarlo correctamente para que no bloquee el event loop.

Como en este caso la función no es async, el tipo de retorno correcto sería Iterable[Item]:

# Code above omitted 👆

@app.get("/items/stream-no-async", response_class=EventSourceResponse)
def sse_items_no_async() -> Iterable[Item]:
    for item in items:
        yield item

# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-async", response_class=EventSourceResponse)
def sse_items_no_async() -> Iterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-annotation", response_class=EventSourceResponse)
async def sse_items_no_annotation():
    for item in items:
        yield item


@app.get("/items/stream-no-async-no-annotation", response_class=EventSourceResponse)
def sse_items_no_async_no_annotation():
    for item in items:
        yield item

Sin tipo de retorno

También puedes omitir el tipo de retorno. FastAPI usará el jsonable_encoder para convertir los datos y enviarlos.

# Code above omitted 👆

@app.get("/items/stream-no-annotation", response_class=EventSourceResponse)
async def sse_items_no_annotation():
    for item in items:
        yield item

# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-async", response_class=EventSourceResponse)
def sse_items_no_async() -> Iterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-annotation", response_class=EventSourceResponse)
async def sse_items_no_annotation():
    for item in items:
        yield item


@app.get("/items/stream-no-async-no-annotation", response_class=EventSourceResponse)
def sse_items_no_async_no_annotation():
    for item in items:
        yield item

ServerSentEvent

Si necesitas configurar campos SSE como event, id, retry o comment, puedes hacer yield de objetos ServerSentEvent en lugar de datos simples.

import ServerSentEvent de fastapi.sse:

from collections.abc import AsyncIterable

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse, ServerSentEvent
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    price: float


items = [
    Item(name="Plumbus", price=32.99),
    Item(name="Portal Gun", price=999.99),
    Item(name="Meeseeks Box", price=49.99),
]


@app.get("/items/stream", response_class=EventSourceResponse)
async def stream_items() -> AsyncIterable[ServerSentEvent]:
    yield ServerSentEvent(comment="stream of item updates")
    for i, item in enumerate(items):
        yield ServerSentEvent(data=item, event="item_update", id=str(i + 1), retry=5000)

El campo data siempre se codifica como JSON. Puedes pasar cualquier valor que pueda serializarse como JSON, incluidos modelos de Pydantic.

Datos sin procesar

Si necesitas enviar datos sin codificarlos a JSON, usa raw_data en lugar de data.

Esto es útil para enviar texto preformateado, líneas de log, o valores especiales de "centinela" como [DONE].

from collections.abc import AsyncIterable

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse, ServerSentEvent

app = FastAPI()


@app.get("/logs/stream", response_class=EventSourceResponse)
async def stream_logs() -> AsyncIterable[ServerSentEvent]:
    logs = [
        "2025-01-01 INFO  Application started",
        "2025-01-01 DEBUG Connected to database",
        "2025-01-01 WARN  High memory usage detected",
    ]
    for log_line in logs:
        yield ServerSentEvent(raw_data=log_line)

Nota

data y raw_data son mutuamente excluyentes. Solo puedes establecer uno de ellos en cada ServerSentEvent.

Reanudar con Last-Event-ID

Cuando un navegador se reconecta después de una caída de la conexión, envía el último id recibido en el header Last-Event-ID.

Puedes leerlo como un parámetro de header y usarlo para reanudar el stream desde donde el cliente se quedó:

from collections.abc import AsyncIterable
from typing import Annotated

from fastapi import FastAPI, Header
from fastapi.sse import EventSourceResponse, ServerSentEvent
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    price: float


items = [
    Item(name="Plumbus", price=32.99),
    Item(name="Portal Gun", price=999.99),
    Item(name="Meeseeks Box", price=49.99),
]


@app.get("/items/stream", response_class=EventSourceResponse)
async def stream_items(
    last_event_id: Annotated[int | None, Header()] = None,
) -> AsyncIterable[ServerSentEvent]:
    start = last_event_id + 1 if last_event_id is not None else 0
    for i, item in enumerate(items):
        if i < start:
            continue
        yield ServerSentEvent(data=item, id=str(i))

SSE con POST

SSE funciona con cualquier método HTTP, no solo con GET.

Esto es útil para protocolos como MCP que hacen streaming de SSE sobre POST:

from collections.abc import AsyncIterable

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse, ServerSentEvent
from pydantic import BaseModel

app = FastAPI()


class Prompt(BaseModel):
    text: str


@app.post("/chat/stream", response_class=EventSourceResponse)
async def stream_chat(prompt: Prompt) -> AsyncIterable[ServerSentEvent]:
    words = prompt.text.split()
    for word in words:
        yield ServerSentEvent(data=word, event="token")
    yield ServerSentEvent(raw_data="[DONE]", event="done")

Detalles técnicos

FastAPI implementa algunas mejores prácticas de SSE desde el primer momento.

  • Enviar un comentario de "keep alive" ping cada 15 segundos cuando no ha habido ningún mensaje, para evitar que algunos proxies cierren la conexión, como se sugiere en la Especificación HTML: Server-Sent Events.
  • Configurar el header Cache-Control: no-cache para evitar el almacenamiento en caché del stream.
  • Configurar un header especial X-Accel-Buffering: no para evitar el buffering en algunos proxies como Nginx.

No tienes que hacer nada, funciona tal cual viene. 🤓