Creating a Chat-style WebSocket Endpoint
The "echo server" from the previous page is a useful warm-up but not a useful app. Real WebSocket code starts to get interesting when there's more than one client involved and the server has to keep track of who's connected. This page builds up to that, step by step.
By the end you'll have a working chat endpoint where many clients can connect, send messages, and see each other's messages - the foundation almost every "real-time" feature in production grows out of.
Step 1: A single-user echo, but typed properly
Before adding rooms and broadcasting, let's tighten the basic loop. A handful of small changes go a long way.
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from pydantic import BaseModel, ValidationError

app = FastAPI()


class ChatMessage(BaseModel):
    type: str
    text: str


@app.websocket("/ws/chat")
async def chat(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            raw = await websocket.receive_json()
            try:
                msg = ChatMessage.model_validate(raw)
            except ValidationError as e:
                await websocket.send_json({"type": "error", "errors": e.errors()})
                continue
            await websocket.send_json({"type": "echo", "text": msg.text})
    except WebSocketDisconnect:
        return

What changed since the bare echo:
- Messages go over the wire as JSON, not raw text. Easier to evolve.
- Inputs are validated by a Pydantic model. The schema acts as a contract.
- Validation errors come back as application errors (a JSON message), not a connection close. The connection survives.
This is already enough to be a contract a frontend can build against.
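One gap worth knowing about: receive_json() parses the frame before the Pydantic model ever sees it, so a frame that isn't valid JSON raises a JSON decoding error rather than a ValidationError. Below is a minimal sketch of handling both cases in one place. It uses only the standard library so it runs on its own; reply_for and its hand-rolled field checks are hypothetical stand-ins for ChatMessage.model_validate, not part of FastAPI or Pydantic.

```python
import json


def reply_for(raw_text: str) -> dict:
    """Turn one incoming text frame into the JSON reply the server would send."""
    try:
        data = json.loads(raw_text)
    except json.JSONDecodeError as e:
        # Malformed JSON becomes an application error, not a dropped connection.
        return {"type": "error", "errors": [f"invalid JSON: {e.msg}"]}

    if not isinstance(data, dict):
        return {"type": "error", "errors": ["message must be a JSON object"]}

    # Stand-in for the schema check: both fields must be present and be strings.
    errors = [
        f"{field}: expected a string"
        for field in ("type", "text")
        if not isinstance(data.get(field), str)
    ]
    if errors:
        return {"type": "error", "errors": errors}
    return {"type": "echo", "text": data["text"]}


print(reply_for('{"type": "msg", "text": "hi"}'))
print(reply_for('{"type": "msg"}'))
print(reply_for("not json"))
```

In the endpoint, the same idea would mean reading the frame with receive_text() and sending the result back with send_json(), so every kind of bad input gets a reply and the loop keeps running.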
Step 2: Identifying the user
A chat without users is just an echo at scale. The simplest way to attach a user to a WebSocket is a query parameter on the connection URL:
ws://localhost:8000/ws/chat?token=eyJhbGc...

FastAPI handles query parameters on WebSocket endpoints the same way it does for HTTP routes:
from fastapi import WebSocket, status


@app.websocket("/ws/chat")
async def chat(websocket: WebSocket, token: str | None = None):
    if token is None:
        await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
        return
    user = decode_token_or_none(token)
    if user is None:
        await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
        return
    await websocket.accept()
    # ...rest of the loop, but now we know `user`

A small but important detail: rejecting the connection happens before accept(). Once accepted, even a close with a meaningful code shows up to the browser as a disconnect rather than a clean refusal. Reject early; accept once you're sure.
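The page never defines decode_token_or_none, so here is one hypothetical shape it could take, just to make the contract concrete: return a user for a valid token and None for anything else. This sketch signs a username with HMAC from the standard library. SECRET_KEY, User, and sign are assumptions made for illustration; a real deployment would more likely verify a JWT with a dedicated library.

```python
import base64
import hashlib
import hmac
from dataclasses import dataclass
from typing import Optional

SECRET_KEY = b"change-me"  # assumption: a server-side secret loaded from config


@dataclass
class User:
    username: str


def sign(username: str) -> str:
    """Build a token of the form <base64 username>.<hex signature>."""
    body = base64.urlsafe_b64encode(username.encode()).decode()
    sig = hmac.new(SECRET_KEY, body.encode(), hashlib.sha256).hexdigest()
    return f"{body}.{sig}"


def decode_token_or_none(token: str) -> Optional[User]:
    """Return the User for a valid token, or None for anything else."""
    body, sep, sig = token.partition(".")
    if not sep:
        return None
    expected = hmac.new(SECRET_KEY, body.encode(), hashlib.sha256).hexdigest()
    # Constant-time comparison, done on bytes so odd input cannot raise.
    if not hmac.compare_digest(sig.encode(), expected.encode()):
        return None
    try:
        return User(username=base64.urlsafe_b64decode(body.encode()).decode())
    except ValueError:
        return None


token = sign("alice")
print(decode_token_or_none(token))
print(decode_token_or_none("garbage"))
```

Returning None instead of raising keeps the endpoint's rejection path to a single check before accept().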
For real apps you'd reach for a dependency:
from fastapi import Depends


async def get_ws_user(websocket: WebSocket, token: str | None = None):
    if token is None or (user := decode_token_or_none(token)) is None:
        await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
        return None
    return user


@app.websocket("/ws/chat")
async def chat(websocket: WebSocket, user = Depends(get_ws_user)):
    if user is None:
        return
    await websocket.accept()

Dependencies on WebSocket routes work, with the caveat that you can't raise HTTPException - there's no HTTP response to attach it to.
Step 3: A tiny connection registry
For multiple clients to see each other's messages, the server needs to remember who's connected. The standard pattern is a small in-memory dictionary on the application.
class Connections:
    def __init__(self):
        self.active: dict[str, WebSocket] = {}

    async def connect(self, username: str, websocket: WebSocket) -> None:
        await websocket.accept()
        self.active[username] = websocket

    def disconnect(self, username: str) -> None:
        self.active.pop(username, None)

    async def broadcast(self, payload: dict) -> None:
        dead: list[str] = []
        # Iterate over a snapshot: clients can join or leave while we await a send.
        for username, ws in list(self.active.items()):
            try:
                await ws.send_json(payload)
            except RuntimeError:
                dead.append(username)
        for username in dead:
            self.disconnect(username)


connections = Connections()

Two design choices to flag:
- The registry is per-process. If you run two workers, each has its own and they don't see each other's clients. We'll fix that in a later page with Redis pub/sub.
- The broadcast method is defensive. Trying to send to a half-closed socket raises RuntimeError. We collect the offenders and prune them rather than letting one bad client crash the whole broadcast.
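The pruning behaviour is easy to check without a server. The sketch below mirrors the registry's broadcast logic against a hypothetical FakeSocket that only implements send_json; FakeSocket, Registry, and demo are illustrative names, not FastAPI APIs. It also loops over a snapshot of the dictionary, on the assumption that clients may connect or disconnect while a send is being awaited.

```python
import asyncio


class FakeSocket:
    """Hypothetical stand-in for WebSocket: records sends, or fails if closed."""

    def __init__(self, closed: bool = False):
        self.closed = closed
        self.sent: list = []

    async def send_json(self, payload: dict) -> None:
        if self.closed:
            raise RuntimeError("socket is closed")
        self.sent.append(payload)


class Registry:
    """Broadcast logic only; accepting connections is left out."""

    def __init__(self):
        self.active: dict = {}

    async def broadcast(self, payload: dict) -> None:
        dead = []
        # Snapshot the items so the dict may change while we await.
        for username, ws in list(self.active.items()):
            try:
                await ws.send_json(payload)
            except RuntimeError:
                dead.append(username)
        for username in dead:
            self.active.pop(username, None)


async def demo() -> Registry:
    registry = Registry()
    registry.active["alice"] = FakeSocket()
    registry.active["bob"] = FakeSocket(closed=True)
    await registry.broadcast({"type": "message", "text": "hi"})
    return registry


registry = asyncio.run(demo())
print(sorted(registry.active))  # bob's closed socket has been pruned
```

The healthy client still receives the payload even though another client's send failed partway through the loop.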
Step 4: Putting it together
@app.websocket("/ws/chat")
async def chat(websocket: WebSocket, user = Depends(get_ws_user)):
    if user is None:
        return
    await connections.connect(user.username, websocket)
    await connections.broadcast({"type": "join", "user": user.username})
    try:
        while True:
            raw = await websocket.receive_json()
            try:
                msg = ChatMessage.model_validate(raw)
            except ValidationError as e:
                await websocket.send_json({"type": "error", "errors": e.errors()})
                continue
            await connections.broadcast({
                "type": "message",
                "user": user.username,
                "text": msg.text,
            })
    except WebSocketDisconnect:
        pass
    finally:
        connections.disconnect(user.username)
        await connections.broadcast({"type": "leave", "user": user.username})

That's a real chat endpoint. Open two browser tabs, connect with different tokens, and messages flow between them.
A few of the small choices in there earn their keep:
- The try/except/finally makes sure the registry is always cleaned up, even if the loop body raises.
- "join" and "leave" events broadcast on connect and disconnect. The frontend can render an online-users list without ever polling.
- Errors stay inside the connection (one bad message doesn't drop the whole session).
Step 5: A minimal frontend to test it
You don't need a frontend framework to verify this works.
<!doctype html>
<html>
<body>
<input id="text" autofocus>
<button id="send">Send</button>
<pre id="log"></pre>
<script>
const log = (m) => document.getElementById("log").textContent += m + "\n"
const ws = new WebSocket("ws://localhost:8000/ws/chat?token=" + prompt("token"))
ws.onopen = () => log("(connected)")
ws.onclose = () => log("(disconnected)")
ws.onmessage = (e) => log(e.data)
document.getElementById("send").onclick = () => {
  const text = document.getElementById("text").value
  ws.send(JSON.stringify({ type: "msg", text }))
  document.getElementById("text").value = ""
}
</script>
</body>
</html>

Open it in two tabs, type a different token in each prompt, and you're chatting with yourself.
What's missing (on purpose)
This endpoint is a real foundation, but it has the rough edges every minimal version has:
| Missing | When you'll need it |
|---|---|
| Rooms / channels | When "everyone sees everything" stops scaling |
| Message history | When users expect to see what happened before they joined |
| Typing indicators, read receipts | When the UI demands them |
| Cross-process broadcast | When you run more than one worker |
| Rate limiting per connection | When users discover they can flood the room |
The next pages add the most important of those - connection management, broadcasting across workers, and the patterns that make real-time notifications work without turning into a swamp. The skeleton here doesn't need to grow much to support any of it.
A small habit to form early
Always send messages as objects with a "type" field, never as bare strings. The day will come when you want to send three different shapes of message on the same connection (chat, typing, presence), and a discriminated union is far easier to add to existing code than to retrofit later.
# Good
await ws.send_json({"type": "message", "text": "hi"})
await ws.send_json({"type": "typing", "user": "alice"})

# A short-term shortcut that turns into a long-term problem
await ws.send_text("hi")

Pay this tax up front. It is the cheapest form of API design you'll ever do.
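To show what the "type" field buys you on the receiving side, here is a minimal sketch of dispatching on it. The Message and Typing dataclasses, the SHAPES table, and parse are hypothetical names chosen for illustration, and the sketch assumes every frame is a JSON object. Supporting a new shape means adding one class and one table entry.

```python
import json
from dataclasses import dataclass


@dataclass
class Message:
    user: str
    text: str


@dataclass
class Typing:
    user: str


# Map each "type" value to the class that models its payload.
SHAPES = {"message": Message, "typing": Typing}


def parse(raw_text: str):
    """Decode one frame and build the matching dataclass from its fields."""
    data = json.loads(raw_text)
    shape = SHAPES.get(data.pop("type", None))
    if shape is None:
        raise ValueError("unknown or missing message type")
    return shape(**data)


print(parse('{"type": "typing", "user": "alice"}'))
print(parse('{"type": "message", "user": "bob", "text": "hi"}'))
```

With bare strings there is nothing to dispatch on, which is why retrofitting a second message shape later is so much more painful.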
