Ana içeriğe geç

Server-Sent Events (SSE)

🌐 Yapay Zekâ ve İnsanlar Tarafından Çeviri

Bu çeviri, insanlar tarafından yönlendirilen bir yapay zekâ ile oluşturuldu. 🤝

Orijinal anlamın yanlış anlaşılması ya da kulağa doğal gelmeme gibi hatalar içerebilir. 🤖

Yapay zekâ LLM'ini daha iyi yönlendirmemize yardımcı olarak bu çeviriyi iyileştirebilirsiniz.

İngilizce sürüm

İstemciye veri akışını Server-Sent Events (SSE) ile sağlayabilirsiniz.

Bu, JSON Lines Akışı ile benzerdir ancak tarayıcılar tarafından yerel olarak desteklenen EventSource API'si ile text/event-stream formatını kullanır.

Bilgi

FastAPI 0.135.0'da eklendi.

Server-Sent Events Nedir?

SSE, HTTP üzerinden sunucudan istemciye veri akışı için bir standarttır.

Her olay, aralarında boş satırlar bulunan ve data, event, id ve retry gibi "alanlar" içeren küçük bir metin bloğudur.

Şuna benzer:

data: {"name": "Portal Gun", "price": 999.99}

data: {"name": "Plumbus", "price": 32.99}

SSE; yapay zekâ sohbet akışı, canlı bildirimler, log ve gözlemlenebilirlik (observability) gibi senaryolarda ve sunucunun istemciye güncellemeleri ittiği diğer durumlarda yaygın olarak kullanılır.

İpucu

İkili (binary) veri akışı yapmak istiyorsanız, gelişmiş kılavuza bakın: Veri Akışı.

FastAPI ile SSE Akışı

FastAPI ile SSE akışı yapmak için, path operation function içinde yield kullanın ve response_class=EventSourceResponse olarak ayarlayın.

EventSourceResponse'u fastapi.sse içinden içe aktarın:

from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
    for item in items:
        yield item

# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-async", response_class=EventSourceResponse)
def sse_items_no_async() -> Iterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-annotation", response_class=EventSourceResponse)
async def sse_items_no_annotation():
    for item in items:
        yield item


@app.get("/items/stream-no-async-no-annotation", response_class=EventSourceResponse)
def sse_items_no_async_no_annotation():
    for item in items:
        yield item

Yield edilen her öğe JSON olarak kodlanır ve bir SSE olayının data: alanında gönderilir.

Dönüş tipini AsyncIterable[Item] olarak bildirirseniz, FastAPI bunu Pydantic ile veriyi doğrulamak, belgelemek ve serileştirmek için kullanır.

from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
    for item in items:
        yield item

# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-async", response_class=EventSourceResponse)
def sse_items_no_async() -> Iterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-annotation", response_class=EventSourceResponse)
async def sse_items_no_annotation():
    for item in items:
        yield item


@app.get("/items/stream-no-async-no-annotation", response_class=EventSourceResponse)
def sse_items_no_async_no_annotation():
    for item in items:
        yield item

İpucu

Pydantic serileştirmeyi Rust tarafında yapacağından, dönüş tipi bildirmediğiniz duruma göre çok daha yüksek performans elde edersiniz.

Async Olmayan Path Operation Fonksiyonları

Normal def fonksiyonlarını (yani async olmadan) da kullanabilir ve aynı şekilde yield kullanabilirsiniz.

FastAPI, event loop'u bloke etmeyecek şekilde doğru biçimde çalışmasını sağlar.

Bu örnekte fonksiyon async olmadığı için doğru dönüş tipi Iterable[Item] olur:

# Code above omitted 👆

@app.get("/items/stream-no-async", response_class=EventSourceResponse)
def sse_items_no_async() -> Iterable[Item]:
    for item in items:
        yield item

# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-async", response_class=EventSourceResponse)
def sse_items_no_async() -> Iterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-annotation", response_class=EventSourceResponse)
async def sse_items_no_annotation():
    for item in items:
        yield item


@app.get("/items/stream-no-async-no-annotation", response_class=EventSourceResponse)
def sse_items_no_async_no_annotation():
    for item in items:
        yield item

Dönüş Tipi Olmadan

Dönüş tipini belirtmeyebilirsiniz. FastAPI, veriyi dönüştürmek ve göndermek için jsonable_encoder kullanır.

# Code above omitted 👆

@app.get("/items/stream-no-annotation", response_class=EventSourceResponse)
async def sse_items_no_annotation():
    for item in items:
        yield item

# Code below omitted 👇
👀 Full file preview
from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-async", response_class=EventSourceResponse)
def sse_items_no_async() -> Iterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-annotation", response_class=EventSourceResponse)
async def sse_items_no_annotation():
    for item in items:
        yield item


