Saltar a contenido

Transmitir JSON Lines

🌐 Traducción por IA y humanos

Esta traducción fue hecha por IA guiada por humanos. 🤝

Podría tener errores al interpretar el significado original, o sonar poco natural, etc. 🤖

Puedes mejorar esta traducción ayudándonos a guiar mejor al LLM de IA.

Versión en inglés

Podrías tener una secuencia de datos que quieras enviar en un "stream", podrías hacerlo con JSON Lines.

Información

Añadido en FastAPI 0.134.0.

¿Qué es un Stream?

Hacer "Streaming" de datos significa que tu app empezará a enviar ítems de datos al cliente sin esperar a que toda la secuencia de ítems esté lista.

Entonces, enviará el primer ítem, el cliente lo recibirá y empezará a procesarlo, y tú podrías seguir produciendo el siguiente ítem.

sequenceDiagram
    participant App
    participant Client

    App->>App: Produce Item 1
    App->>Client: Send Item 1
    App->>App: Produce Item 2
    Client->>Client: Process Item 1
    App->>Client: Send Item 2
    App->>App: Produce Item 3
    Client->>Client: Process Item 2
    App->>Client: Send Item 3
    Client->>Client: Process Item 3
    Note over App: Keeps producing...
    Note over Client: Keeps consuming...

Incluso podría ser un stream infinito, donde sigues enviando datos.

JSON Lines

En estos casos, es común enviar "JSON Lines", que es un formato donde envías un objeto JSON por línea.

Una response tendría un tipo de contenido application/jsonl (en lugar de application/json) y el response body sería algo como:

{"name": "Plumbus", "description": "A multi-purpose household device."}
{"name": "Portal Gun", "description": "A portal opening device."}
{"name": "Meeseeks Box", "description": "A box that summons a Meeseeks."}

Es muy similar a un array JSON (equivalente de una list de Python), pero en lugar de estar envuelto en [] y tener , entre los ítems, tiene un objeto JSON por línea, separados por un carácter de nueva línea.

Información

El punto importante es que tu app podrá producir cada línea a su turno, mientras el cliente consume las líneas anteriores.

Detalles técnicos

Como cada objeto JSON estará separado por una nueva línea, no pueden contener caracteres de nueva línea literales en su contenido, pero sí pueden contener nuevas líneas escapadas (\n), lo cual es parte del estándar JSON.

Pero normalmente no tendrás que preocuparte por eso, se hace automáticamente, sigue leyendo. 🤓

Casos de uso

Podrías usar esto para hacer stream de datos desde un servicio de AI LLM, desde logs o telemetry, o desde otros tipos de datos que puedan estructurarse en ítems JSON.

Consejo

Si quieres hacer stream de datos binarios, por ejemplo video o audio, Revisa la guía avanzada: Transmitir datos.

Transmitir JSON Lines con FastAPI

Para transmitir JSON Lines con FastAPI puedes, en lugar de usar return en tu path operation function, usar yield para producir cada ítem a su turno.

from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
    for item in items:
        yield item

# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-async")
def stream_items_no_async() -> Iterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-annotation")
async def stream_items_no_annotation():
    for item in items:
        yield item


@app.get("/items/stream-no-async-no-annotation")
def stream_items_no_async_no_annotation():
    for item in items:
        yield item

Si cada ítem JSON que quieres enviar de vuelta es de tipo Item (un modelo de Pydantic) y es una función async, puedes declarar el tipo de retorno como AsyncIterable[Item]:

from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
    for item in items:
        yield item

# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-async")
def stream_items_no_async() -> Iterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-annotation")
async def stream_items_no_annotation():
    for item in items:
        yield item


@app.get("/items/stream-no-async-no-annotation")
def stream_items_no_async_no_annotation():
    for item in items:
        yield item

Si declaras el tipo de retorno, FastAPI lo usará para validar los datos, documentarlos en OpenAPI, filtrarlos y serializarlos usando Pydantic.

Consejo

Como Pydantic lo serializará en el lado de Rust, obtendrás un rendimiento mucho mayor que si no declaras un tipo de retorno.

path operation functions no-async

También puedes usar funciones def regulares (sin async), y usar yield de la misma forma.

FastAPI se asegurará de que se ejecute correctamente para que no bloquee el event loop.

Como en este caso la función no es async, el tipo de retorno correcto sería Iterable[Item]:

# Code above omitted 👆

@app.get("/items/stream-no-async")
def stream_items_no_async() -> Iterable[Item]:
    for item in items:
        yield item

# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-async")
def stream_items_no_async() -> Iterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-annotation")
async def stream_items_no_annotation():
    for item in items:
        yield item


@app.get("/items/stream-no-async-no-annotation")
def stream_items_no_async_no_annotation():
    for item in items:
        yield item

Sin tipo de retorno

También puedes omitir el tipo de retorno. Entonces FastAPI usará jsonable_encoder para convertir los datos a algo que se pueda serializar a JSON y luego enviarlo como JSON Lines.

# Code above omitted 👆

@app.get("/items/stream-no-annotation")
async def stream_items_no_annotation():
    for item in items:
        yield item

# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-async")
def stream_items_no_async() -> Iterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-annotation")
async def stream_items_no_annotation():
    for item in items:
        yield item


@app.get("/items/stream-no-async-no-annotation")
def stream_items_no_async_no_annotation():
    for item in items:
        yield item

Server-Sent Events (SSE)

FastAPI también tiene soporte de primera clase para Server-Sent Events (SSE), que son bastante similares pero con un par de detalles extra. Puedes aprender sobre ellos en el siguiente capítulo: Eventos enviados por el servidor (SSE). 🤓