JSON Lines streamen¶
🌐 Übersetzung durch KI und Menschen
Diese Übersetzung wurde von KI erstellt, angeleitet von Menschen. 🤝
Sie könnte Fehler enthalten, etwa Missverständnisse des ursprünglichen Sinns oder unnatürliche Formulierungen, usw. 🤖
Sie können diese Übersetzung verbessern, indem Sie uns helfen, die KI-LLM besser anzuleiten.
Sie könnten eine Folge von Daten haben, die Sie in einem „Stream“ senden möchten, das können Sie mit JSON Lines tun.
Info
Hinzugefügt in FastAPI 0.134.0.
Was ist ein Stream?¶
„Streaming“ von Daten bedeutet, dass Ihre App damit beginnt, Datenelemente an den Client zu senden, ohne darauf zu warten, dass die gesamte Folge von Elementen fertig ist.
Sie sendet also das erste Element, der Client empfängt es und beginnt mit der Verarbeitung, und Sie erzeugen währenddessen möglicherweise bereits das nächste Element.
sequenceDiagram
participant App
participant Client
App->>App: Produce Item 1
App->>Client: Send Item 1
App->>App: Produce Item 2
Client->>Client: Process Item 1
App->>Client: Send Item 2
App->>App: Produce Item 3
Client->>Client: Process Item 2
App->>Client: Send Item 3
Client->>Client: Process Item 3
Note over App: Keeps producing...
Note over Client: Keeps consuming...
Es könnte sogar ein unendlicher Stream sein, bei dem Sie kontinuierlich Daten senden.
JSON Lines¶
In diesen Fällen ist es üblich, „JSON Lines“ zu senden, das ist ein Format, bei dem Sie pro Zeile genau ein JSON-Objekt senden.
Eine Response hätte einen Content-Type von application/jsonl (anstelle von application/json) und der Body sähe etwa so aus:
{"name": "Plumbus", "description": "A multi-purpose household device."}
{"name": "Portal Gun", "description": "A portal opening device."}
{"name": "Meeseeks Box", "description": "A box that summons a Meeseeks."}
Es ist einem JSON-Array (entspricht einer Python-Liste) sehr ähnlich, aber anstatt in [] eingeschlossen zu sein und , zwischen den Elementen zu haben, gibt es hier ein JSON-Objekt pro Zeile, sie sind durch ein Zeilenumbruchzeichen getrennt.
Info
Der wichtige Punkt ist, dass Ihre App in der Lage ist, jede Zeile der Reihe nach zu erzeugen, während der Client die vorherigen Zeilen konsumiert.
Technische Details
Da jedes JSON-Objekt durch einen Zeilenumbruch getrennt wird, können sie keine wörtlichen Zeilenumbrüche in ihrem Inhalt enthalten, aber sie können escapte Zeilenumbrüche (\n) enthalten, das ist Teil des JSON-Standards.
Normalerweise müssen Sie sich darum aber nicht kümmern, das geschieht automatisch, lesen Sie weiter. 🤓
Anwendungsfälle¶
Sie könnten dies verwenden, um Daten von einem AI LLM-Service, aus Logs oder Telemetrie, oder aus anderen Typen von Daten zu streamen, die sich in JSON-Items strukturieren lassen.
Tipp
Wenn Sie Binärdaten streamen möchten, zum Beispiel Video oder Audio, sehen Sie sich den erweiterten Leitfaden an: Daten streamen.
JSON Lines mit FastAPI streamen¶
Um JSON Lines mit FastAPI zu streamen, können Sie anstelle von return in Ihrer Pfadoperation-Funktion yield verwenden, um jedes Element der Reihe nach zu erzeugen.
from collections.abc import AsyncIterable, Iterable
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None
items = [
Item(name="Plumbus", description="A multi-purpose household device."),
Item(name="Portal Gun", description="A portal opening device."),
Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]
@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
for item in items:
yield item
# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None
items = [
Item(name="Plumbus", description="A multi-purpose household device."),
Item(name="Portal Gun", description="A portal opening device."),
Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]
@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-async")
def stream_items_no_async() -> Iterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-annotation")
async def stream_items_no_annotation():
for item in items:
yield item
@app.get("/items/stream-no-async-no-annotation")
def stream_items_no_async_no_annotation():
for item in items:
yield item
Wenn jedes JSON-Item, das Sie zurücksenden möchten, vom Typ Item ist (ein Pydantic-Modell) und es sich um eine async-Funktion handelt, können Sie den Rückgabetyp als AsyncIterable[Item] deklarieren:
from collections.abc import AsyncIterable, Iterable
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None
items = [
Item(name="Plumbus", description="A multi-purpose household device."),
Item(name="Portal Gun", description="A portal opening device."),
Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]
@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
for item in items:
yield item
# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None
items = [
Item(name="Plumbus", description="A multi-purpose household device."),
Item(name="Portal Gun", description="A portal opening device."),
Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]
@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-async")
def stream_items_no_async() -> Iterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-annotation")
async def stream_items_no_annotation():
for item in items:
yield item
@app.get("/items/stream-no-async-no-annotation")
def stream_items_no_async_no_annotation():
for item in items:
yield item
Wenn Sie den Rückgabetyp deklarieren, wird FastAPI ihn verwenden, um die Daten zu validieren, sie in OpenAPI zu dokumentieren, sie zu filtern und sie mit Pydantic zu serialisieren.
Tipp
Da Pydantic es auf der Rust-Seite serialisiert, erhalten Sie eine deutlich höhere Leistung als wenn Sie keinen Rückgabetyp deklarieren.
Nicht-async Pfadoperation-Funktionen¶
Sie können auch normale def-Funktionen (ohne async) verwenden und yield auf die gleiche Weise einsetzen.
FastAPI stellt sicher, dass sie korrekt ausgeführt werden, sodass der Event Loop nicht blockiert wird.
Da die Funktion in diesem Fall nicht async ist, wäre der richtige Rückgabetyp Iterable[Item]:
# Code above omitted 👆
@app.get("/items/stream-no-async")
def stream_items_no_async() -> Iterable[Item]:
for item in items:
yield item
# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None
items = [
Item(name="Plumbus", description="A multi-purpose household device."),
Item(name="Portal Gun", description="A portal opening device."),
Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]
@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-async")
def stream_items_no_async() -> Iterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-annotation")
async def stream_items_no_annotation():
for item in items:
yield item
@app.get("/items/stream-no-async-no-annotation")
def stream_items_no_async_no_annotation():
for item in items:
yield item
Kein Rückgabetyp¶
Sie können den Rückgabetyp auch weglassen. FastAPI verwendet dann den jsonable_encoder, um die Daten in etwas zu konvertieren, das zu JSON serialisiert werden kann, und sendet sie anschließend als JSON Lines.
# Code above omitted 👆
@app.get("/items/stream-no-annotation")
async def stream_items_no_annotation():
for item in items:
yield item
# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None
items = [
Item(name="Plumbus", description="A multi-purpose household device."),
Item(name="Portal Gun", description="A portal opening device."),
Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]
@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-async")
def stream_items_no_async() -> Iterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-annotation")
async def stream_items_no_annotation():
for item in items:
yield item
@app.get("/items/stream-no-async-no-annotation")
def stream_items_no_async_no_annotation():
for item in items:
yield item
Server-Sent Events (SSE)¶
FastAPI hat außerdem erstklassige Unterstützung für Server-Sent Events (SSE), die dem sehr ähnlich sind, aber ein paar zusätzliche Details mitbringen. Sie können im nächsten Kapitel mehr darüber lernen: Server-Sent Events (SSE). 🤓