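Stream JSON Lines
If you have a sequence of data that you want to send as a "stream", you can do it with JSON Lines.
Info
Added in FastAPI 0.134.0.
What is a Stream?
"Streaming" data means that your app starts sending data items to the client before the entire sequence of items is ready.
So it sends the first item, and while the client receives it and starts processing it, your app might still be producing the next item.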
sequenceDiagram
participant App
participant Client
App->>App: Produce Item 1
App->>Client: Send Item 1
App->>App: Produce Item 2
Client->>Client: Process Item 1
App->>Client: Send Item 2
App->>App: Produce Item 3
Client->>Client: Process Item 2
App->>Client: Send Item 3
Client->>Client: Process Item 3
Note over App: Keeps producing...
Note over Client: Keeps consuming...
It could even be an infinite stream, where you keep sending data indefinitely.
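The interleaving in the diagram above can be sketched with a plain Python generator. This is only an illustration of the idea, not FastAPI code, and the names `produce_items` and `consume` are made up for this example:

```python
from collections.abc import Iterator


def produce_items(log: list[str]) -> Iterator[int]:
    """Yield items one at a time, recording when each one is produced."""
    for number in range(1, 4):
        log.append(f"produce {number}")
        yield number


def consume(log: list[str]) -> None:
    """Process each item as soon as it arrives, before the next one exists."""
    for number in produce_items(log):
        log.append(f"process {number}")


events: list[str] = []
consume(events)
print(events)
```

The recorded events alternate between producing and processing, because the generator only runs up to the next `yield` each time the consumer asks for an item.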
JSON Lines
In these cases, it's common to send "JSON Lines", a format in which each line contains one JSON object.
The response has a content type of application/jsonl (instead of application/json), and the body looks like this:
{"name": "Plumbus", "description": "A multi-purpose household device."}
{"name": "Portal Gun", "description": "A portal opening device."}
{"name": "Meeseeks Box", "description": "A box that summons a Meeseeks."}
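It's very similar to a JSON array (the equivalent of a Python list), but instead of being wrapped in [] with , between the items, it has one JSON object per line, separated by a newline character.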
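To make the difference concrete, here is a small sketch that uses only the standard library `json` module to encode the same data both ways. The variable names are just for illustration:

```python
import json

items = [
    {"name": "Plumbus", "description": "A multi-purpose household device."},
    {"name": "Portal Gun", "description": "A portal opening device."},
]

# A JSON array: one document, wrapped in [] with , between the items.
as_array = json.dumps(items)

# JSON Lines: one JSON object per line, each terminated by a newline.
as_json_lines = "".join(json.dumps(item) + "\n" for item in items)

# Each line can be parsed on its own, without reading the rest of the body.
parsed = [json.loads(line) for line in as_json_lines.splitlines()]
print(as_json_lines)
```

With the array, a standard parser needs the closing ] before it can return anything. With JSON Lines, every line is already a complete JSON document.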
Info
The important point is that your app can produce each line in turn while the client consumes the previous ones.
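On the client side, consuming line by line could look roughly like the sketch below. It assumes the body arrives as arbitrary byte chunks (as it would from most HTTP clients), and `iter_json_lines` is a hypothetical helper written for this example, not part of FastAPI:

```python
import json
from collections.abc import Iterable, Iterator


def iter_json_lines(chunks: Iterable[bytes]) -> Iterator[dict]:
    """Yield one parsed JSON object per complete line found in the chunks."""
    buffer = b""
    for chunk in chunks:
        buffer += chunk
        # A chunk may contain zero, one, or several complete lines.
        while b"\n" in buffer:
            line, buffer = buffer.split(b"\n", 1)
            if line.strip():
                yield json.loads(line)
    # Handle a final line that was not terminated by a newline.
    if buffer.strip():
        yield json.loads(buffer)


# Simulated network chunks that split the lines at arbitrary positions.
chunks = [b'{"name": "Plum', b'bus"}\n{"name": ', b'"Portal Gun"}\n']
names = [item["name"] for item in iter_json_lines(chunks)]
print(names)
```

Each item is yielded as soon as its newline arrives, so the client can start working on it while later chunks are still in transit.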
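Technical Details
Because each JSON object is separated by a newline, the objects can't contain literal newline characters in their content, but they can contain escaped newlines (\n), which is part of the JSON standard.
But normally you won't have to worry about it, it's all handled automatically. Keep reading. 🤓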
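If you are curious, you can check the escaping with the standard library `json` module. This small sketch encodes a value that contains a newline and confirms that the encoded text stays on a single line:

```python
import json

item = {"name": "Plumbus", "description": "Line one\nLine two"}

# json.dumps escapes the newline as the two characters backslash and n.
line = json.dumps(item)

has_literal_newline = "\n" in line
has_escaped_newline = "\\n" in line
round_tripped = json.loads(line)
print(line)
```

The encoded line contains no literal newline, and decoding it gives back the original value with its newline intact.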
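Use Cases
You could use this to stream data from an AI LLM service, from logs or telemetry, or from any other kind of data that can be structured as JSON items.
Tip
If you want to stream binary data, for example video or audio, check the advanced guide: Stream Data.
Stream JSON Lines with FastAPI
To stream JSON Lines with FastAPI, use yield in your path operation function to produce each item in turn, instead of using return.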
from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
    for item in items:
        yield item

# Code below omitted 👇
Full file preview
from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-async")
def stream_items_no_async() -> Iterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-annotation")
async def stream_items_no_annotation():
    for item in items:
        yield item


@app.get("/items/stream-no-async-no-annotation")
def stream_items_no_async_no_annotation():
    for item in items:
        yield item
If each JSON item you want to send back is of type Item (a Pydantic model) and the function is async, you can declare the return type as AsyncIterable[Item]:
# Code above omitted 👆

@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
    for item in items:
        yield item

# Code below omitted 👇
If you declare the return type, FastAPI will use it to validate the data, document it in OpenAPI, filter it, and serialize it using Pydantic.
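Tip
Because Pydantic serializes the data on the Rust side, you get much higher performance than if you don't declare a return type.
Non-async path operation functions
You can also use regular def functions (without async) and use yield the same way.
FastAPI will make sure it runs correctly, without blocking the event loop.
Because the function is not async in this case, the right return type is Iterable[Item]: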
# Code above omitted 👆

@app.get("/items/stream-no-async")
def stream_items_no_async() -> Iterable[Item]:
    for item in items:
        yield item

# Code below omitted 👇
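The text above doesn't say how FastAPI avoids blocking the event loop, and you don't need to know it to use this feature. As a general idea, one common way to consume a regular generator from async code is to run each step in a worker thread. The sketch below shows that technique with the standard library only. It is an assumption-laden illustration, not FastAPI's actual implementation, and `iterate_in_thread` and `slow_lines` are hypothetical names:

```python
import asyncio
from collections.abc import AsyncIterator, Iterable, Iterator
from typing import Any

# Sentinel that marks the end of the iterator, since StopIteration
# should not be raised across the thread boundary into a coroutine.
_DONE = object()


def _next_or_done(iterator: Iterator[Any]) -> Any:
    return next(iterator, _DONE)


async def iterate_in_thread(iterable: Iterable[Any]) -> AsyncIterator[Any]:
    """Advance a regular iterator in a worker thread, one item at a time."""
    iterator = iter(iterable)
    while True:
        item = await asyncio.to_thread(_next_or_done, iterator)
        if item is _DONE:
            break
        yield item


def slow_lines() -> Iterator[str]:
    # Stand-in for a generator that might do blocking work between items.
    for number in range(3):
        yield f"line {number}"


async def main() -> list[str]:
    return [line async for line in iterate_in_thread(slow_lines())]


result = asyncio.run(main())
print(result)
```

While a worker thread waits for the next item, the event loop stays free to handle other requests.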
No Return Type
You can also omit the return type. FastAPI will then use the jsonable_encoder to convert the data to something that can be serialized to JSON, and then send it as JSON Lines.
# Code above omitted 👆

@app.get("/items/stream-no-annotation")
async def stream_items_no_annotation():
    for item in items:
        yield item

# Code below omitted 👇
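Server-Sent Events (SSE)
FastAPI also has first-class support for Server-Sent Events (SSE), which are quite similar but come with a few extra details. You can learn more about them in the next chapter: Server-Sent Events (SSE). 🤓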