Complete DevOps Bootcamp: Master DevOps in 12 Weeks

Connection Management

Once you have more than a handful of WebSocket clients, the question stops being "how do I send a message?" and becomes "how do I keep track of who's even on the line?" This page is about the bookkeeping: the data structures, the cleanup, the failure modes. None of it is glamorous, but all of it matters.

What "managing" actually means

A WebSocket connection has a life cycle, and there are a handful of moments in it where something must happen:

   accept            ─── add to registry, announce join
   receive (loop)    ─── route message, refresh activity timestamp
   ping/pong         ─── prove the connection is still alive
   disconnect        ─── remove from registry, announce leave, free resources

Drop any one of those and the symptoms are familiar: ghost users in the online list, messages sent into the void, memory creeping up over days.

A better registry

The toy registry from the previous page was indexed by username. Real apps usually need something richer: a user might have multiple connections (laptop + phone), and both should receive the same message.

from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime
from fastapi import WebSocket

# eq=False keeps identity-based equality and hashing. A plain @dataclass sets
# __hash__ to None, and adding a Connection to a set would raise TypeError.
@dataclass(eq=False)
class Connection:
    websocket: WebSocket
    user_id: str
    connected_at: datetime = field(default_factory=datetime.utcnow)
    last_seen: datetime = field(default_factory=datetime.utcnow)
    metadata: dict = field(default_factory=dict)

class ConnectionManager:
    def __init__(self):
        # one user -> many connections (laptop + phone)
        self._by_user: dict[str, set[Connection]] = defaultdict(set)
        # one ws -> its Connection (for fast cleanup on disconnect);
        # Starlette's WebSocket hashes by identity, so it works as a key
        self._by_ws: dict[WebSocket, Connection] = {}

    async def add(self, user_id: str, ws: WebSocket, metadata: dict | None = None) -> Connection:
        conn = Connection(websocket=ws, user_id=user_id, metadata=metadata or {})
        self._by_user[user_id].add(conn)
        self._by_ws[ws] = conn
        return conn

    def remove(self, ws: WebSocket) -> Connection | None:
        # idempotent: a second call for the same socket is a no-op
        conn = self._by_ws.pop(ws, None)
        if conn is None:
            return None
        bucket = self._by_user.get(conn.user_id)
        if bucket:
            bucket.discard(conn)
            if not bucket:
                self._by_user.pop(conn.user_id, None)  # last device gone: user is offline
        return conn

    def for_user(self, user_id: str) -> set[Connection]:
        # a copy, so callers can prune connections while iterating over it
        return set(self._by_user.get(user_id, ()))

    @property
    def all_connections(self) -> list[Connection]:
        return list(self._by_ws.values())

    @property
    def online_users(self) -> list[str]:
        return list(self._by_user.keys())

manager = ConnectionManager()

Two dictionaries instead of one. The _by_ws index is the thing that makes disconnect cleanup O(1) instead of "scan everyone."
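The sets in the registry only work if Connection objects are hashable, and a plain @dataclass quietly makes them unhashable: with the default eq=True it generates a field-wise __eq__ and sets __hash__ to None. Declaring the class with eq=False keeps object identity for both, which is also the right semantics here, since two connections with identical fields are still two different sockets. A stdlib-only check, with ValueConn and IdConn as throwaway illustrative classes:

```python
from dataclasses import dataclass

@dataclass  # eq=True by default, so __hash__ is set to None
class ValueConn:
    user_id: str

@dataclass(eq=False)  # inherits object.__eq__ / object.__hash__: identity semantics
class IdConn:
    user_id: str

def can_hash(obj) -> bool:
    try:
        hash(obj)
        return True
    except TypeError:
        return False

laptop, phone = IdConn("alice"), IdConn("alice")
devices = {laptop, phone}  # same field values, still two distinct entries
```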

Using the manager from the endpoint

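from datetime import datetime
from fastapi import Depends, WebSocket, WebSocketDisconnect

# app, get_ws_user and route_message are defined elsewhere in your application.
@app.websocket("/ws")
async def endpoint(websocket: WebSocket, user=Depends(get_ws_user)):
    if user is None:
        return
    await websocket.accept()

    conn = await manager.add(user.id, websocket, metadata={"agent": websocket.headers.get("user-agent", "")})
    try:
        while True:
            data = await websocket.receive_json()
            conn.last_seen = datetime.utcnow()  # every message counts as activity
            await route_message(conn, data)
    except WebSocketDisconnect:
        pass  # normal client-initiated close
    finally:
        manager.remove(websocket)  # runs on disconnect and on any other exception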

The finally block is non-negotiable. If a handler bug raises anything, the entry must still come out of the registry. Otherwise the manager fills up with dead entries that the rest of the app will cheerfully try to send to.
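The life cycle at the top also lists "announce leave", and with multiple connections per user there is a wrinkle: closing the laptop tab should not tell everyone the user went offline while their phone is still connected. A minimal sketch of that check, meant to replace the bare manager.remove(websocket) in the finally block. The names disconnect_and_announce and Registry are hypothetical, and announce is an assumed coroutine that broadcasts a presence event:

```python
from typing import Any, Awaitable, Callable, Protocol

class Registry(Protocol):
    # the two ConnectionManager methods this helper relies on
    def remove(self, ws: Any) -> Any: ...
    def for_user(self, user_id: str) -> set: ...

async def disconnect_and_announce(
    manager: Registry,
    websocket: Any,
    announce: Callable[[dict], Awaitable[None]],
) -> None:
    conn = manager.remove(websocket)
    if conn is None:
        return  # already removed, e.g. by a failed send
    # no await between remove() and this check, so when two devices drop
    # at the same moment, exactly one of them sees the empty set
    if not manager.for_user(conn.user_id):
        await announce({"type": "leave", "user_id": conn.user_id})
```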

Pings: the only way to know a connection is really alive

WebSockets run over TCP, and TCP doesn't tell you when the wire silently dies. A laptop closing its lid, a phone losing signal, a corporate firewall dropping idle connections: all of these leave you with a socket that looks connected but will never deliver another byte.

The fix is application-level pings:

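import asyncio
import contextlib
from datetime import datetime, timedelta
from fastapi import WebSocket, WebSocketDisconnect

PING_INTERVAL = 25  # seconds between pings
PING_TIMEOUT = 10   # max seconds for a send; also the grace period for a pong

async def close_quietly(websocket: WebSocket, code: int = 1011) -> None:
    # the socket may already be closed or wedged; closing must not raise
    with contextlib.suppress(RuntimeError, WebSocketDisconnect, asyncio.TimeoutError):
        await asyncio.wait_for(websocket.close(code=code), timeout=PING_TIMEOUT)

async def ping_loop(websocket: WebSocket, conn: Connection):
    while True:
        await asyncio.sleep(PING_INTERVAL)
        # the receive loop refreshes conn.last_seen on every message, pongs included
        silent_for = datetime.utcnow() - conn.last_seen
        if silent_for > timedelta(seconds=PING_INTERVAL + PING_TIMEOUT):
            await close_quietly(websocket)  # the previous ping got no answer
            return
        try:
            await asyncio.wait_for(
                websocket.send_json({"type": "ping"}),
                timeout=PING_TIMEOUT,
            )
        except (asyncio.TimeoutError, RuntimeError, WebSocketDisconnect):
            await close_quietly(websocket)  # send hung, or the socket is already gone
            return

The client answers each ping with {"type": "pong"}. The receive loop needs no special case for it (route_message can simply ignore it), because every incoming message already refreshes conn.last_seen, and that timestamp is the real liveness signal. Checking it matters: a send into a silently dead TCP connection usually succeeds, since the bytes just land in the kernel's send buffer. So the loop closes the connection when nothing has arrived for a full interval plus the grace period, or when the ping itself cannot be sent.

Move the endpoint's while True loop into a receive_loop(websocket, conn) coroutine and run it alongside the ping loop. Whichever finishes first should cancel the other; asyncio.gather won't, because when one awaitable raises, the others keep running:

tasks = [asyncio.create_task(receive_loop(websocket, conn)),
         asyncio.create_task(ping_loop(websocket, conn))]
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
for task in pending:
    task.cancel()  # the loop that finished first takes the other one down
for task in done:
    task.result()  # re-raise e.g. WebSocketDisconnect into the endpoint's try/finally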

Without a ping, you'll wake up one day to a registry full of ghosts. With a ping, you have an upper bound on how long a dead connection lingers.
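These are ordinary JSON messages, not WebSocket protocol ping frames, so the client has to answer them itself. A sketch of a Python client loop, assuming a connection object with awaitable recv() and send(str) methods (connections from the third-party websockets package have that shape); client_loop is a hypothetical name and handle is an assumed callback for everything that isn't a ping:

```python
import json
from typing import Any, Awaitable, Callable

async def client_loop(ws: Any, handle: Callable[[dict], Awaitable[None]]) -> None:
    # runs until recv() raises, i.e. until the connection closes
    while True:
        msg = json.loads(await ws.recv())
        if msg.get("type") == "ping":
            # answer immediately; the server only needs some message to arrive
            await ws.send(json.dumps({"type": "pong"}))
            continue
        await handle(msg)
```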

Cleanup, and the half-closed state

A real-world annoyance: await websocket.send_json(...) can fail in several ways, and the exception you get is not always what you'd hope for.

   Failure mode                        Typical exception
   ──────────────────────────────────  ────────────────────────────────────────────────────
   Client closed cleanly               WebSocketDisconnect raised by the next receive
   Client vanished mid-send            RuntimeError("Cannot call ... on closed connection")
   Network buffer full, client hung    send blocks forever (need timeout)
   Server-initiated close in progress  RuntimeError on subsequent ops

A robust send-with-cleanup helper:

async def safe_send(conn: Connection, payload: dict) -> bool:
    try:
        await asyncio.wait_for(conn.websocket.send_json(payload), timeout=5)
        return True
    except (WebSocketDisconnect, RuntimeError, asyncio.TimeoutError):
        manager.remove(conn.websocket)
        return False

The return value tells the caller whether the send succeeded. That means the frame was handed to the server's transport, not that the client has read it. Broadcast loops use it to prune dead connections silently as they go.
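A sketch of such a broadcast, assuming a send coroutine with safe_send's contract (returns False after pruning a dead connection); broadcast is a hypothetical name. It sends to all targets concurrently, so one client stuck against the 5-second timeout does not delay everyone else, and it snapshots the targets first so pruning during the sends cannot change the collection being iterated:

```python
import asyncio
from typing import Any, Awaitable, Callable, Iterable

async def broadcast(
    targets: Iterable[Any],
    payload: dict,
    send: Callable[[Any, dict], Awaitable[bool]],
) -> int:
    snapshot = list(targets)  # freeze membership before any send can prune
    results = await asyncio.gather(*(send(conn, payload) for conn in snapshot))
    return sum(results)  # number of sends that succeeded
```

The same helper covers one user's devices, await broadcast(manager.for_user(user_id), payload, safe_send), and everyone, await broadcast(manager.all_connections, payload, safe_send).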

Limits and back-pressure

A misbehaving client can do real damage if you don't put limits in place.

   Limit                                   Why
   ──────────────────────────────────────  ───────────────────────────────────────────────────────
   Max message size (e.g. 64 KB)           A 50 MB blob will happily allocate 50 MB on your server
   Max messages per second per connection  Bots and runaway loops
   Max connections per user                Prevents one account from holding 10,000 sockets open
   Total max connections                   Hard cap to protect the process

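Max message size is mostly handled by uvicorn's config: ws_max_size (--ws-max-size on the command line), which defaults to 16 MiB. The others are application logic. For message rate, a sliding-window counter attached to each connection and checked in the receive loop is enough: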

from collections import deque
import time

class RateLimit:
    def __init__(self, max_msgs: int, per_seconds: float):
        self.max = max_msgs
        self.window = per_seconds
        self.events: deque[float] = deque()

    def allow(self) -> bool:
        now = time.monotonic()
        while self.events and now - self.events[0] > self.window:
            self.events.popleft()
        if len(self.events) >= self.max:
            return False
        self.events.append(now)
        return True

Attach one per connection, reject (or close) when it returns False.

The "but I have multiple workers" problem

Everything on this page so far assumes a single process. If you run uvicorn with --workers 4, you have four independent managers, each holding roughly a quarter of your users. A broadcast on worker 1 reaches none of the connections on worker 2.

The solutions, in order of complexity:

   1 worker only           ──── nothing to do, simplest deployment
   ─────────────────────────────────────────────────────────────────
   Redis pub/sub           ──── each worker subscribes to a channel
                                broadcasts publish; subscribers fan out
   ─────────────────────────────────────────────────────────────────
   Dedicated message bus   ──── NATS, RabbitMQ, etc.
                                more capable, more to operate
   ─────────────────────────────────────────────────────────────────
   Managed real-time svc   ──── Ably, Pusher, AWS API Gateway WS
                                they handle the fan-out, you hand off

The next page covers the Redis pattern, which is the one most projects land on. The point for now: keep the single-process design clean, and the multi-process upgrade becomes a small layer on top instead of a rewrite.
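One way to keep that upgrade small, sketched here as an assumption rather than the next page's design, is to route every broadcast through a tiny publish/subscribe interface. In a single process, the local version just calls its subscribers directly; a Redis-backed class with the same publish and subscribe methods could later replace it without touching the endpoints. Broker and LocalBroker are hypothetical names:

```python
import asyncio
from typing import Awaitable, Callable, Protocol

Handler = Callable[[str, dict], Awaitable[None]]

class Broker(Protocol):
    async def publish(self, channel: str, message: dict) -> None: ...
    def subscribe(self, channel: str, handler: Handler) -> None: ...

class LocalBroker:
    """Single-process broker: publish fans out to in-process handlers."""

    def __init__(self) -> None:
        self._handlers: dict[str, list[Handler]] = {}

    def subscribe(self, channel: str, handler: Handler) -> None:
        self._handlers.setdefault(channel, []).append(handler)

    async def publish(self, channel: str, message: dict) -> None:
        handlers = self._handlers.get(channel, [])
        await asyncio.gather(*(h(channel, message) for h in handlers))
```

Each worker would subscribe one handler that sends the message to the connections in its own manager; only the broker implementation changes when a second worker appears.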

One last habit

When you log connections, log the count, not the contents.

log.info("ws connect", extra={"user_id": user.id, "online_count": len(manager.all_connections)})

The count is the operational signal you actually want on a dashboard. Logging whole records is a leak waiting to happen: the first time you forget to scrub a token from the metadata, it lands in your log storage. Counts and IDs make great logs. Whole connection records do not.
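The log.info call above assumes log is a standard library logger. One catch: keys passed via extra become attributes on the LogRecord, but the default format never prints them, so user_id and online_count silently vanish from plain-text output. A sketch of a dedicated logger whose format references them; make_ws_logger, the "ws" logger name and the format string are illustrative choices:

```python
import logging
import sys

def make_ws_logger(stream=sys.stderr) -> logging.Logger:
    # call once at startup; each call adds another handler
    log = logging.getLogger("ws")
    handler = logging.StreamHandler(stream)
    # %(user_id)s and %(online_count)s are filled from extra={...}
    handler.setFormatter(logging.Formatter(
        "%(levelname)s %(message)s user_id=%(user_id)s online=%(online_count)s"
    ))
    log.addHandler(handler)
    log.setLevel(logging.INFO)
    log.propagate = False  # keep these records out of the root logger's handlers
    return log
```

Every call on this logger must pass both keys; a record missing one fails to format, and logging reports an error instead of emitting the line.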
