Stream de JSON Lines¶
🌐 Tradução por IA e humanos
Esta tradução foi feita por IA orientada por humanos. 🤝
Ela pode conter erros de interpretação do significado original ou soar pouco natural, etc. 🤖
Você pode melhorar esta tradução ajudando-nos a orientar melhor o LLM de IA.
Você pode ter uma sequência de dados que deseja enviar em um "Stream"; é possível fazer isso com JSON Lines.
Informação
Adicionado no FastAPI 0.134.0.
O que é um Stream?¶
"Streaming" de dados significa que sua aplicação começará a enviar itens ao cliente sem esperar que toda a sequência esteja pronta.
Assim, ela envia o primeiro item, o cliente o recebe e começa a processá-lo, enquanto você ainda pode estar produzindo o próximo item.
sequenceDiagram
participant App
participant Client
App->>App: Produce Item 1
App->>Client: Send Item 1
App->>App: Produce Item 2
Client->>Client: Process Item 1
App->>Client: Send Item 2
App->>App: Produce Item 3
Client->>Client: Process Item 2
App->>Client: Send Item 3
Client->>Client: Process Item 3
Note over App: Keeps producing...
Note over Client: Keeps consuming...
Pode até ser um Stream infinito, em que você continua enviando dados.
JSON Lines¶
Nesses casos, é comum enviar "JSON Lines", um formato em que você envia um objeto JSON por linha.
Uma response teria um tipo de conteúdo application/jsonl (em vez de application/json) e o corpo seria algo como:
{"name": "Plumbus", "description": "A multi-purpose household device."}
{"name": "Portal Gun", "description": "A portal opening device."}
{"name": "Meeseeks Box", "description": "A box that summons a Meeseeks."}
É muito semelhante a um array JSON (equivalente a uma list do Python), mas em vez de estar envolto em [] e ter , entre os itens, há um objeto JSON por linha, separados por um caractere de nova linha.
Informação
O ponto importante é que sua aplicação poderá produzir cada linha em sequência, enquanto o cliente consome as anteriores.
Detalhes Técnicos
Como cada objeto JSON será separado por uma nova linha, eles não podem conter caracteres de nova linha literais em seu conteúdo, mas podem conter novas linhas com escape (\n), o que faz parte do padrão JSON.
Mas, normalmente, você não precisará se preocupar com isso, é feito automaticamente, continue lendo. 🤓
Casos de uso¶
Você pode usar isso para transmitir dados de um serviço de IA LLM, de logs ou telemetria, ou de outros tipos de dados que possam ser estruturados em itens JSON.
Dica
Se você quiser transmitir dados binários, por exemplo vídeo ou áudio, confira o guia avançado: Stream Data.
Stream de JSON Lines com FastAPI¶
Para transmitir JSON Lines com FastAPI, em vez de usar return na sua função de operação de rota, use yield para produzir cada item em sequência.
from collections.abc import AsyncIterable, Iterable
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None
items = [
Item(name="Plumbus", description="A multi-purpose household device."),
Item(name="Portal Gun", description="A portal opening device."),
Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]
@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
for item in items:
yield item
# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None
items = [
Item(name="Plumbus", description="A multi-purpose household device."),
Item(name="Portal Gun", description="A portal opening device."),
Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]
@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-async")
def stream_items_no_async() -> Iterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-annotation")
async def stream_items_no_annotation():
for item in items:
yield item
@app.get("/items/stream-no-async-no-annotation")
def stream_items_no_async_no_annotation():
for item in items:
yield item
Se cada item JSON que você quer enviar de volta for do tipo Item (um modelo Pydantic) e a função for assíncrona, você pode declarar o tipo de retorno como AsyncIterable[Item]:
from collections.abc import AsyncIterable, Iterable
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None
items = [
Item(name="Plumbus", description="A multi-purpose household device."),
Item(name="Portal Gun", description="A portal opening device."),
Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]
@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
for item in items:
yield item
# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None
items = [
Item(name="Plumbus", description="A multi-purpose household device."),
Item(name="Portal Gun", description="A portal opening device."),
Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]
@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-async")
def stream_items_no_async() -> Iterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-annotation")
async def stream_items_no_annotation():
for item in items:
yield item
@app.get("/items/stream-no-async-no-annotation")
def stream_items_no_async_no_annotation():
for item in items:
yield item
Se você declarar o tipo de retorno, o FastAPI o usará para validar os dados, documentá-los no OpenAPI, filtrá-los e serializá-los usando o Pydantic.
Dica
Como o Pydantic fará a serialização no lado em Rust, você terá uma performance muito maior do que se não declarar um tipo de retorno.
Funções de operação de rota não assíncronas¶
Você também pode usar funções def normais (sem async) e usar yield da mesma forma.
O FastAPI garantirá que sejam executadas corretamente para não bloquear o event loop.
Como, neste caso, a função não é assíncrona, o tipo de retorno adequado seria Iterable[Item]:
# Code above omitted 👆
@app.get("/items/stream-no-async")
def stream_items_no_async() -> Iterable[Item]:
for item in items:
yield item
# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None
items = [
Item(name="Plumbus", description="A multi-purpose household device."),
Item(name="Portal Gun", description="A portal opening device."),
Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]
@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-async")
def stream_items_no_async() -> Iterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-annotation")
async def stream_items_no_annotation():
for item in items:
yield item
@app.get("/items/stream-no-async-no-annotation")
def stream_items_no_async_no_annotation():
for item in items:
yield item
Sem tipo de retorno¶
Você também pode omitir o tipo de retorno. O FastAPI então usará o jsonable_encoder para converter os dados em algo que possa ser serializado para JSON e depois enviá-los como JSON Lines.
# Code above omitted 👆
@app.get("/items/stream-no-annotation")
async def stream_items_no_annotation():
for item in items:
yield item
# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None
items = [
Item(name="Plumbus", description="A multi-purpose household device."),
Item(name="Portal Gun", description="A portal opening device."),
Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]
@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-async")
def stream_items_no_async() -> Iterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-annotation")
async def stream_items_no_annotation():
for item in items:
yield item
@app.get("/items/stream-no-async-no-annotation")
def stream_items_no_async_no_annotation():
for item in items:
yield item
Eventos enviados pelo servidor (SSE)¶
O FastAPI também tem suporte de primeira classe a Server-Sent Events (SSE), que são bastante semelhantes, mas com alguns detalhes extras. Você pode aprender sobre eles no próximo capítulo: Eventos enviados pelo servidor (SSE). 🤓