콘텐츠로 이동

JSON Lines 스트리밍

🌐 AI와 사람이 함께한 번역

이 번역은 사람의 안내를 받아 AI가 만들었습니다. 🤝

원문의 의미를 오해하거나 부자연스러워 보이는 등 오류가 있을 수 있습니다. 🤖

AI LLM을 더 잘 안내하는 데 도움을 주세요.

영문 버전

연속된 데이터를 "스트림"으로 보내고 싶다면 JSON Lines를 사용할 수 있습니다.

Info

FastAPI 0.134.0에 추가되었습니다.

스트림이란

데이터를 "스트리밍"한다는 것은 애플리케이션이 전체 항목 시퀀스가 모두 준비될 때까지 기다리지 않고 클라이언트로 데이터 항목을 보내기 시작한다는 뜻입니다.

즉, 첫 번째 항목을 보내면 클라이언트는 그것을 받아 처리하기 시작하고, 그동안 애플리케이션은 다음 항목을 계속 생성할 수 있습니다.

sequenceDiagram
    participant App
    participant Client

    App->>App: Produce Item 1
    App->>Client: Send Item 1
    App->>App: Produce Item 2
    Client->>Client: Process Item 1
    App->>Client: Send Item 2
    App->>App: Produce Item 3
    Client->>Client: Process Item 2
    App->>Client: Send Item 3
    Client->>Client: Process Item 3
    Note over App: Keeps producing...
    Note over Client: Keeps consuming...

데이터를 계속 보내는 무한 스트림일 수도 있습니다.

JSON Lines

이런 경우에는 한 줄에 하나의 JSON 객체를 보내는 형식인 "JSON Lines"를 사용하는 것이 일반적입니다.

응답의 콘텐츠 타입은 application/json 대신 application/jsonl이고, 본문은 다음과 같습니다:

{"name": "Plumbus", "description": "A multi-purpose household device."}
{"name": "Portal Gun", "description": "A portal opening device."}
{"name": "Meeseeks Box", "description": "A box that summons a Meeseeks."}

JSON 배열(Python의 list에 해당)과 매우 비슷하지만, 항목들을 []로 감싸고 항목 사이에 ,를 넣는 대신, 줄마다 하나의 JSON 객체가 있고, 새 줄 문자로 구분됩니다.

Info

핵심은 애플리케이션이 각 줄을 차례로 생성하는 동안, 클라이언트는 이전 줄을 소비할 수 있다는 점입니다.

기술 세부사항

각 JSON 객체는 새 줄로 구분되므로, 내용에 실제 줄바꿈 문자를 포함할 수는 없습니다. 하지만 JSON 표준의 일부인 이스케이프된 줄바꿈(\n)은 포함할 수 있습니다.

보통은 신경 쓸 필요가 없습니다. 자동으로 처리되니 계속 읽어 주세요. 🤓

사용 예

이 방법을 사용해 AI LLM 서비스, 로그 또는 telemetry에서 오는 데이터, 혹은 JSON 항목으로 구조화할 수 있는 다른 유형의 데이터를 스트리밍할 수 있습니다.

Tip

비디오나 오디오처럼 바이너리 데이터를 스트리밍하려면 고급 가이드를 확인하세요: 스트림 데이터.

FastAPI로 JSON Lines 스트리밍

FastAPI에서 JSON Lines를 스트리밍하려면, 경로 처리 함수에서 return을 사용하는 대신 yield로 각 항목을 차례로 생성하면 됩니다.

from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
    for item in items:
        yield item

# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-async")
def stream_items_no_async() -> Iterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-annotation")
async def stream_items_no_annotation():
    for item in items:
        yield item


@app.get("/items/stream-no-async-no-annotation")
def stream_items_no_async_no_annotation():
    for item in items:
        yield item

보내려는 각 JSON 항목의 타입이 Item(Pydantic 모델)이고 함수가 async라면, 반환 타입을 AsyncIterable[Item]로 선언할 수 있습니다:

from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
    for item in items:
        yield item

# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-async")
def stream_items_no_async() -> Iterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-annotation")
async def stream_items_no_annotation():
    for item in items:
        yield item


@app.get("/items/stream-no-async-no-annotation")
def stream_items_no_async_no_annotation():
    for item in items:
        yield item

반환 타입을 선언하면 FastAPI가 이를 사용해 데이터를 검증하고, OpenAPI에 문서화하고, 필터링하고, Pydantic으로 직렬화합니다.

Tip

Pydantic이 Rust 측에서 직렬화하므로, 반환 타입을 선언하지 않았을 때보다 훨씬 높은 성능을 얻게 됩니다.

비동기 아님 경로 처리 함수

일반 def 함수(async 없이)도 사용할 수 있으며, 동일하게 yield를 사용할 수 있습니다.

FastAPI가 이벤트 루프를 막지 않도록 올바르게 실행되게 보장합니다.

이 경우 함수가 async가 아니므로, 올바른 반환 타입은 Iterable[Item]입니다:

# Code above omitted 👆

@app.get("/items/stream-no-async")
def stream_items_no_async() -> Iterable[Item]:
    for item in items:
        yield item

# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-async")
def stream_items_no_async() -> Iterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-annotation")
async def stream_items_no_annotation():
    for item in items:
        yield item


@app.get("/items/stream-no-async-no-annotation")
def stream_items_no_async_no_annotation():
    for item in items:
        yield item

반환 타입 생략

반환 타입을 생략할 수도 있습니다. 그러면 FastAPI가 jsonable_encoder를 사용해 데이터를 JSON으로 직렬화 가능한 형태로 변환한 뒤 JSON Lines로 전송합니다.

# Code above omitted 👆

@app.get("/items/stream-no-annotation")
async def stream_items_no_annotation():
    for item in items:
        yield item

# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-async")
def stream_items_no_async() -> Iterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-annotation")
async def stream_items_no_annotation():
    for item in items:
        yield item


@app.get("/items/stream-no-async-no-annotation")
def stream_items_no_async_no_annotation():
    for item in items:
        yield item

서버 전송 이벤트(SSE)

FastAPI는 Server-Sent Events(SSE)도 일급으로 지원합니다. 매우 비슷하지만 몇 가지 추가 세부사항이 있습니다. 다음 장에서 자세히 알아보세요: Server-Sent Events (SSE). 🤓