
Broadcasting Messages

Broadcasting is the word for "send this one thing to many connections." It sounds trivial - loop over your registry and call send_json. And it is trivial, in a single-process toy. In a real deployment it's where most of the hard questions live.

This page is half about the what (different shapes of broadcast) and half about the how (making it work when you have more than one worker).

Three flavors of broadcast

Different problems call for different scopes. The names below aren't standard - they're just useful labels:

   to-all          ──► every connected client, everywhere
                       (chat room with one channel, system announcement)

   to-room         ──► every connected client in a named group
                       (#general vs #random, "table 7" in a game)

   to-user         ──► every connection belonging to one user
                       (their phone + laptop + tablet all get the notification)

Most apps end up with all three eventually. The data structure changes a little for each.

To-all, in one process

Building on the manager from the previous page:

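from fastapi import WebSocket, WebSocketDisconnect

async def broadcast(payload: dict) -> int:
    """Send payload to every local connection; return how many sends succeeded."""
    sent = 0
    dead: list[WebSocket] = []
    # Snapshot first: other tasks may connect or disconnect while we await.
    for conn in list(manager.all_connections):
        try:
            await conn.websocket.send_json(payload)
            sent += 1
        except (WebSocketDisconnect, RuntimeError):
            dead.append(conn.websocket)
    # Prune only after the loop has finished.
    for ws in dead:
        manager.remove(ws)
    return sent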

The pattern repeats: iterate, send, collect failures, prune them after. Notice we don't prune during iteration - mutating the collection while iterating is the kind of bug that hides for weeks.

A subtle performance note

Calling await send_json(...) serially means each send waits for the previous one to finish. With 1,000 connections and 5ms per send, that's 5 seconds before the last person sees the message.

For most chat-scale apps that's fine. For anything bigger, fan out in parallel:

import asyncio

async def broadcast_parallel(payload: dict) -> None:
    # Snapshot once so each result lines up with the connection it was sent to.
    conns = list(manager.all_connections)
    coros = [conn.websocket.send_json(payload) for conn in conns]
    results = await asyncio.gather(*coros, return_exceptions=True)
    for conn, result in zip(conns, results):
        if isinstance(result, Exception):
            manager.remove(conn.websocket)

Now all 1,000 sends are in flight concurrently instead of queued behind one another. Total time is governed by the slowest single send and your network bandwidth rather than the sum of every send's latency, which is usually a much better problem to have.
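
To make the serial-versus-parallel difference concrete, here is a small timing sketch that needs no server. FakeSocket is a hypothetical stand-in for a WebSocket whose send_json simply sleeps for a fixed delay; the function names are illustrative and not part of the manager from the previous page.

```python
import asyncio
import time


class FakeSocket:
    """Hypothetical stand-in for a WebSocket: each send takes a fixed delay."""

    def __init__(self, delay: float = 0.01) -> None:
        self.delay = delay
        self.received: list[dict] = []

    async def send_json(self, payload: dict) -> None:
        await asyncio.sleep(self.delay)  # simulated network latency
        self.received.append(payload)


async def send_serial(sockets: list[FakeSocket], payload: dict) -> float:
    """Send one at a time; return elapsed seconds."""
    start = time.perf_counter()
    for ws in sockets:
        await ws.send_json(payload)
    return time.perf_counter() - start


async def send_parallel(sockets: list[FakeSocket], payload: dict) -> float:
    """Start every send concurrently; return elapsed seconds."""
    start = time.perf_counter()
    await asyncio.gather(
        *(ws.send_json(payload) for ws in sockets),
        return_exceptions=True,
    )
    return time.perf_counter() - start


async def compare(n: int = 20) -> tuple[float, float]:
    payload = {"type": "msg", "text": "hi"}
    serial = await send_serial([FakeSocket() for _ in range(n)], payload)
    parallel = await send_parallel([FakeSocket() for _ in range(n)], payload)
    return serial, parallel
```

Running asyncio.run(compare()) returns the two elapsed times. The serial figure grows with the number of sockets, while the parallel one stays close to a single send's delay.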

To-room: a thin layer on top

A "room" is just a named set of users. The cheapest implementation is one more dictionary:

class RoomManager:
    def __init__(self):
        self.rooms: dict[str, set[str]] = {}   # room name -> user ids

    def join(self, room: str, user_id: str) -> None:
        self.rooms.setdefault(room, set()).add(user_id)

    def leave(self, room: str, user_id: str) -> None:
        if room in self.rooms:
            self.rooms[room].discard(user_id)
            if not self.rooms[room]:
                del self.rooms[room]

    def members(self, room: str) -> set[str]:
        # Return a copy so callers can iterate while users join or leave.
        return set(self.rooms.get(room, ()))

rooms = RoomManager()

And the room broadcast pulls the right slice of the connection manager:

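async def broadcast_to_room(room: str, payload: dict) -> None:
    # Iterate over snapshots: membership and connections can change while we await,
    # and manager.remove() mutates the collection we would otherwise be looping over.
    for user_id in list(rooms.members(room)):
        for conn in list(manager.for_user(user_id)):
            try:
                await conn.websocket.send_json(payload)
            except (WebSocketDisconnect, RuntimeError):
                manager.remove(conn.websocket)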

Two layers of indirection (room → users → connections), but each layer is small and does exactly one job.

To-user: the simple one

async def send_to_user(user_id: str, payload: dict) -> int:
    sent = 0
    for conn in list(manager.for_user(user_id)):
        try:
            await conn.websocket.send_json(payload)
            sent += 1
        except (WebSocketDisconnect, RuntimeError):
            manager.remove(conn.websocket)
    return sent

Note the list(...) wrap. manager.remove changes the user's connection set, and mutating a set while iterating over it raises RuntimeError - copying first avoids the problem.
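
The failure is easy to reproduce without any WebSocket code. The sketch below uses plain strings in place of connections; prune_unsafe and prune_safe are illustrative names, not functions from the manager.

```python
def prune_unsafe(connections: set[str], dead: set[str]) -> None:
    """Removes from the set being iterated: raises RuntimeError."""
    for conn in connections:
        if conn in dead:
            connections.remove(conn)


def prune_safe(connections: set[str], dead: set[str]) -> None:
    """Iterates over a snapshot and mutates the original."""
    for conn in list(connections):
        if conn in dead:
            connections.remove(conn)
```

In async code the mutation does not even have to come from your own loop body: any other task that connects or disconnects a client while you are awaiting a send can change the set under you, which is why a snapshot is the safer default.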

The "but I have N workers" wall

You'll hit this fast. With four uvicorn workers:

   Client A connects ────► picked up by worker 2
   Client B connects ────► picked up by worker 3

   Worker 2 calls broadcast(...)
   Only A sees it. B never knows.

Each worker has its own in-memory manager. They cannot see each other. The fix is a message bus that all workers subscribe to.

Redis pub/sub: the standard answer

Redis ships with a pub/sub channel system that's perfect for this. Each worker subscribes to a single channel; broadcasts go through the channel; every worker re-fans-out to its local connections.

   ┌─ worker 1 ─┐                              ┌─ worker 1 ─┐
   │  publishes │ ───►  Redis "broadcast" ───► │  receives  │ ──► its local clients
   │  broadcast │       channel                │            │
   └────────────┘                              └────────────┘
                                               ┌─ worker 2 ─┐
                                               │  receives  │ ──► its local clients
                                               └────────────┘
                                               ┌─ worker 3 ─┐
                                               │  receives  │ ──► its local clients
                                               └────────────┘

Using redis.asyncio:

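import json
import asyncio
import redis.asyncio as redis
from fastapi import WebSocketDisconnect

# `app` and `manager` are the FastAPI app and connection manager from earlier pages.
redis_client = redis.from_url("redis://localhost:6379")

CHANNEL = "ws.broadcast"

async def publish(payload: dict) -> None:
    await redis_client.publish(CHANNEL, json.dumps(payload))

async def subscribe_and_fanout():
    pubsub = redis_client.pubsub()
    await pubsub.subscribe(CHANNEL)
    async for message in pubsub.listen():
        # skip subscribe/unsubscribe confirmations
        if message["type"] != "message":
            continue
        payload = json.loads(message["data"])
        # local fan-out - only to connections on THIS worker
        # (snapshot so removing a dead connection doesn't disturb the loop)
        for conn in list(manager.all_connections):
            try:
                await conn.websocket.send_json(payload)
            except (WebSocketDisconnect, RuntimeError):
                manager.remove(conn.websocket)

subscriber_task = None

# at startup (newer FastAPI versions prefer a lifespan handler; on_event still works)
@app.on_event("startup")
async def start_subscriber():
    # keep a reference: the event loop holds only weak references to tasks
    global subscriber_task
    subscriber_task = asyncio.create_task(subscribe_and_fanout())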

Now the rule changes: routes and WebSocket handlers never broadcast directly. They publish(...) to Redis, and the subscriber on each worker delivers to its local clients.

# instead of:
await broadcast({"type": "msg", "text": "hi"})

# always:
await publish({"type": "msg", "text": "hi"})

That single change is what makes the same code work whether you run one worker or twenty.
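
To watch the publish-then-fan-out flow without running Redis, here is a sketch that swaps the Redis channel for an in-memory stand-in. InMemoryBus, Worker, and FakeSocket are hypothetical names made up for this illustration; in a real deployment each worker is a separate process and the bus is Redis.

```python
import asyncio
import json


class FakeSocket:
    """Hypothetical stand-in for a WebSocket that records what it was sent."""

    def __init__(self) -> None:
        self.received: list[dict] = []

    async def send_json(self, payload: dict) -> None:
        self.received.append(payload)


class InMemoryBus:
    """Stand-in for one Redis channel: every subscriber gets every message."""

    def __init__(self) -> None:
        self.queues: list[asyncio.Queue] = []

    def subscribe(self) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        self.queues.append(queue)
        return queue

    async def publish(self, payload: dict) -> None:
        raw = json.dumps(payload)  # serialize once, as you would for Redis
        for queue in self.queues:
            await queue.put(raw)


class Worker:
    """One process's view: its own local connections plus a bus subscription."""

    def __init__(self, bus: InMemoryBus) -> None:
        self.local: list[FakeSocket] = []
        self.inbox = bus.subscribe()

    async def fanout_once(self) -> None:
        payload = json.loads(await self.inbox.get())
        for ws in list(self.local):  # deliver to local clients only
            await ws.send_json(payload)


async def demo() -> tuple[list[dict], list[dict]]:
    bus = InMemoryBus()
    worker_2, worker_3 = Worker(bus), Worker(bus)
    client_a, client_b = FakeSocket(), FakeSocket()
    worker_2.local.append(client_a)  # A landed on worker 2
    worker_3.local.append(client_b)  # B landed on worker 3

    await bus.publish({"type": "msg", "text": "hi"})  # any worker may publish
    await asyncio.gather(worker_2.fanout_once(), worker_3.fanout_once())
    return client_a.received, client_b.received
```

Both clients end up with the message even though neither worker knows about the other's connections, which is the property the Redis version relies on.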

Routing by room or user, via Redis

The publish-everywhere approach gets noisy if 90% of broadcasts are room-scoped - every worker decodes and inspects every message. A common refinement is to use multiple channels:

def channel_for_room(room: str) -> str:
    return f"ws.room.{room}"

def channel_for_user(user_id: str) -> str:
    return f"ws.user.{user_id}"

Workers subscribe dynamically as users join rooms: when the first user in room general connects on this worker, the worker subscribes to ws.room.general. When the last one leaves, it unsubscribes. Per-user channels are subscribed on connection and unsubscribed on disconnect.
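
The "first one in subscribes, last one out unsubscribes" rule is reference counting. A minimal sketch of the bookkeeping, assuming one instance per worker; RoomSubscriptions is a hypothetical helper, and the actual Redis subscribe and unsubscribe calls are left to the caller.

```python
from collections import Counter


class RoomSubscriptions:
    """Counts local connections per room on this worker.

    join() returns True when the worker should subscribe to the room's channel;
    leave() returns True when it should unsubscribe.
    """

    def __init__(self) -> None:
        self.local_counts: Counter = Counter()

    def join(self, room: str) -> bool:
        self.local_counts[room] += 1
        return self.local_counts[room] == 1  # first local member

    def leave(self, room: str) -> bool:
        if self.local_counts[room] == 0:
            return False  # nothing to leave; ignore spurious calls
        self.local_counts[room] -= 1
        if self.local_counts[room] == 0:
            del self.local_counts[room]
            return True  # last local member left
        return False
```

When join returns True the caller would await pubsub.subscribe(channel_for_room(room)), and when leave returns True it would await pubsub.unsubscribe(channel_for_room(room)). Keeping the counting separate from the Redis calls makes the logic easy to test on its own.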

This keeps the network chatter proportional to actual interest. It's also a bit more code to maintain - start with one channel and graduate when you can measure that it's a problem.

Reliability: pub/sub is not a queue

A point worth being honest about: Redis pub/sub is best-effort. If a worker is briefly disconnected from Redis, it misses any messages published during the gap. There is no replay.

For chat messages and notifications, "best effort" is usually fine - the canonical message lives in the database; the WebSocket push is the live signal. If a user misses a push because their network blinked, they'll see the message when their client refetches history.

For events that must be delivered, you want a real queue (Kafka, NATS JetStream, Redis Streams with consumer groups). At that point you've left "WebSockets with a sprinkle of Redis" territory and are doing real distributed messaging, which is a different kind of project.

A small sanity table

   Scope                    One worker                        Many workers
   ──────────────────────   ──────────────────────────────    ─────────────────────────────────
   Send to one connection   ws.send_json(...) directly        Same - connection is local
   Send to one user         Loop manager.for_user(user_id)    publish_user(user_id, ...)
   Send to a room           Loop room members                 publish_room(room, ...)
   Send to everyone         Loop all_connections              publish(...) to broadcast channel
   Receive from a client    Standard receive loop             Standard receive loop

The "many workers" column is the version you should write now, even if you only run one worker today. The difference is one call site; the upside is your app survives the day you scale up.
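
The table names publish_user and publish_room without defining them. One possible shape, assuming the per-room and per-user channel naming from earlier on this page: the client argument is anything with an awaitable publish(channel, message), such as a redis.asyncio client. The Publisher protocol and RecordingClient are hypothetical helpers added for illustration.

```python
import asyncio
import json
from typing import Any, Protocol


class Publisher(Protocol):
    """Anything with an awaitable publish(channel, message)."""

    async def publish(self, channel: str, message: str) -> Any: ...


def channel_for_room(room: str) -> str:
    return f"ws.room.{room}"


def channel_for_user(user_id: str) -> str:
    return f"ws.user.{user_id}"


async def publish_room(client: Publisher, room: str, payload: dict) -> None:
    """Publish to the channel that workers hosting this room listen on."""
    await client.publish(channel_for_room(room), json.dumps(payload))


async def publish_user(client: Publisher, user_id: str, payload: dict) -> None:
    """Publish to the channel that workers hosting this user listen on."""
    await client.publish(channel_for_user(user_id), json.dumps(payload))


class RecordingClient:
    """Test double: records what would have been published."""

    def __init__(self) -> None:
        self.sent: list[tuple[str, str]] = []

    async def publish(self, channel: str, message: str) -> int:
        self.sent.append((channel, message))
        return 0
```

Passing the client in, rather than reaching for a module-level global, is a design choice made here so the helpers can be exercised with RecordingClient in tests; a module-level redis_client would work just as well.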

What we have so far

Three pages in, we've got:

  • A working chat endpoint with multiple clients (page 2).
  • A robust connection manager with cleanup, pings, and limits (page 3).
  • A broadcast layer that works across one worker or many (this page).

The next page steps back from chat specifically and looks at the broader pattern these tools enable: real-time notifications - the kind of "ping the user when something happens" feature most apps need at some point, chat or no chat.
