Transmitir JSON Lines¶
🌐 Traducción por IA y humanos
Esta traducción fue hecha por IA guiada por humanos. 🤝
Podría tener errores al interpretar el significado original, o sonar poco natural, etc. 🤖
Puedes mejorar esta traducción ayudándonos a guiar mejor al LLM de IA.
Podrías tener una secuencia de datos que quieras enviar en un "stream", podrías hacerlo con JSON Lines.
Información
Añadido en FastAPI 0.134.0.
¿Qué es un Stream?¶
Hacer "Streaming" de datos significa que tu app empezará a enviar ítems de datos al cliente sin esperar a que toda la secuencia de ítems esté lista.
Entonces, enviará el primer ítem, el cliente lo recibirá y empezará a procesarlo, y tú podrías seguir produciendo el siguiente ítem.
sequenceDiagram
participant App
participant Client
App->>App: Produce Item 1
App->>Client: Send Item 1
App->>App: Produce Item 2
Client->>Client: Process Item 1
App->>Client: Send Item 2
App->>App: Produce Item 3
Client->>Client: Process Item 2
App->>Client: Send Item 3
Client->>Client: Process Item 3
Note over App: Keeps producing...
Note over Client: Keeps consuming...
Incluso podría ser un stream infinito, donde sigues enviando datos.
JSON Lines¶
En estos casos, es común enviar "JSON Lines", que es un formato donde envías un objeto JSON por línea.
Una response tendría un tipo de contenido application/jsonl (en lugar de application/json) y el response body sería algo como:
{"name": "Plumbus", "description": "A multi-purpose household device."}
{"name": "Portal Gun", "description": "A portal opening device."}
{"name": "Meeseeks Box", "description": "A box that summons a Meeseeks."}
Es muy similar a un array JSON (equivalente de una list de Python), pero en lugar de estar envuelto en [] y tener , entre los ítems, tiene un objeto JSON por línea, separados por un carácter de nueva línea.
Información
El punto importante es que tu app podrá producir cada línea a su turno, mientras el cliente consume las líneas anteriores.
Detalles técnicos
Como cada objeto JSON estará separado por una nueva línea, no pueden contener caracteres de nueva línea literales en su contenido, pero sí pueden contener nuevas líneas escapadas (\n), lo cual es parte del estándar JSON.
Pero normalmente no tendrás que preocuparte por eso, se hace automáticamente, sigue leyendo. 🤓
Casos de uso¶
Podrías usar esto para hacer stream de datos desde un servicio de AI LLM, desde logs o telemetry, o desde otros tipos de datos que puedan estructurarse en ítems JSON.
Consejo
Si quieres hacer stream de datos binarios, por ejemplo video o audio, Revisa la guía avanzada: Transmitir datos.
Transmitir JSON Lines con FastAPI¶
Para transmitir JSON Lines con FastAPI puedes, en lugar de usar return en tu path operation function, usar yield para producir cada ítem a su turno.
from collections.abc import AsyncIterable, Iterable
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None
items = [
Item(name="Plumbus", description="A multi-purpose household device."),
Item(name="Portal Gun", description="A portal opening device."),
Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]
@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
for item in items:
yield item
# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None
items = [
Item(name="Plumbus", description="A multi-purpose household device."),
Item(name="Portal Gun", description="A portal opening device."),
Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]
@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-async")
def stream_items_no_async() -> Iterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-annotation")
async def stream_items_no_annotation():
for item in items:
yield item
@app.get("/items/stream-no-async-no-annotation")
def stream_items_no_async_no_annotation():
for item in items:
yield item
Si cada ítem JSON que quieres enviar de vuelta es de tipo Item (un modelo de Pydantic) y es una función async, puedes declarar el tipo de retorno como AsyncIterable[Item]:
from collections.abc import AsyncIterable, Iterable
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None
items = [
Item(name="Plumbus", description="A multi-purpose household device."),
Item(name="Portal Gun", description="A portal opening device."),
Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]
@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
for item in items:
yield item
# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None
items = [
Item(name="Plumbus", description="A multi-purpose household device."),
Item(name="Portal Gun", description="A portal opening device."),
Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]
@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-async")
def stream_items_no_async() -> Iterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-annotation")
async def stream_items_no_annotation():
for item in items:
yield item
@app.get("/items/stream-no-async-no-annotation")
def stream_items_no_async_no_annotation():
for item in items:
yield item
Si declaras el tipo de retorno, FastAPI lo usará para validar los datos, documentarlos en OpenAPI, filtrarlos y serializarlos usando Pydantic.
Consejo
Como Pydantic lo serializará en el lado de Rust, obtendrás un rendimiento mucho mayor que si no declaras un tipo de retorno.
path operation functions no-async¶
También puedes usar funciones def regulares (sin async), y usar yield de la misma forma.
FastAPI se asegurará de que se ejecute correctamente para que no bloquee el event loop.
Como en este caso la función no es async, el tipo de retorno correcto sería Iterable[Item]:
# Code above omitted 👆
@app.get("/items/stream-no-async")
def stream_items_no_async() -> Iterable[Item]:
for item in items:
yield item
# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None
items = [
Item(name="Plumbus", description="A multi-purpose household device."),
Item(name="Portal Gun", description="A portal opening device."),
Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]
@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-async")
def stream_items_no_async() -> Iterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-annotation")
async def stream_items_no_annotation():
for item in items:
yield item
@app.get("/items/stream-no-async-no-annotation")
def stream_items_no_async_no_annotation():
for item in items:
yield item
Sin tipo de retorno¶
También puedes omitir el tipo de retorno. Entonces FastAPI usará jsonable_encoder para convertir los datos a algo que se pueda serializar a JSON y luego enviarlo como JSON Lines.
# Code above omitted 👆
@app.get("/items/stream-no-annotation")
async def stream_items_no_annotation():
for item in items:
yield item
# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None
items = [
Item(name="Plumbus", description="A multi-purpose household device."),
Item(name="Portal Gun", description="A portal opening device."),
Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]
@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-async")
def stream_items_no_async() -> Iterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-annotation")
async def stream_items_no_annotation():
for item in items:
yield item
@app.get("/items/stream-no-async-no-annotation")
def stream_items_no_async_no_annotation():
for item in items:
yield item
Server-Sent Events (SSE)¶
FastAPI también tiene soporte de primera clase para Server-Sent Events (SSE), que son bastante similares pero con un par de detalles extra. Puedes aprender sobre ellos en el siguiente capítulo: Eventos enviados por el servidor (SSE). 🤓