生命週期事件¶
你可以定義在應用程式啟動之前要執行的邏輯(程式碼)。也就是說,這段程式碼會在應用開始接收請求之前、只執行一次。
同樣地,你也可以定義在應用程式關閉時要執行的邏輯(程式碼)。在這種情況下,這段程式碼會在處理了許多請求之後、只執行一次。
因為這些程式碼分別在應用開始接收請求之前與完成處理請求之後執行,所以涵蓋了整個應用的生命週期(「lifespan」這個詞稍後會很重要 😉)。
這對於為整個應用設定需要共用於多個請求的資源,以及在之後進行清理,非常有用。比如資料庫連線池、或載入一個共用的機器學習模型。
使用情境¶
先從一個使用情境開始,然後看看如何用這個機制解決。
想像你有一些要用來處理請求的機器學習模型。🤖
同一組模型會在多個請求間共用,所以不是每個請求或每個使用者各有一個模型。
再想像一下,載入模型需要一段時間,因為它必須從磁碟讀取大量資料。所以你不想在每個請求都做一次。
你可以在模組/檔案的最上層載入,但這也表示即使只是要跑一個簡單的自動化測試,也會去載入模型,導致測試變慢,因為它得等模型載入完才能執行與模型無關的程式碼部分。
我們要解決的正是這件事:在開始處理請求之前再載入模型,但只在應用程式即將開始接收請求時載入,而不是在匯入程式碼時就載入。
生命週期(Lifespan)¶
你可以使用 FastAPI 應用的 lifespan 參數,搭配「context manager」(稍後會示範),來定義這些 startup 與 shutdown 邏輯。
先看一個例子,接著再深入說明。
我們建立一個帶有 yield 的非同步函式 lifespan(),如下:
from contextlib import asynccontextmanager
from fastapi import FastAPI
def fake_answer_to_everything_ml_model(x: float):
return x * 42
ml_models = {}
@asynccontextmanager
async def lifespan(app: FastAPI):
# Load the ML model
ml_models["answer_to_everything"] = fake_answer_to_everything_ml_model
yield
# Clean up the ML models and release the resources
ml_models.clear()
app = FastAPI(lifespan=lifespan)
@app.get("/predict")
async def predict(x: float):
result = ml_models["answer_to_everything"](x)
return {"result": result}
🤓 Other versions and variants
from contextlib import asynccontextmanager
from fastapi import FastAPI
def fake_answer_to_everything_ml_model(x: float):
return x * 42
ml_models = {}
@asynccontextmanager
async def lifespan(app: FastAPI):
# Load the ML model
ml_models["answer_to_everything"] = fake_answer_to_everything_ml_model
yield
# Clean up the ML models and release the resources
ml_models.clear()
app = FastAPI(lifespan=lifespan)
@app.get("/predict")
async def predict(x: float):
result = ml_models["answer_to_everything"](x)
return {"result": result}
這裡我們透過在 yield 之前把(假的)模型函式放進機器學習模型的字典中,來模擬昂貴的 startup 載入模型操作。這段程式會在應用開始接收請求之前執行,也就是 startup 階段。
接著,在 yield 之後,我們卸載模型。這段程式會在應用完成處理請求之後、也就是 shutdown 前執行。這可以用來釋放資源,例如記憶體或 GPU。
Tip
shutdown 會在你停止應用程式時發生。
也許你要啟動新版本,或只是不想再跑它了。🤷
Lifespan 函式¶
首先要注意的是,我們定義了一個帶有 yield 的 async 函式。這和帶有 yield 的依賴(Dependencies)非常相似。
from contextlib import asynccontextmanager
from fastapi import FastAPI
def fake_answer_to_everything_ml_model(x: float):
return x * 42
ml_models = {}
@asynccontextmanager
async def lifespan(app: FastAPI):
# Load the ML model
ml_models["answer_to_everything"] = fake_answer_to_everything_ml_model
yield
# Clean up the ML models and release the resources
ml_models.clear()
app = FastAPI(lifespan=lifespan)
@app.get("/predict")
async def predict(x: float):
result = ml_models["answer_to_everything"](x)
return {"result": result}
🤓 Other versions and variants
from contextlib import asynccontextmanager
from fastapi import FastAPI
def fake_answer_to_everything_ml_model(x: float):
return x * 42
ml_models = {}
@asynccontextmanager
async def lifespan(app: FastAPI):
# Load the ML model
ml_models["answer_to_everything"] = fake_answer_to_everything_ml_model
yield
# Clean up the ML models and release the resources
ml_models.clear()
app = FastAPI(lifespan=lifespan)
@app.get("/predict")
async def predict(x: float):
result = ml_models["answer_to_everything"](x)
return {"result": result}
函式在 yield 之前的部分,會在應用啟動前先執行。
yield 之後的部分,會在應用結束後再執行。
非同步內容管理器(Async Context Manager)¶
你會看到這個函式被 @asynccontextmanager 裝飾。
它會把函式轉換成所謂的「非同步內容管理器(async context manager)」。
from contextlib import asynccontextmanager
from fastapi import FastAPI
def fake_answer_to_everything_ml_model(x: float):
return x * 42
ml_models = {}
@asynccontextmanager
async def lifespan(app: FastAPI):
# Load the ML model
ml_models["answer_to_everything"] = fake_answer_to_everything_ml_model
yield
# Clean up the ML models and release the resources
ml_models.clear()
app = FastAPI(lifespan=lifespan)
@app.get("/predict")
async def predict(x: float):
result = ml_models["answer_to_everything"](x)
return {"result": result}
🤓 Other versions and variants
from contextlib import asynccontextmanager
from fastapi import FastAPI
def fake_answer_to_everything_ml_model(x: float):
return x * 42
ml_models = {}
@asynccontextmanager
async def lifespan(app: FastAPI):
# Load the ML model
ml_models["answer_to_everything"] = fake_answer_to_everything_ml_model
yield
# Clean up the ML models and release the resources
ml_models.clear()
app = FastAPI(lifespan=lifespan)
@app.get("/predict")
async def predict(x: float):
result = ml_models["answer_to_everything"](x)
return {"result": result}
Python 中的內容管理器(context manager)可以用在 with 陳述式中,例如 open() 可以作為內容管理器使用:
with open("file.txt") as file:
file.read()
在較新的 Python 版本中,也有非同步內容管理器。你可以用 async with 來使用它:
async with lifespan(app):
await do_stuff()
當你像上面那樣建立一個內容管理器或非同步內容管理器時,在進入 with 區塊之前,會先執行 yield 之前的程式碼;離開 with 區塊之後,會執行 yield 之後的程式碼。
在我們的範例中,並不是直接用它,而是把它傳給 FastAPI 來使用。
FastAPI 應用的 lifespan 參數需要一個非同步內容管理器,所以我們可以把剛寫好的 lifespan 非同步內容管理器傳給它。
from contextlib import asynccontextmanager
from fastapi import FastAPI
def fake_answer_to_everything_ml_model(x: float):
return x * 42
ml_models = {}
@asynccontextmanager
async def lifespan(app: FastAPI):
# Load the ML model
ml_models["answer_to_everything"] = fake_answer_to_everything_ml_model
yield
# Clean up the ML models and release the resources
ml_models.clear()
app = FastAPI(lifespan=lifespan)
@app.get("/predict")
async def predict(x: float):
result = ml_models["answer_to_everything"](x)
return {"result": result}
🤓 Other versions and variants
from contextlib import asynccontextmanager
from fastapi import FastAPI
def fake_answer_to_everything_ml_model(x: float):
return x * 42
ml_models = {}
@asynccontextmanager
async def lifespan(app: FastAPI):
# Load the ML model
ml_models["answer_to_everything"] = fake_answer_to_everything_ml_model
yield
# Clean up the ML models and release the resources
ml_models.clear()
app = FastAPI(lifespan=lifespan)
@app.get("/predict")
async def predict(x: float):
result = ml_models["answer_to_everything"](x)
return {"result": result}
替代事件(已棄用)¶
Warning
目前建議使用上面所述,透過 FastAPI 應用的 lifespan 參數來處理 startup 與 shutdown。如果你提供了 lifespan 參數,startup 與 shutdown 事件處理器將不會被呼叫。要嘛全用 lifespan,要嘛全用事件,不能同時混用。
你大概可以直接跳過這一節。
也有另一種方式可以定義在 startup 與 shutdown 期間要執行的邏輯。
你可以定義事件處理器(函式)來在應用啟動前或關閉時執行。
這些函式可以用 async def 或一般的 def 宣告。
startup 事件¶
要加入一個在應用啟動前執行的函式,使用事件 "startup" 來宣告:
from fastapi import FastAPI
app = FastAPI()
items = {}
@app.on_event("startup")
async def startup_event():
items["foo"] = {"name": "Fighters"}
items["bar"] = {"name": "Tenders"}
@app.get("/items/{item_id}")
async def read_items(item_id: str):
return items[item_id]
🤓 Other versions and variants
from fastapi import FastAPI
app = FastAPI()
items = {}
@app.on_event("startup")
async def startup_event():
items["foo"] = {"name": "Fighters"}
items["bar"] = {"name": "Tenders"}
@app.get("/items/{item_id}")
async def read_items(item_id: str):
return items[item_id]
在這個例子中,startup 事件處理器函式會用一些值來初始化 items 的「資料庫」(其實就是個 dict)。
你可以註冊多個事件處理函式。
而且在所有 startup 事件處理器都完成之前,你的應用不會開始接收請求。
shutdown 事件¶
要加入一個在應用關閉時執行的函式,使用事件 "shutdown" 來宣告:
from fastapi import FastAPI
app = FastAPI()
@app.on_event("shutdown")
def shutdown_event():
with open("log.txt", mode="a") as log:
log.write("Application shutdown")
@app.get("/items/")
async def read_items():
return [{"name": "Foo"}]
🤓 Other versions and variants
from fastapi import FastAPI
app = FastAPI()
@app.on_event("shutdown")
def shutdown_event():
with open("log.txt", mode="a") as log:
log.write("Application shutdown")
@app.get("/items/")
async def read_items():
return [{"name": "Foo"}]
在這裡,shutdown 事件處理器函式會把一行文字 "Application shutdown" 寫入檔案 log.txt。
Info
在 open() 函式中,mode="a" 表示「append(附加)」;也就是說,這行文字會加在檔案現有內容之後,而不會覆寫先前的內容。
Tip
注意這裡我們使用的是標準 Python 的 open() 函式來操作檔案。
這涉及 I/O(輸入/輸出),也就是需要「等待」資料寫入磁碟。
但 open() 並不使用 async 與 await。
所以我們用一般的 def 來宣告事件處理器,而不是 async def。
同時使用 startup 與 shutdown¶
你的 startup 與 shutdown 邏輯很可能是相關聯的:你可能會先啟動某個東西再把它結束、先取得資源再釋放它,等等。
如果把它們拆成兩個彼此不共享邏輯或變數的獨立函式,會比較麻煩,你得把值存在全域變數或用其他技巧。
因此,現在建議改用上面介紹的 lifespan。
技術細節¶
給有興趣鑽研的同好一點技術細節。🤓
在底層的 ASGI 技術規範中,這屬於 Lifespan Protocol 的一部分,並定義了 startup 與 shutdown 兩種事件。
子應用程式¶
🚨 請記住,這些生命週期事件(startup 與 shutdown)只會在主應用程式上執行,不會在子應用程式 - 掛載上執行。