Server-Sent Events (SSE)¶
🌐 Yapay Zekâ ve İnsanlar Tarafından Çeviri
Bu çeviri, insanlar tarafından yönlendirilen bir yapay zekâ ile oluşturuldu. 🤝
Orijinal anlamın yanlış anlaşılması ya da kulağa doğal gelmeme gibi hatalar içerebilir. 🤖
Yapay zekâ LLM'ini daha iyi yönlendirmemize yardımcı olarak bu çeviriyi iyileştirebilirsiniz.
İstemciye veri akışını Server-Sent Events (SSE) ile sağlayabilirsiniz.
Bu, JSON Lines Akışı ile benzerdir ancak tarayıcılar tarafından yerel olarak desteklenen EventSource API'si ile text/event-stream formatını kullanır.
Bilgi
FastAPI 0.135.0'da eklendi.
Server-Sent Events Nedir?¶
SSE, HTTP üzerinden sunucudan istemciye veri akışı için bir standarttır.
Her olay, aralarında boş satırlar bulunan ve data, event, id ve retry gibi "alanlar" içeren küçük bir metin bloğudur.
Şuna benzer:
data: {"name": "Portal Gun", "price": 999.99}
data: {"name": "Plumbus", "price": 32.99}
SSE; yapay zekâ sohbet akışı, canlı bildirimler, log ve gözlemlenebilirlik (observability) gibi senaryolarda ve sunucunun istemciye güncellemeleri ittiği diğer durumlarda yaygın olarak kullanılır.
İpucu
İkili (binary) veri akışı yapmak istiyorsanız, gelişmiş kılavuza bakın: Veri Akışı.
FastAPI ile SSE Akışı¶
FastAPI ile SSE akışı yapmak için, path operation function içinde yield kullanın ve response_class=EventSourceResponse olarak ayarlayın.
EventSourceResponse'u fastapi.sse içinden içe aktarın:
from collections.abc import AsyncIterable, Iterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None
items = [
Item(name="Plumbus", description="A multi-purpose household device."),
Item(name="Portal Gun", description="A portal opening device."),
Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]
@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
for item in items:
yield item
# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None
items = [
Item(name="Plumbus", description="A multi-purpose household device."),
Item(name="Portal Gun", description="A portal opening device."),
Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]
@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-async", response_class=EventSourceResponse)
def sse_items_no_async() -> Iterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-annotation", response_class=EventSourceResponse)
async def sse_items_no_annotation():
for item in items:
yield item
@app.get("/items/stream-no-async-no-annotation", response_class=EventSourceResponse)
def sse_items_no_async_no_annotation():
for item in items:
yield item
Yield edilen her öğe JSON olarak kodlanır ve bir SSE olayının data: alanında gönderilir.
Dönüş tipini AsyncIterable[Item] olarak bildirirseniz, FastAPI bunu Pydantic ile veriyi doğrulamak, belgelemek ve serileştirmek için kullanır.
from collections.abc import AsyncIterable, Iterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None
items = [
Item(name="Plumbus", description="A multi-purpose household device."),
Item(name="Portal Gun", description="A portal opening device."),
Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]
@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
for item in items:
yield item
# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None
items = [
Item(name="Plumbus", description="A multi-purpose household device."),
Item(name="Portal Gun", description="A portal opening device."),
Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]
@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-async", response_class=EventSourceResponse)
def sse_items_no_async() -> Iterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-annotation", response_class=EventSourceResponse)
async def sse_items_no_annotation():
for item in items:
yield item
@app.get("/items/stream-no-async-no-annotation", response_class=EventSourceResponse)
def sse_items_no_async_no_annotation():
for item in items:
yield item
İpucu
Pydantic serileştirmeyi Rust tarafında yapacağından, dönüş tipi bildirmediğiniz duruma göre çok daha yüksek performans elde edersiniz.
Async Olmayan Path Operation Fonksiyonları¶
Normal def fonksiyonlarını (yani async olmadan) da kullanabilir ve aynı şekilde yield kullanabilirsiniz.
FastAPI, event loop'u bloke etmeyecek şekilde doğru biçimde çalışmasını sağlar.
Bu örnekte fonksiyon async olmadığı için doğru dönüş tipi Iterable[Item] olur:
# Code above omitted 👆
@app.get("/items/stream-no-async", response_class=EventSourceResponse)
def sse_items_no_async() -> Iterable[Item]:
for item in items:
yield item
# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None
items = [
Item(name="Plumbus", description="A multi-purpose household device."),
Item(name="Portal Gun", description="A portal opening device."),
Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]
@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-async", response_class=EventSourceResponse)
def sse_items_no_async() -> Iterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-annotation", response_class=EventSourceResponse)
async def sse_items_no_annotation():
for item in items:
yield item
@app.get("/items/stream-no-async-no-annotation", response_class=EventSourceResponse)
def sse_items_no_async_no_annotation():
for item in items:
yield item
Dönüş Tipi Olmadan¶
Dönüş tipini belirtmeyebilirsiniz. FastAPI, veriyi dönüştürmek ve göndermek için jsonable_encoder kullanır.
# Code above omitted 👆
@app.get("/items/stream-no-annotation", response_class=EventSourceResponse)
async def sse_items_no_annotation():
for item in items:
yield item
# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None
items = [
Item(name="Plumbus", description="A multi-purpose household device."),
Item(name="Portal Gun", description="A portal opening device."),
Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]
@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-async", response_class=EventSourceResponse)
def sse_items_no_async() -> Iterable[Item]:
for item in items:
yield item
@app.get("/items/stream-no-annotation", response_class=EventSourceResponse)
async def sse_items_no_annotation():
for item in items:
yield item
@app.get("/items/stream-no-async-no-annotation", response_class=EventSourceResponse)
def sse_items_no_async_no_annotation():
for item in items:
yield item
ServerSentEvent¶
event, id, retry veya comment gibi SSE alanlarını ayarlamanız gerekirse, düz veri yerine ServerSentEvent nesneleri yield edebilirsiniz.
ServerSentEvent'i fastapi.sse içinden içe aktarın:
from collections.abc import AsyncIterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse, ServerSentEvent
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
price: float
items = [
Item(name="Plumbus", price=32.99),
Item(name="Portal Gun", price=999.99),
Item(name="Meeseeks Box", price=49.99),
]
@app.get("/items/stream", response_class=EventSourceResponse)
async def stream_items() -> AsyncIterable[ServerSentEvent]:
yield ServerSentEvent(comment="stream of item updates")
for i, item in enumerate(items):
yield ServerSentEvent(data=item, event="item_update", id=str(i + 1), retry=5000)
data alanı her zaman JSON olarak kodlanır. Pydantic modelleri dâhil, JSON olarak serileştirilebilen herhangi bir değeri geçebilirsiniz.
Ham Veri¶
Veriyi JSON kodlaması olmadan göndermeniz gerekiyorsa, data yerine raw_data kullanın.
Bu, önceden biçimlendirilmiş metin, log satırları veya [DONE] gibi özel "işaretçi" değerleri göndermek için kullanışlıdır.
from collections.abc import AsyncIterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse, ServerSentEvent
app = FastAPI()
@app.get("/logs/stream", response_class=EventSourceResponse)
async def stream_logs() -> AsyncIterable[ServerSentEvent]:
logs = [
"2025-01-01 INFO Application started",
"2025-01-01 DEBUG Connected to database",
"2025-01-01 WARN High memory usage detected",
]
for log_line in logs:
yield ServerSentEvent(raw_data=log_line)
Not
data ve raw_data birbirini dışlar. Her ServerSentEvent için bunlardan yalnızca birini ayarlayabilirsiniz.
Last-Event-ID ile Devam Etme¶
Bir tarayıcı bağlantı koptuktan sonra yeniden bağlandığında, son aldığı id'yi Last-Event-ID header'ında gönderir.
Bunu bir header parametresi olarak okuyup, istemcinin kaldığı yerden akışı sürdürmek için kullanabilirsiniz:
from collections.abc import AsyncIterable
from typing import Annotated
from fastapi import FastAPI, Header
from fastapi.sse import EventSourceResponse, ServerSentEvent
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
price: float
items = [
Item(name="Plumbus", price=32.99),
Item(name="Portal Gun", price=999.99),
Item(name="Meeseeks Box", price=49.99),
]
@app.get("/items/stream", response_class=EventSourceResponse)
async def stream_items(
last_event_id: Annotated[int | None, Header()] = None,
) -> AsyncIterable[ServerSentEvent]:
start = last_event_id + 1 if last_event_id is not None else 0
for i, item in enumerate(items):
if i < start:
continue
yield ServerSentEvent(data=item, id=str(i))
POST ile SSE¶
SSE, sadece GET değil, tüm HTTP metodlarıyla çalışır.
Bu, SSE'yi POST üzerinden akıtan MCP gibi protokoller için kullanışlıdır:
from collections.abc import AsyncIterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse, ServerSentEvent
from pydantic import BaseModel
app = FastAPI()
class Prompt(BaseModel):
text: str
@app.post("/chat/stream", response_class=EventSourceResponse)
async def stream_chat(prompt: Prompt) -> AsyncIterable[ServerSentEvent]:
words = prompt.text.split()
for word in words:
yield ServerSentEvent(data=word, event="token")
yield ServerSentEvent(raw_data="[DONE]", event="done")
Teknik Detaylar¶
FastAPI, bazı SSE en iyi uygulamalarını kutudan çıktığı gibi uygular.
- HTML spesifikasyonu: Server-Sent Events önerisine uygun olarak, bazı proxy'lerin bağlantıyı kapatmasını önlemek için, 15 saniye boyunca hiç mesaj gelmezse "keep alive"
pingyorumu gönderir. - Akışın cache'lenmesini önlemek için
Cache-Control: no-cacheheader'ını ayarlar. - Nginx gibi bazı proxy'lerde buffering'i önlemek için özel
X-Accel-Buffering: noheader'ını ayarlar.
Bunun için ekstra bir şey yapmanız gerekmez, doğrudan çalışır. 🤓