@app.get("/items/stream-no-async-no-annotation", response_class=EventSourceResponse)
def sse_items_no_async_no_annotation():
    for item in items:
        yield item

ServerSentEvent

event, id, retry veya comment gibi SSE alanlarını ayarlamanız gerekirse, düz veri yerine ServerSentEvent nesneleri yield edebilirsiniz.

ServerSentEvent'i fastapi.sse içinden içe aktarın:

from collections.abc import AsyncIterable

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse, ServerSentEvent
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    price: float


items = [
    Item(name="Plumbus", price=32.99),
    Item(name="Portal Gun", price=999.99),
    Item(name="Meeseeks Box", price=49.99),
]


@app.get("/items/stream", response_class=EventSourceResponse)
async def stream_items() -> AsyncIterable[ServerSentEvent]:
    yield ServerSentEvent(comment="stream of item updates")
    for i, item in enumerate(items):
        yield ServerSentEvent(data=item, event="item_update", id=str(i + 1), retry=5000)

data alanı her zaman JSON olarak kodlanır. Pydantic modelleri dâhil, JSON olarak serileştirilebilen herhangi bir değeri geçebilirsiniz.

Ham Veri

Veriyi JSON kodlaması olmadan göndermeniz gerekiyorsa, data yerine raw_data kullanın.

Bu, önceden biçimlendirilmiş metin, log satırları veya [DONE] gibi özel "işaretçi" değerleri göndermek için kullanışlıdır.

from collections.abc import AsyncIterable

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse, ServerSentEvent

app = FastAPI()


@app.get("/logs/stream", response_class=EventSourceResponse)
async def stream_logs() -> AsyncIterable[ServerSentEvent]:
    logs = [
        "2025-01-01 INFO  Application started",
        "2025-01-01 DEBUG Connected to database",
        "2025-01-01 WARN  High memory usage detected",
    ]
    for log_line in logs:
        yield ServerSentEvent(raw_data=log_line)

Not

data ve raw_data birbirini dışlar. Her ServerSentEvent için bunlardan yalnızca birini ayarlayabilirsiniz.

Last-Event-ID ile Devam Etme

Bir tarayıcı bağlantı koptuktan sonra yeniden bağlandığında, son aldığı id'yi Last-Event-ID header'ında gönderir.

Bunu bir header parametresi olarak okuyup, istemcinin kaldığı yerden akışı sürdürmek için kullanabilirsiniz:

from collections.abc import AsyncIterable
from typing import Annotated

from fastapi import FastAPI, Header
from fastapi.sse import EventSourceResponse, ServerSentEvent
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    price: float


items = [
    Item(name="Plumbus", price=32.99),
    Item(name="Portal Gun", price=999.99),
    Item(name="Meeseeks Box", price=49.99),
]


@app.get("/items/stream", response_class=EventSourceResponse)
async def stream_items(
    last_event_id: Annotated[int | None, Header()] = None,
) -> AsyncIterable[ServerSentEvent]:
    start = last_event_id + 1 if last_event_id is not None else 0
    for i, item in enumerate(items):
        if i < start:
            continue
        yield ServerSentEvent(data=item, id=str(i))

POST ile SSE

SSE, sadece GET değil, tüm HTTP metodlarıyla çalışır.

Bu, SSE'yi POST üzerinden akıtan MCP gibi protokoller için kullanışlıdır:

from collections.abc import AsyncIterable

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse, ServerSentEvent
from pydantic import BaseModel

app = FastAPI()


class Prompt(BaseModel):
    text: str


@app.post("/chat/stream", response_class=EventSourceResponse)
async def stream_chat(prompt: Prompt) -> AsyncIterable[ServerSentEvent]:
    words = prompt.text.split()
    for word in words:
        yield ServerSentEvent(data=word, event="token")
    yield ServerSentEvent(raw_data="[DONE]", event="done")

Teknik Detaylar

FastAPI, bazı SSE en iyi uygulamalarını kutudan çıktığı gibi uygular.

  • HTML spesifikasyonu: Server-Sent Events önerisine uygun olarak, bazı proxy'lerin bağlantıyı kapatmasını önlemek için, 15 saniye boyunca hiç mesaj gelmezse "keep alive" ping yorumu gönderir.
  • Akışın cache'lenmesini önlemek için Cache-Control: no-cache header'ını ayarlar.
  • Nginx gibi bazı proxy'lerde buffering'i önlemek için özel X-Accel-Buffering: no header'ını ayarlar.

Bunun için ekstra bir şey yapmanız gerekmez, doğrudan çalışır. 🤓