Connection Management
Once you have more than a handful of WebSocket clients, the question stops being "how do I send a message?" and becomes "how do I keep track of who's even on the line?" This page is about the bookkeeping - the data structures, the cleanup, the failure modes. None of it is glamorous, all of it matters.
What "managing" actually means
A WebSocket connection has a life cycle, and there are a handful of moments in it where something must happen:
accept          ─── add to registry, announce join
receive (loop)  ─── route message, refresh activity timestamp
ping/pong       ─── prove the connection is still alive
disconnect      ─── remove from registry, announce leave, free resources

Drop any one of those and the symptoms are familiar: ghost users in the online list, messages sent into the void, memory creeping up over days.
A better registry
The toy registry from the previous page indexed by username. Real apps usually want to be richer than that - a user might have multiple connections (laptop + phone), and you want both to receive the same message.
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime

from fastapi import WebSocket


# eq=False keeps identity-based hashing. A plain @dataclass sets __hash__ to
# None, and the manager below needs to store Connection objects in a set.
@dataclass(eq=False)
class Connection:
    websocket: WebSocket
    user_id: str
    connected_at: datetime = field(default_factory=datetime.utcnow)
    last_seen: datetime = field(default_factory=datetime.utcnow)
    metadata: dict = field(default_factory=dict)


class ConnectionManager:
    def __init__(self):
        # one user -> many connections
        self._by_user: dict[str, set[Connection]] = defaultdict(set)
        # one ws -> its Connection (for fast cleanup on disconnect)
        self._by_ws: dict[WebSocket, Connection] = {}

    async def add(self, user_id: str, ws: WebSocket, metadata: dict | None = None) -> Connection:
        conn = Connection(websocket=ws, user_id=user_id, metadata=metadata or {})
        self._by_user[user_id].add(conn)
        self._by_ws[ws] = conn
        return conn

    def remove(self, ws: WebSocket) -> Connection | None:
        conn = self._by_ws.pop(ws, None)
        if conn is None:
            return None
        bucket = self._by_user.get(conn.user_id)
        if bucket:
            bucket.discard(conn)
            if not bucket:
                # last connection for this user: drop the empty bucket
                self._by_user.pop(conn.user_id, None)
        return conn

    def for_user(self, user_id: str) -> set[Connection]:
        # Return a copy so callers can iterate while connections are removed.
        return set(self._by_user.get(user_id, ()))

    @property
    def all_connections(self) -> list[Connection]:
        return list(self._by_ws.values())

    @property
    def online_users(self) -> list[str]:
        return list(self._by_user.keys())


manager = ConnectionManager()

Two dictionaries instead of one. The _by_ws index is the thing that makes disconnect cleanup O(1) instead of "scan everyone."
Using the manager from the endpoint
from fastapi import Depends, WebSocket, WebSocketDisconnect

@app.websocket("/ws")
async def endpoint(websocket: WebSocket, user = Depends(get_ws_user)):
    if user is None:
        return
    await websocket.accept()
    conn = await manager.add(
        user.id,
        websocket,
        metadata={"agent": websocket.headers.get("user-agent", "")},
    )
    try:
        while True:
            data = await websocket.receive_json()
            conn.last_seen = datetime.utcnow()
            await route_message(conn, data)
    except WebSocketDisconnect:
        pass
    finally:
        # always runs, even when route_message raises something unexpected
        manager.remove(websocket)

The finally block is non-negotiable. If a handler bug raises anything, the entry must still come out of the registry. Otherwise the manager fills up with dead entries that the rest of the app will cheerfully try to send to.
Pings: the only way to know a connection is really alive
WebSockets run over TCP, and TCP doesn't tell you when the wire silently dies. A laptop closing its lid, a phone losing signal, a corporate firewall dropping idle connections - all of these leave you with a socket that looks connected but will never deliver another byte.
The fix is application-level pings:
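import asyncio
import contextlib

from fastapi import WebSocket, WebSocketDisconnect

PING_INTERVAL = 25  # seconds between pings
PING_TIMEOUT = 10   # seconds to wait for the send to complete

async def ping_loop(websocket: WebSocket):
    while True:
        await asyncio.sleep(PING_INTERVAL)
        try:
            await asyncio.wait_for(
                websocket.send_json({"type": "ping"}),
                timeout=PING_TIMEOUT,
            )
        except (asyncio.TimeoutError, RuntimeError, WebSocketDisconnect):
            # The socket may already be closed, in which case close() raises too.
            with contextlib.suppress(RuntimeError):
                await websocket.close(code=1011)
            return

The client answers with {"type": "pong"} (the receive loop just ignores it; its arrival is the signal). If the send blocks or fails, the connection is dead and we close it.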
Run this alongside the main receive loop:
await asyncio.gather(
    receive_loop(websocket, conn),
    ping_loop(websocket),
)

Without a ping, you'll wake up one day to a registry full of ghosts. With a ping, you have an upper bound on how long a dead connection lingers.
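One caveat with asyncio.gather: when one of the two loops finishes or raises, the other keeps running until it fails on its own. A minimal sketch of an alternative that cancels the survivor as soon as either loop ends. The helper name run_until_first_exit is hypothetical, not part of FastAPI or asyncio, and the sketch uses only the standard library:

```python
import asyncio
from typing import Awaitable


async def run_until_first_exit(*coros: Awaitable[None]) -> None:
    """Run coroutines concurrently; when one ends, cancel the rest."""
    tasks = [asyncio.ensure_future(c) for c in coros]
    try:
        done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            task.result()  # re-raise an exception from the finished loop, if any
    finally:
        for task in tasks:
            if not task.done():
                task.cancel()
        # Wait for the cancellations to settle so no task outlives the caller.
        await asyncio.gather(*tasks, return_exceptions=True)
```

In the endpoint this would take the place of the gather call, for example `await run_until_first_exit(receive_loop(websocket, conn), ping_loop(websocket))`, so a disconnect seen by the receive loop also stops the ping loop.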
Cleanup, and the half-closed state
A real-world annoyance: await websocket.send_json(...) can fail in several ways, and the exception you get is not always what you'd hope for.
| Failure mode | Typical exception |
|---|---|
| Client closed cleanly | WebSocketDisconnect raised by the next receive |
| Client vanished mid-send | RuntimeError("Cannot call ... on closed connection") |
| Network buffer full, client hung | send blocks forever (need timeout) |
| Server-initiated close in progress | RuntimeError on subsequent ops |
A robust send-with-cleanup helper:
async def safe_send(conn: Connection, payload: dict) -> bool:
    try:
        await asyncio.wait_for(conn.websocket.send_json(payload), timeout=5)
        return True
    except (WebSocketDisconnect, RuntimeError, asyncio.TimeoutError):
        # Any of these means the connection is unusable: drop it from the registry.
        manager.remove(conn.websocket)
        return False

The return value lets the caller know whether the message landed. Broadcast loops use this to silently prune dead connections as they go.
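A broadcast loop built on a helper like safe_send might look like the sketch below. The function name broadcast and its send parameter are hypothetical. The send parameter stands in for safe_send so the example runs without FastAPI:

```python
import asyncio
from typing import Awaitable, Callable, Iterable, TypeVar

C = TypeVar("C")


async def broadcast(
    connections: Iterable[C],
    payload: dict,
    send: Callable[[C, dict], Awaitable[bool]],
) -> int:
    """Send payload to every connection; return how many sends succeeded."""
    # Snapshot first: send() may remove dead connections from the registry
    # while the broadcast is still in progress.
    targets = list(connections)
    results = await asyncio.gather(*(send(conn, payload) for conn in targets))
    return sum(1 for ok in results if ok)
```

With the manager from this page, a call could look like `await broadcast(manager.all_connections, {"type": "notice"}, safe_send)`. Sending concurrently means one hung client costs at most its own timeout rather than delaying everyone behind it.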
Limits and back-pressure
A misbehaving client can do real damage if you don't put limits in place.
| Limit | Why |
|---|---|
| Max message size (e.g. 64 KB) | A 50 MB blob will happily allocate 50 MB on your server |
| Max messages per second per connection | Bots and runaway loops |
| Max connections per user | Prevents one account from holding 10,000 sockets open |
| Total max connections | Hard cap to protect the process |
The first one is mostly handled by uvicorn config (ws_max_size). The others are application logic. For the message rate, that means a sliding-window counter on the Connection object, checked in the receive loop:
from collections import deque
import time

class RateLimit:
    def __init__(self, max_msgs: int, per_seconds: float):
        self.max = max_msgs
        self.window = per_seconds
        self.events: deque[float] = deque()

    def allow(self) -> bool:
        now = time.monotonic()
        # forget events that have slid out of the window
        while self.events and now - self.events[0] > self.window:
            self.events.popleft()
        if len(self.events) >= self.max:
            return False
        self.events.append(now)
        return True

Attach one per connection, reject (or close) when it returns False.
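The two connection caps from the table can be checked before a new socket is accepted. A sketch under assumed limits: ConnectionCaps, MAX_PER_USER and MAX_TOTAL are hypothetical names, and the numbers are placeholders, not recommendations:

```python
from dataclasses import dataclass

MAX_PER_USER = 5      # assumed value, tune for your app
MAX_TOTAL = 10_000    # assumed value, tune for your process


@dataclass(frozen=True)
class ConnectionCaps:
    max_per_user: int = MAX_PER_USER
    max_total: int = MAX_TOTAL

    def allows(self, user_connections: int, total_connections: int) -> bool:
        """True if one more connection fits under both caps."""
        return (
            user_connections < self.max_per_user
            and total_connections < self.max_total
        )
```

In the endpoint, the inputs would come from the manager, for example `caps.allows(len(manager.for_user(user.id)), len(manager.all_connections))`, with the socket refused when the check fails.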
The "but I have multiple workers" problem
Everything on this page assumed one process. If you run uvicorn with --workers 4, you have four independent managers, each holding a quarter of your users. A broadcast on worker 1 reaches none of the connections on worker 2.
The solutions, in order of complexity:
1 worker only          ──── nothing to do, simplest deployment
─────────────────────────────────────────────────────────────────
Redis pub/sub          ──── each worker subscribes to a channel
                            broadcasts publish; subscribers fan out
─────────────────────────────────────────────────────────────────
Dedicated message bus  ──── NATS, RabbitMQ, etc.
                            more capable, more to operate
─────────────────────────────────────────────────────────────────
Managed real-time svc  ──── Ably, Pusher, AWS API Gateway WS
                            they handle the fan-out, you hand off

The next page covers the Redis pattern - the one most projects land on. The point for now is: keep the single-process design clean, and the multi-process upgrade becomes a small layer on top instead of a rewrite.
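One way to keep that upgrade small is to hide the fan-out behind a narrow interface from the start. The sketch below is only an illustration of that idea: Broadcaster and LocalBroadcaster are hypothetical names, and nothing in it talks to Redis or any other bus.

```python
import asyncio
from typing import Awaitable, Callable, Protocol

# A function that delivers a payload to the sockets held by this process.
Deliver = Callable[[dict], Awaitable[None]]


class Broadcaster(Protocol):
    async def publish(self, payload: dict) -> None: ...


class LocalBroadcaster:
    """Single-process implementation: publish delivers straight to local sockets."""

    def __init__(self, deliver_locally: Deliver) -> None:
        self._deliver_locally = deliver_locally

    async def publish(self, payload: dict) -> None:
        await self._deliver_locally(payload)
```

Application code calls publish and never touches the registry directly. A multi-worker implementation could keep the same method and push to a shared channel instead, with each worker's subscriber calling its own local delivery function.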
One last habit
When you log connections, log the count, not the contents.
log.info("ws connect", extra={"user_id": user.id, "online_count": len(manager.all_connections)})

The count is the operational signal you actually want on a dashboard. The contents are an operational mistake waiting to happen the first time you forget to scrub a token from the metadata. Counts and IDs make great logs. Whole connection records do not.
