WebSockets¶
Sie können WebSockets mit FastAPI verwenden.
WebSockets
installieren¶
Zuerst müssen Sie WebSockets
installieren:
$ pip install websockets
---> 100%
WebSockets-Client¶
In Produktion¶
In Ihrem Produktionssystem haben Sie wahrscheinlich ein Frontend, das mit einem modernen Framework wie React, Vue.js oder Angular erstellt wurde.
Und um über WebSockets mit Ihrem Backend zu kommunizieren, würden Sie wahrscheinlich die Werkzeuge Ihres Frontends verwenden.
Oder Sie verfügen möglicherweise über eine native Mobile-Anwendung, die direkt in nativem Code mit Ihrem WebSocket-Backend kommuniziert.
Oder Sie haben andere Möglichkeiten, mit dem WebSocket-Endpunkt zu kommunizieren.
Für dieses Beispiel verwenden wir jedoch ein sehr einfaches HTML-Dokument mit etwas JavaScript, alles in einem langen String.
Das ist natürlich nicht optimal und man würde das nicht in der Produktion machen.
In der Produktion hätten Sie eine der oben genannten Optionen.
Aber es ist die einfachste Möglichkeit, sich auf die Serverseite von WebSockets zu konzentrieren und ein funktionierendes Beispiel zu haben:
from fastapi import FastAPI, WebSocket
from fastapi.responses import HTMLResponse
app = FastAPI()
html = """
<!DOCTYPE html>
<html>
<head>
<title>Chat</title>
</head>
<body>
<h1>WebSocket Chat</h1>
<form action="" onsubmit="sendMessage(event)">
<input type="text" id="messageText" autocomplete="off"/>
<button>Send</button>
</form>
<ul id='messages'>
</ul>
<script>
var ws = new WebSocket("ws://localhost:8000/ws");
ws.onmessage = function(event) {
var messages = document.getElementById('messages')
var message = document.createElement('li')
var content = document.createTextNode(event.data)
message.appendChild(content)
messages.appendChild(message)
};
function sendMessage(event) {
var input = document.getElementById("messageText")
ws.send(input.value)
input.value = ''
event.preventDefault()
}
</script>
</body>
</html>
"""
@app.get("/")
async def get():
return HTMLResponse(html)
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
while True:
data = await websocket.receive_text()
await websocket.send_text(f"Message text was: {data}")
Einen websocket
erstellen¶
Erstellen Sie in Ihrer FastAPI-Anwendung einen websocket
:
from fastapi import FastAPI, WebSocket
from fastapi.responses import HTMLResponse
app = FastAPI()
html = """
<!DOCTYPE html>
<html>
<head>
<title>Chat</title>
</head>
<body>
<h1>WebSocket Chat</h1>
<form action="" onsubmit="sendMessage(event)">
<input type="text" id="messageText" autocomplete="off"/>
<button>Send</button>
</form>
<ul id='messages'>
</ul>
<script>
var ws = new WebSocket("ws://localhost:8000/ws");
ws.onmessage = function(event) {
var messages = document.getElementById('messages')
var message = document.createElement('li')
var content = document.createTextNode(event.data)
message.appendChild(content)
messages.appendChild(message)
};
function sendMessage(event) {
var input = document.getElementById("messageText")
ws.send(input.value)
input.value = ''
event.preventDefault()
}
</script>
</body>
</html>
"""
@app.get("/")
async def get():
return HTMLResponse(html)
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
while True:
data = await websocket.receive_text()
await websocket.send_text(f"Message text was: {data}")
"Technische Details"
Sie können auch from starlette.websockets import WebSocket
verwenden.
FastAPI stellt den gleichen WebSocket
direkt zur Verfügung, als Annehmlichkeit für Sie, den Entwickler. Er kommt aber direkt von Starlette.
Nachrichten erwarten und Nachrichten senden¶
In Ihrer WebSocket-Route können Sie Nachrichten await
en und Nachrichten senden.
from fastapi import FastAPI, WebSocket
from fastapi.responses import HTMLResponse
app = FastAPI()
html = """
<!DOCTYPE html>
<html>
<head>
<title>Chat</title>
</head>
<body>
<h1>WebSocket Chat</h1>
<form action="" onsubmit="sendMessage(event)">
<input type="text" id="messageText" autocomplete="off"/>
<button>Send</button>
</form>
<ul id='messages'>
</ul>
<script>
var ws = new WebSocket("ws://localhost:8000/ws");
ws.onmessage = function(event) {
var messages = document.getElementById('messages')
var message = document.createElement('li')
var content = document.createTextNode(event.data)
message.appendChild(content)
messages.appendChild(message)
};
function sendMessage(event) {
var input = document.getElementById("messageText")
ws.send(input.value)
input.value = ''
event.preventDefault()
}
</script>
</body>
</html>
"""
@app.get("/")
async def get():
return HTMLResponse(html)
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
while True:
data = await websocket.receive_text()
await websocket.send_text(f"Message text was: {data}")
Sie können Binär-, Text- und JSON-Daten empfangen und senden.
Es ausprobieren¶
Wenn Ihre Datei main.py
heißt, führen Sie Ihre Anwendung so aus:
$ uvicorn main:app --reload
<span style="color: green;">INFO</span>: Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
Öffnen Sie Ihren Browser unter http://127.0.0.1:8000.
Sie sehen eine einfache Seite wie:
Sie können Nachrichten in das Eingabefeld tippen und absenden:
Und Ihre FastAPI-Anwendung mit WebSockets antwortet:
Sie können viele Nachrichten senden (und empfangen):
Und alle verwenden dieselbe WebSocket-Verbindung.
Verwendung von Depends
und anderen¶
In WebSocket-Endpunkten können Sie Folgendes aus fastapi
importieren und verwenden:
Depends
Security
Cookie
Header
Path
Query
Diese funktionieren auf die gleiche Weise wie für andere FastAPI-Endpunkte/Pfadoperationen:
from typing import Annotated
from fastapi import (
Cookie,
Depends,
FastAPI,
Query,
WebSocket,
WebSocketException,
status,
)
from fastapi.responses import HTMLResponse
app = FastAPI()
html = """
<!DOCTYPE html>
<html>
<head>
<title>Chat</title>
</head>
<body>
<h1>WebSocket Chat</h1>
<form action="" onsubmit="sendMessage(event)">
<label>Item ID: <input type="text" id="itemId" autocomplete="off" value="foo"/></label>
<label>Token: <input type="text" id="token" autocomplete="off" value="some-key-token"/></label>
<button onclick="connect(event)">Connect</button>
<hr>
<label>Message: <input type="text" id="messageText" autocomplete="off"/></label>
<button>Send</button>
</form>
<ul id='messages'>
</ul>
<script>
var ws = null;
function connect(event) {
var itemId = document.getElementById("itemId")
var token = document.getElementById("token")
ws = new WebSocket("ws://localhost:8000/items/" + itemId.value + "/ws?token=" + token.value);
ws.onmessage = function(event) {
var messages = document.getElementById('messages')
var message = document.createElement('li')
var content = document.createTextNode(event.data)
message.appendChild(content)
messages.appendChild(message)
};
event.preventDefault()
}
function sendMessage(event) {
var input = document.getElementById("messageText")
ws.send(input.value)
input.value = ''
event.preventDefault()
}
</script>
</body>
</html>
"""
@app.get("/")
async def get():
return HTMLResponse(html)
async def get_cookie_or_token(
websocket: WebSocket,
session: Annotated[str | None, Cookie()] = None,
token: Annotated[str | None, Query()] = None,
):
if session is None and token is None:
raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION)
return session or token
@app.websocket("/items/{item_id}/ws")
async def websocket_endpoint(
*,
websocket: WebSocket,
item_id: str,
q: int | None = None,
cookie_or_token: Annotated[str, Depends(get_cookie_or_token)],
):
await websocket.accept()
while True:
data = await websocket.receive_text()
await websocket.send_text(
f"Session cookie or query token value is: {cookie_or_token}"
)
if q is not None:
await websocket.send_text(f"Query parameter q is: {q}")
await websocket.send_text(f"Message text was: {data}, for item ID: {item_id}")
from typing import Annotated, Union
from fastapi import (
Cookie,
Depends,
FastAPI,
Query,
WebSocket,
WebSocketException,
status,
)
from fastapi.responses import HTMLResponse
app = FastAPI()
html = """
<!DOCTYPE html>
<html>
<head>
<title>Chat</title>
</head>
<body>
<h1>WebSocket Chat</h1>
<form action="" onsubmit="sendMessage(event)">
<label>Item ID: <input type="text" id="itemId" autocomplete="off" value="foo"/></label>
<label>Token: <input type="text" id="token" autocomplete="off" value="some-key-token"/></label>
<button onclick="connect(event)">Connect</button>
<hr>
<label>Message: <input type="text" id="messageText" autocomplete="off"/></label>
<button>Send</button>
</form>
<ul id='messages'>
</ul>
<script>
var ws = null;
function connect(event) {
var itemId = document.getElementById("itemId")
var token = document.getElementById("token")
ws = new WebSocket("ws://localhost:8000/items/" + itemId.value + "/ws?token=" + token.value);
ws.onmessage = function(event) {
var messages = document.getElementById('messages')
var message = document.createElement('li')
var content = document.createTextNode(event.data)
message.appendChild(content)
messages.appendChild(message)
};
event.preventDefault()
}
function sendMessage(event) {
var input = document.getElementById("messageText")
ws.send(input.value)
input.value = ''
event.preventDefault()
}
</script>
</body>
</html>
"""
@app.get("/")
async def get():
return HTMLResponse(html)
async def get_cookie_or_token(
websocket: WebSocket,
session: Annotated[Union[str, None], Cookie()] = None,
token: Annotated[Union[str, None], Query()] = None,
):
if session is None and token is None:
raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION)
return session or token
@app.websocket("/items/{item_id}/ws")
async def websocket_endpoint(
*,
websocket: WebSocket,
item_id: str,
q: Union[int, None] = None,
cookie_or_token: Annotated[str, Depends(get_cookie_or_token)],
):
await websocket.accept()
while True:
data = await websocket.receive_text()
await websocket.send_text(
f"Session cookie or query token value is: {cookie_or_token}"
)
if q is not None:
await websocket.send_text(f"Query parameter q is: {q}")
await websocket.send_text(f"Message text was: {data}, for item ID: {item_id}")
from typing import Union
from fastapi import (
Cookie,
Depends,
FastAPI,
Query,
WebSocket,
WebSocketException,
status,
)
from fastapi.responses import HTMLResponse
from typing_extensions import Annotated
app = FastAPI()
html = """
<!DOCTYPE html>
<html>
<head>
<title>Chat</title>
</head>
<body>
<h1>WebSocket Chat</h1>
<form action="" onsubmit="sendMessage(event)">
<label>Item ID: <input type="text" id="itemId" autocomplete="off" value="foo"/></label>
<label>Token: <input type="text" id="token" autocomplete="off" value="some-key-token"/></label>
<button onclick="connect(event)">Connect</button>
<hr>
<label>Message: <input type="text" id="messageText" autocomplete="off"/></label>
<button>Send</button>
</form>
<ul id='messages'>
</ul>
<script>
var ws = null;
function connect(event) {
var itemId = document.getElementById("itemId")
var token = document.getElementById("token")
ws = new WebSocket("ws://localhost:8000/items/" + itemId.value + "/ws?token=" + token.value);
ws.onmessage = function(event) {
var messages = document.getElementById('messages')
var message = document.createElement('li')
var content = document.createTextNode(event.data)
message.appendChild(content)
messages.appendChild(message)
};
event.preventDefault()
}
function sendMessage(event) {
var input = document.getElementById("messageText")
ws.send(input.value)
input.value = ''
event.preventDefault()
}
</script>
</body>
</html>
"""
@app.get("/")
async def get():
return HTMLResponse(html)
async def get_cookie_or_token(
websocket: WebSocket,
session: Annotated[Union[str, None], Cookie()] = None,
token: Annotated[Union[str, None], Query()] = None,
):
if session is None and token is None:
raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION)
return session or token
@app.websocket("/items/{item_id}/ws")
async def websocket_endpoint(
*,
websocket: WebSocket,
item_id: str,
q: Union[int, None] = None,
cookie_or_token: Annotated[str, Depends(get_cookie_or_token)],
):
await websocket.accept()
while True:
data = await websocket.receive_text()
await websocket.send_text(
f"Session cookie or query token value is: {cookie_or_token}"
)
if q is not None:
await websocket.send_text(f"Query parameter q is: {q}")
await websocket.send_text(f"Message text was: {data}, for item ID: {item_id}")
"Tipp"
Bevorzugen Sie die Annotated
-Version, falls möglich.
from fastapi import (
Cookie,
Depends,
FastAPI,
Query,
WebSocket,
WebSocketException,
status,
)
from fastapi.responses import HTMLResponse
app = FastAPI()
html = """
<!DOCTYPE html>
<html>
<head>
<title>Chat</title>
</head>
<body>
<h1>WebSocket Chat</h1>
<form action="" onsubmit="sendMessage(event)">
<label>Item ID: <input type="text" id="itemId" autocomplete="off" value="foo"/></label>
<label>Token: <input type="text" id="token" autocomplete="off" value="some-key-token"/></label>
<button onclick="connect(event)">Connect</button>
<hr>
<label>Message: <input type="text" id="messageText" autocomplete="off"/></label>
<button>Send</button>
</form>
<ul id='messages'>
</ul>
<script>
var ws = null;
function connect(event) {
var itemId = document.getElementById("itemId")
var token = document.getElementById("token")
ws = new WebSocket("ws://localhost:8000/items/" + itemId.value + "/ws?token=" + token.value);
ws.onmessage = function(event) {
var messages = document.getElementById('messages')
var message = document.createElement('li')
var content = document.createTextNode(event.data)
message.appendChild(content)
messages.appendChild(message)
};
event.preventDefault()
}
function sendMessage(event) {
var input = document.getElementById("messageText")
ws.send(input.value)
input.value = ''
event.preventDefault()
}
</script>
</body>
</html>
"""
@app.get("/")
async def get():
return HTMLResponse(html)
async def get_cookie_or_token(
websocket: WebSocket,
session: str | None = Cookie(default=None),
token: str | None = Query(default=None),
):
if session is None and token is None:
raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION)
return session or token
@app.websocket("/items/{item_id}/ws")
async def websocket_endpoint(
websocket: WebSocket,
item_id: str,
q: int | None = None,
cookie_or_token: str = Depends(get_cookie_or_token),
):
await websocket.accept()
while True:
data = await websocket.receive_text()
await websocket.send_text(
f"Session cookie or query token value is: {cookie_or_token}"
)
if q is not None:
await websocket.send_text(f"Query parameter q is: {q}")
await websocket.send_text(f"Message text was: {data}, for item ID: {item_id}")
"Tipp"
Bevorzugen Sie die Annotated
-Version, falls möglich.
from typing import Union
from fastapi import (
Cookie,
Depends,
FastAPI,
Query,
WebSocket,
WebSocketException,
status,
)
from fastapi.responses import HTMLResponse
app = FastAPI()
html = """
<!DOCTYPE html>
<html>
<head>
<title>Chat</title>
</head>
<body>
<h1>WebSocket Chat</h1>
<form action="" onsubmit="sendMessage(event)">
<label>Item ID: <input type="text" id="itemId" autocomplete="off" value="foo"/></label>
<label>Token: <input type="text" id="token" autocomplete="off" value="some-key-token"/></label>
<button onclick="connect(event)">Connect</button>
<hr>
<label>Message: <input type="text" id="messageText" autocomplete="off"/></label>
<button>Send</button>
</form>
<ul id='messages'>
</ul>
<script>
var ws = null;
function connect(event) {
var itemId = document.getElementById("itemId")
var token = document.getElementById("token")
ws = new WebSocket("ws://localhost:8000/items/" + itemId.value + "/ws?token=" + token.value);
ws.onmessage = function(event) {
var messages = document.getElementById('messages')
var message = document.createElement('li')
var content = document.createTextNode(event.data)
message.appendChild(content)
messages.appendChild(message)
};
event.preventDefault()
}
function sendMessage(event) {
var input = document.getElementById("messageText")
ws.send(input.value)
input.value = ''
event.preventDefault()
}
</script>
</body>
</html>
"""
@app.get("/")
async def get():
return HTMLResponse(html)
async def get_cookie_or_token(
websocket: WebSocket,
session: Union[str, None] = Cookie(default=None),
token: Union[str, None] = Query(default=None),
):
if session is None and token is None:
raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION)
return session or token
@app.websocket("/items/{item_id}/ws")
async def websocket_endpoint(
websocket: WebSocket,
item_id: str,
q: Union[int, None] = None,
cookie_or_token: str = Depends(get_cookie_or_token),
):
await websocket.accept()
while True:
data = await websocket.receive_text()
await websocket.send_text(
f"Session cookie or query token value is: {cookie_or_token}"
)
if q is not None:
await websocket.send_text(f"Query parameter q is: {q}")
await websocket.send_text(f"Message text was: {data}, for item ID: {item_id}")
Info
Da es sich um einen WebSocket handelt, macht es keinen Sinn, eine HTTPException
auszulösen, stattdessen lösen wir eine WebSocketException
aus.
Sie können einen „Closing“-Code verwenden, aus den gültigen Codes, die in der Spezifikation definiert sind.
WebSockets mit Abhängigkeiten ausprobieren¶
Wenn Ihre Datei main.py
heißt, führen Sie Ihre Anwendung mit Folgendem aus:
$ uvicorn main:app --reload
<span style="color: green;">INFO</span>: Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
Öffnen Sie Ihren Browser unter http://127.0.0.1:8000.
Dort können Sie einstellen:
- Die „Item ID“, die im Pfad verwendet wird.
- Das „Token“, das als Query-Parameter verwendet wird.
"Tipp"
Beachten Sie, dass der Query-„Token“ von einer Abhängigkeit verarbeitet wird.
Damit können Sie den WebSocket verbinden und dann Nachrichten senden und empfangen:
Verbindungsabbrüche und mehreren Clients handhaben¶
Wenn eine WebSocket-Verbindung geschlossen wird, löst await websocket.receive_text()
eine WebSocketDisconnect
-Exception aus, die Sie dann wie in folgendem Beispiel abfangen und behandeln können.
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
app = FastAPI()
html = """
<!DOCTYPE html>
<html>
<head>
<title>Chat</title>
</head>
<body>
<h1>WebSocket Chat</h1>
<h2>Your ID: <span id="ws-id"></span></h2>
<form action="" onsubmit="sendMessage(event)">
<input type="text" id="messageText" autocomplete="off"/>
<button>Send</button>
</form>
<ul id='messages'>
</ul>
<script>
var client_id = Date.now()
document.querySelector("#ws-id").textContent = client_id;
var ws = new WebSocket(`ws://localhost:8000/ws/${client_id}`);
ws.onmessage = function(event) {
var messages = document.getElementById('messages')
var message = document.createElement('li')
var content = document.createTextNode(event.data)
message.appendChild(content)
messages.appendChild(message)
};
function sendMessage(event) {
var input = document.getElementById("messageText")
ws.send(input.value)
input.value = ''
event.preventDefault()
}
</script>
</body>
</html>
"""
class ConnectionManager:
def __init__(self):
self.active_connections: list[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
async def send_personal_message(self, message: str, websocket: WebSocket):
await websocket.send_text(message)
async def broadcast(self, message: str):
for connection in self.active_connections:
await connection.send_text(message)
manager = ConnectionManager()
@app.get("/")
async def get():
return HTMLResponse(html)
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
await manager.send_personal_message(f"You wrote: {data}", websocket)
await manager.broadcast(f"Client #{client_id} says: {data}")
except WebSocketDisconnect:
manager.disconnect(websocket)
await manager.broadcast(f"Client #{client_id} left the chat")
from typing import List
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
app = FastAPI()
html = """
<!DOCTYPE html>
<html>
<head>
<title>Chat</title>
</head>
<body>
<h1>WebSocket Chat</h1>
<h2>Your ID: <span id="ws-id"></span></h2>
<form action="" onsubmit="sendMessage(event)">
<input type="text" id="messageText" autocomplete="off"/>
<button>Send</button>
</form>
<ul id='messages'>
</ul>
<script>
var client_id = Date.now()
document.querySelector("#ws-id").textContent = client_id;
var ws = new WebSocket(`ws://localhost:8000/ws/${client_id}`);
ws.onmessage = function(event) {
var messages = document.getElementById('messages')
var message = document.createElement('li')
var content = document.createTextNode(event.data)
message.appendChild(content)
messages.appendChild(message)
};
function sendMessage(event) {
var input = document.getElementById("messageText")
ws.send(input.value)
input.value = ''
event.preventDefault()
}
</script>
</body>
</html>
"""
class ConnectionManager:
def __init__(self):
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
async def send_personal_message(self, message: str, websocket: WebSocket):
await websocket.send_text(message)
async def broadcast(self, message: str):
for connection in self.active_connections:
await connection.send_text(message)
manager = ConnectionManager()
@app.get("/")
async def get():
return HTMLResponse(html)
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
await manager.send_personal_message(f"You wrote: {data}", websocket)
await manager.broadcast(f"Client #{client_id} says: {data}")
except WebSocketDisconnect:
manager.disconnect(websocket)
await manager.broadcast(f"Client #{client_id} left the chat")
Zum Ausprobieren:
- Öffnen Sie die Anwendung mit mehreren Browser-Tabs.
- Schreiben Sie Nachrichten in den Tabs.
- Schließen Sie dann einen der Tabs.
Das wird die Ausnahme WebSocketDisconnect
auslösen und alle anderen Clients erhalten eine Nachricht wie:
Client #1596980209979 left the chat
"Tipp"
Die obige Anwendung ist ein minimales und einfaches Beispiel, das zeigt, wie Nachrichten verarbeitet und an mehrere WebSocket-Verbindungen gesendet werden.
Beachten Sie jedoch, dass, da alles nur im Speicher in einer einzigen Liste verwaltet wird, es nur funktioniert, während der Prozess ausgeführt wird, und nur mit einem einzelnen Prozess.
Wenn Sie etwas benötigen, das sich leicht in FastAPI integrieren lässt, aber robuster ist und von Redis, PostgreSQL und anderen unterstützt wird, sehen Sie sich encode/broadcaster an.
Mehr Informationen¶
Weitere Informationen zu Optionen finden Sie in der Dokumentation von Starlette: