Événements envoyés par le serveur (SSE)¶
🌐 Traduction par IA et humains
Cette traduction a été réalisée par une IA guidée par des humains. 🤝
Elle peut contenir des erreurs d'interprétation du sens original, ou paraître peu naturelle, etc. 🤖
Vous pouvez améliorer cette traduction en nous aidant à mieux guider le LLM d'IA.
Vous pouvez diffuser des données vers le client en utilisant les Server-Sent Events (SSE).
C'est similaire à Diffuser des JSON Lines, mais cela utilise le format text/event-stream, pris en charge nativement par les navigateurs via l’API EventSource.
Info
Ajouté dans FastAPI 0.135.0.
Que sont les Server-Sent Events ?¶
SSE est un standard pour diffuser des données du serveur au client via HTTP.
Chaque événement est un petit bloc de texte avec des « champs » comme data, event, id et retry, séparés par des lignes vides.
Cela ressemble à ceci :
data: {"name": "Portal Gun", "price": 999.99}
data: {"name": "Plumbus", "price": 32.99}
Les SSE sont couramment utilisés pour le streaming de chat IA, les notifications en direct, les journaux et l’observabilité, et d’autres cas où le serveur envoie des mises à jour au client.
Astuce
Si vous souhaitez diffuser des données binaires, par exemple de la vidéo ou de l’audio, consultez le guide avancé : Diffuser des données.
Diffuser des SSE avec FastAPI¶
Pour diffuser des SSE avec FastAPI, utilisez yield dans votre fonction de chemin d'accès et définissez response_class=EventSourceResponse.
Importez EventSourceResponse depuis fastapi.sse :
from collections.abc import AsyncIterable, Iterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None
items = [
Item(name="Plumbus", description="A multi-purpose household device."),
Item(name="Portal Gun", description="A portal opening device."),
Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]
@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
for item in items:
yield item
# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None
items = [
Item(name="Plumbus", description="A multi-purpose household device."),
Item(name="Portal Gun", description="A portal opening device."),
Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]
@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-async", response_class=EventSourceResponse)
def sse_items_no_async() -> Iterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-annotation", response_class=EventSourceResponse)
async def sse_items_no_annotation():
for item in items:
yield item
@app.get("/items/stream-no-async-no-annotation", response_class=EventSourceResponse)
def sse_items_no_async_no_annotation():
for item in items:
yield item
Chaque élément produit avec yield est encodé en JSON et envoyé dans le champ data: d’un événement SSE.
Si vous déclarez le type de retour comme AsyncIterable[Item], FastAPI l’utilisera pour valider, documenter et sérialiser les données avec Pydantic.
from collections.abc import AsyncIterable, Iterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None
items = [
Item(name="Plumbus", description="A multi-purpose household device."),
Item(name="Portal Gun", description="A portal opening device."),
Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]
@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
for item in items:
yield item
# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None
items = [
Item(name="Plumbus", description="A multi-purpose household device."),
Item(name="Portal Gun", description="A portal opening device."),
Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]
@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-async", response_class=EventSourceResponse)
def sse_items_no_async() -> Iterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-annotation", response_class=EventSourceResponse)
async def sse_items_no_annotation():
for item in items:
yield item
@app.get("/items/stream-no-async-no-annotation", response_class=EventSourceResponse)
def sse_items_no_async_no_annotation():
for item in items:
yield item
Astuce
Comme Pydantic le sérialisera du côté Rust, vous obtiendrez une performance bien supérieure que si vous ne déclarez pas de type de retour.
Fonctions de chemin d'accès non async¶
Vous pouvez aussi utiliser des fonctions def normales (sans async), et utiliser yield de la même façon.
FastAPI s’assure qu’elles s’exécutent correctement pour ne pas bloquer la boucle d’événements.
Dans ce cas la fonction n’est pas async, le type de retour approprié serait Iterable[Item] :
# Code above omitted 👆
@app.get("/items/stream-no-async", response_class=EventSourceResponse)
def sse_items_no_async() -> Iterable[Item]:
for item in items:
yield item
# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None
items = [
Item(name="Plumbus", description="A multi-purpose household device."),
Item(name="Portal Gun", description="A portal opening device."),
Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]
@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-async", response_class=EventSourceResponse)
def sse_items_no_async() -> Iterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-annotation", response_class=EventSourceResponse)
async def sse_items_no_annotation():
for item in items:
yield item
@app.get("/items/stream-no-async-no-annotation", response_class=EventSourceResponse)
def sse_items_no_async_no_annotation():
for item in items:
yield item
Sans type de retour¶
Vous pouvez aussi omettre le type de retour. FastAPI utilisera le jsonable_encoder pour convertir les données et les envoyer.
# Code above omitted 👆
@app.get("/items/stream-no-annotation", response_class=EventSourceResponse)
async def sse_items_no_annotation():
for item in items:
yield item
# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None
items = [
Item(name="Plumbus", description="A multi-purpose household device."),
Item(name="Portal Gun", description="A portal opening device."),
Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]
@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-async", response_class=EventSourceResponse)
def sse_items_no_async() -> Iterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-annotation", response_class=EventSourceResponse)
async def sse_items_no_annotation():
for item in items:
yield item
@app.get("/items/stream-no-async-no-annotation", response_class=EventSourceResponse)
def sse_items_no_async_no_annotation():
for item in items:
yield item
ServerSentEvent¶
Si vous devez définir des champs SSE comme event, id, retry ou comment, vous pouvez produire des objets ServerSentEvent au lieu de données brutes.
Importez ServerSentEvent depuis fastapi.sse :
from collections.abc import AsyncIterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse, ServerSentEvent
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
price: float
items = [
Item(name="Plumbus", price=32.99),
Item(name="Portal Gun", price=999.99),
Item(name="Meeseeks Box", price=49.99),
]
@app.get("/items/stream", response_class=EventSourceResponse)
async def stream_items() -> AsyncIterable[ServerSentEvent]:
yield ServerSentEvent(comment="stream of item updates")
for i, item in enumerate(items):
yield ServerSentEvent(data=item, event="item_update", id=str(i + 1), retry=5000)
Le champ data est toujours encodé en JSON. Vous pouvez passer toute valeur sérialisable en JSON, y compris des modèles Pydantic.
Données brutes¶
Si vous devez envoyer des données sans encodage JSON, utilisez raw_data au lieu de data.
C’est utile pour envoyer du texte préformaté, des lignes de log, ou des valeurs « sentinelle » spéciales comme [DONE].
from collections.abc import AsyncIterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse, ServerSentEvent
app = FastAPI()
@app.get("/logs/stream", response_class=EventSourceResponse)
async def stream_logs() -> AsyncIterable[ServerSentEvent]:
logs = [
"2025-01-01 INFO Application started",
"2025-01-01 DEBUG Connected to database",
"2025-01-01 WARN High memory usage detected",
]
for log_line in logs:
yield ServerSentEvent(raw_data=log_line)
Remarque
data et raw_data s’excluent mutuellement. Vous ne pouvez en définir qu’un seul par ServerSentEvent.
Reprendre avec Last-Event-ID¶
Quand un navigateur se reconnecte après une coupure, il envoie le dernier id reçu dans l’en-tête Last-Event-ID.
Vous pouvez le lire comme paramètre d’en-tête et l’utiliser pour reprendre le flux là où le client s’était arrêté :
from collections.abc import AsyncIterable
from typing import Annotated
from fastapi import FastAPI, Header
from fastapi.sse import EventSourceResponse, ServerSentEvent
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
price: float
items = [
Item(name="Plumbus", price=32.99),
Item(name="Portal Gun", price=999.99),
Item(name="Meeseeks Box", price=49.99),
]
@app.get("/items/stream", response_class=EventSourceResponse)
async def stream_items(
last_event_id: Annotated[int | None, Header()] = None,
) -> AsyncIterable[ServerSentEvent]:
start = last_event_id + 1 if last_event_id is not None else 0
for i, item in enumerate(items):
if i < start:
continue
yield ServerSentEvent(data=item, id=str(i))
SSE avec POST¶
SSE fonctionne avec n’importe quelle méthode HTTP, pas seulement GET.
C’est utile pour des protocoles comme MCP qui diffusent des SSE via POST :
from collections.abc import AsyncIterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse, ServerSentEvent
from pydantic import BaseModel
app = FastAPI()
class Prompt(BaseModel):
text: str
@app.post("/chat/stream", response_class=EventSourceResponse)
async def stream_chat(prompt: Prompt) -> AsyncIterable[ServerSentEvent]:
words = prompt.text.split()
for word in words:
yield ServerSentEvent(data=word, event="token")
yield ServerSentEvent(raw_data="[DONE]", event="done")
Détails techniques¶
FastAPI met en œuvre certaines bonnes pratiques SSE prêtes à l’emploi.
- Envoyer un commentaire « keep alive »
pingtoutes les 15 secondes quand aucun message n’a été émis, pour éviter que certains proxys ne ferment la connexion, comme suggéré dans la Spécification HTML : Server-Sent Events. - Définir l’en-tête
Cache-Control: no-cachepour empêcher la mise en cache du flux. - Définir un en-tête spécial
X-Accel-Buffering: nopour empêcher le buffering dans certains proxys comme Nginx.
Vous n’avez rien à faire, cela fonctionne prêt à l’emploi. 🤓