Broadcasting Messages
Broadcasting is the word for "send this one thing to many connections." It sounds trivial - loop over your registry and call send_json. And it is trivial, in a single-process toy. In a real deployment it's where most of the hard questions live.
This page is half about the what (different shapes of broadcast) and half about the how (making it work when you have more than one worker).
Three flavors of broadcast
Different problems call for different scopes. The names below aren't standard - they're just useful labels:
```text
to-all   ──► every connected client, everywhere
             (chat room with one channel, system announcement)
to-room  ──► every connected client in a named group
             (#general vs #random, "table 7" in a game)
to-user  ──► every connection belonging to one user
             (their phone + laptop + tablet all get the notification)
```

Most apps end up with all three eventually. The data structure changes a little for each.
To-all, in one process
Building on the manager from the previous page:
```python
from starlette.websockets import WebSocket, WebSocketDisconnect  # FastAPI re-exports both

async def broadcast(payload: dict) -> int:
    sent = 0
    dead: list[WebSocket] = []
    for conn in manager.all_connections:
        try:
            await conn.websocket.send_json(payload)
            sent += 1
        except (WebSocketDisconnect, RuntimeError):
            dead.append(conn.websocket)  # remember it, prune after the loop
    for ws in dead:
        manager.remove(ws)
    return sent
```

The pattern repeats: iterate, send, collect failures, prune them afterward. Notice that we don't prune during iteration: mutating a collection while iterating over it is the kind of bug that hides for weeks.
A subtle performance note
Calling await send_json(...) serially means each send waits for the previous one to finish. With 1,000 connections and 5ms per send, that's 5 seconds before the last person sees the message.
For most chat-scale apps that's fine. For anything bigger, fan out in parallel:
```python
import asyncio

async def broadcast_parallel(payload: dict) -> None:
    # Snapshot once so the sends and their results stay aligned even if
    # the registry changes while we await.
    conns = list(manager.all_connections)
    coros = [conn.websocket.send_json(payload) for conn in conns]
    results = await asyncio.gather(*coros, return_exceptions=True)
    for conn, result in zip(conns, results):
        if isinstance(result, Exception):
            manager.remove(conn.websocket)
```

Now all 1,000 sends run concurrently. The bottleneck moves from per-connection latency to your network bandwidth, which is usually a much better problem to have.
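To make the serial-versus-parallel arithmetic concrete, here is a small simulation. It is a sketch under stated assumptions: `FakeConnection` is a stand-in invented for this example (it is not the connection type from the previous page), and `asyncio.sleep` substitutes for real network latency.

```python
import asyncio
import time


class FakeConnection:
    """Hypothetical stand-in for a WebSocket: each send just sleeps."""

    def __init__(self, delay: float) -> None:
        self.delay = delay
        self.received: list[dict] = []

    async def send_json(self, payload: dict) -> None:
        await asyncio.sleep(self.delay)  # pretend this is network latency
        self.received.append(payload)


async def send_serial(conns: list[FakeConnection], payload: dict) -> float:
    """Send one at a time; returns elapsed seconds."""
    start = time.perf_counter()
    for conn in conns:
        await conn.send_json(payload)  # each send waits for the previous one
    return time.perf_counter() - start


async def send_parallel(conns: list[FakeConnection], payload: dict) -> float:
    """Start every send before waiting on any; returns elapsed seconds."""
    start = time.perf_counter()
    await asyncio.gather(*(conn.send_json(payload) for conn in conns))
    return time.perf_counter() - start


async def compare(n: int = 50, delay: float = 0.005) -> tuple[float, float]:
    payload = {"type": "msg", "text": "hi"}
    serial = await send_serial([FakeConnection(delay) for _ in range(n)], payload)
    parallel = await send_parallel([FakeConnection(delay) for _ in range(n)], payload)
    return serial, parallel


if __name__ == "__main__":
    serial, parallel = asyncio.run(compare())
    print(f"serial: {serial:.3f}s  parallel: {parallel:.3f}s")
```

The serial time grows roughly as `n * delay`, while the parallel time stays close to a single `delay`. Real sockets add bandwidth and backpressure effects that a sleep cannot model, so treat the numbers as an illustration of the shape, not a benchmark.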
To-room: a thin layer on top
A "room" is just a named set of users. The cheapest implementation is one more dictionary:
```python
class RoomManager:
    def __init__(self):
        self.rooms: dict[str, set[str]] = {}  # room name -> user ids

    def join(self, room: str, user_id: str) -> None:
        self.rooms.setdefault(room, set()).add(user_id)

    def leave(self, room: str, user_id: str) -> None:
        if room in self.rooms:
            self.rooms[room].discard(user_id)
            if not self.rooms[room]:
                del self.rooms[room]

    def members(self, room: str) -> set[str]:
        return self.rooms.get(room, set())

rooms = RoomManager()
```

And the room broadcast pulls the right slice of the connection manager:
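```python
async def broadcast_to_room(room: str, payload: dict) -> None:
    for user_id in list(rooms.members(room)):
        for conn in list(manager.for_user(user_id)):
            try:
                await conn.websocket.send_json(payload)
            except (WebSocketDisconnect, RuntimeError):
                manager.remove(conn.websocket)
```

Two layers of indirection (room → users → connections), but each layer is small and does one job. Both loops iterate over a copy (`list(...)`) because a failed send triggers `manager.remove`, and other coroutines can join or leave the room while this one is awaiting.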
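The room → users → connections lookup can be exercised without a server. The sketch below is self-contained and makes several assumptions: `FakeSocket`, the `connections` dictionary, and `demo_broadcast_to_room` are invented for illustration and only mimic the shape of the real manager.

```python
import asyncio


class FakeSocket:
    """Hypothetical stand-in for a WebSocket that records what it was sent."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.inbox: list[dict] = []

    async def send_json(self, payload: dict) -> None:
        self.inbox.append(payload)


# user id -> that user's open connections (assumed shape, for illustration)
connections: dict[str, list[FakeSocket]] = {
    "alice": [FakeSocket("alice-phone"), FakeSocket("alice-laptop")],
    "bob": [FakeSocket("bob-laptop")],
    "carol": [FakeSocket("carol-tablet")],
}

# room name -> user ids, the same shape as RoomManager.rooms
room_members: dict[str, set[str]] = {
    "general": {"alice", "bob"},
    "random": {"carol"},
}


async def demo_broadcast_to_room(room: str, payload: dict) -> int:
    """Deliver payload to every connection of every member; returns the count."""
    sent = 0
    for user_id in list(room_members.get(room, ())):     # room -> users
        for sock in list(connections.get(user_id, ())):  # user -> connections
            await sock.send_json(payload)
            sent += 1
    return sent
```

Broadcasting to `general` here reaches both of Alice's devices and Bob's laptop, while Carol, who is only in `random`, receives nothing.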
To-user: the simple one
```python
async def send_to_user(user_id: str, payload: dict) -> int:
    sent = 0
    for conn in list(manager.for_user(user_id)):
        try:
            await conn.websocket.send_json(payload)
            sent += 1
        except (WebSocketDisconnect, RuntimeError):
            manager.remove(conn.websocket)
    return sent
```

Note the `list(...)` wrap. Removing from the underlying set while iterating over it raises a `RuntimeError`; iterating over a copy avoids the problem.
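The failure is easy to reproduce with a plain set, no WebSockets involved. A minimal sketch; the function names and device names are made up for this example:

```python
def prune_live(conns: set[str]) -> str:
    """Remove items while iterating the set itself; returns the error text."""
    try:
        for name in conns:
            conns.discard(name)  # shrinks the set the loop is walking
    except RuntimeError as exc:
        return str(exc)
    return "no error"


def prune_copy(conns: set[str]) -> str:
    """Iterate over a snapshot and mutate the original."""
    for name in list(conns):
        conns.discard(name)
    return "no error"


if __name__ == "__main__":
    print(prune_live({"phone", "laptop", "tablet"}))
    print(prune_copy({"phone", "laptop", "tablet"}))
```

In async code the same error can also come from a different coroutine changing the set while this one is suspended at an `await`, which is why the copy matters even when the loop body itself never removes anything.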
The "but I have N workers" wall
You'll hit this fast. With four uvicorn workers:
```text
Client A connects ────► picked up by worker 2
Client B connects ────► picked up by worker 3

Worker 2 calls broadcast(...)
Only A sees it. B never knows.
```

Each worker has its own in-memory manager. They cannot see each other. The fix is a message bus that all workers subscribe to.
Redis pub/sub: the standard answer
Redis ships with a pub/sub channel system that's perfect for this. Each worker subscribes to a single channel; broadcasts go through the channel; every worker re-fans-out to its local connections.
```text
┌─ worker 1 ─┐                                  ┌─ worker 1 ─┐
│ publishes  │ ───► Redis "broadcast" ───►      │ receives   │ ──► its local clients
│ broadcast  │      channel                     │            │
└────────────┘                                  └────────────┘
                                                ┌─ worker 2 ─┐
                                                │ receives   │ ──► its local clients
                                                └────────────┘
                                                ┌─ worker 3 ─┐
                                                │ receives   │ ──► its local clients
                                                └────────────┘
```

Using redis.asyncio:
```python
import asyncio
import json

import redis.asyncio as redis

redis_client = redis.from_url("redis://localhost:6379")
CHANNEL = "ws.broadcast"

async def publish(payload: dict) -> None:
    await redis_client.publish(CHANNEL, json.dumps(payload))

async def subscribe_and_fanout() -> None:
    pubsub = redis_client.pubsub()
    await pubsub.subscribe(CHANNEL)
    async for message in pubsub.listen():
        if message["type"] != "message":
            continue  # skip subscribe/unsubscribe confirmations
        payload = json.loads(message["data"])
        # local fan-out - only to connections on THIS worker
        for conn in list(manager.all_connections):  # copy: we may prune below
            try:
                await conn.websocket.send_json(payload)
            except (WebSocketDisconnect, RuntimeError):
                manager.remove(conn.websocket)

# Hold a reference so the task is not garbage-collected while running.
background_tasks: set[asyncio.Task] = set()

# at startup
@app.on_event("startup")
async def start_subscriber() -> None:
    task = asyncio.create_task(subscribe_and_fanout())
    background_tasks.add(task)
```

Now the rule changes: routes and WebSocket handlers never broadcast directly. They `publish(...)` to Redis, and the subscriber on each worker delivers to its local clients.
```python
# instead of:
await broadcast({"type": "msg", "text": "hi"})

# always:
await publish({"type": "msg", "text": "hi"})
```

That single change is what makes the same code work whether you run one worker or twenty.
Routing by room or user, via Redis
The publish-everywhere approach gets noisy if 90% of broadcasts are room-scoped - every worker decodes and inspects every message. A common refinement is to use multiple channels:
```python
def channel_for_room(room: str) -> str:
    return f"ws.room.{room}"

def channel_for_user(user_id: str) -> str:
    return f"ws.user.{user_id}"
```

Workers subscribe dynamically as users join rooms: when the first user in room `general` connects on this worker, the worker subscribes to `ws.room.general`. When the last one leaves, it unsubscribes. Per-user channels are subscribed on connection and unsubscribed on disconnect.
This keeps the network chatter proportional to actual interest. It's also a bit more code to maintain - start with one channel and graduate when you can measure that it's a problem.
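The "first one in subscribes, last one out unsubscribes" rule is a reference count per channel. The sketch below shows one way to track it; `ChannelInterest` and its method names are hypothetical, and it only decides when a subscription change is due. The caller is still responsible for the actual Redis call.

```python
class ChannelInterest:
    """Counts how many local connections care about each channel.

    Hypothetical helper for illustration: it reports when the worker should
    subscribe or unsubscribe, and does not talk to Redis itself.
    """

    def __init__(self) -> None:
        self._counts: dict[str, int] = {}

    def acquire(self, channel: str) -> bool:
        """Register one interested connection. True means: subscribe now."""
        self._counts[channel] = self._counts.get(channel, 0) + 1
        return self._counts[channel] == 1

    def release(self, channel: str) -> bool:
        """Drop one interested connection. True means: unsubscribe now."""
        count = self._counts.get(channel, 0)
        if count == 0:
            return False  # unknown channel: nothing to release
        if count == 1:
            del self._counts[channel]
            return True
        self._counts[channel] = count - 1
        return False

    def active(self) -> set[str]:
        """Channels this worker should currently be subscribed to."""
        return set(self._counts)
```

In a join handler this would read something like `if interest.acquire(channel): await pubsub.subscribe(channel)`, with the mirror image on leave and disconnect. Keeping the counting separate from the Redis calls makes the logic testable without a server.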
Reliability: pub/sub is not a queue
A point worth being honest about: Redis pub/sub is best-effort. If a worker is briefly disconnected from Redis, it misses any messages published during the gap. There is no replay.
For chat messages and notifications, "best effort" is usually fine - the canonical message lives in the database; the WebSocket push is the live signal. If a user misses a push because their network blinked, they'll see the message when their client refetches history.
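The refetch half of that bargain can be sketched as follows, assuming every stored message carries a monotonically increasing `id` and the client remembers the last one it saw. `Message`, `MESSAGES`, and `messages_since` are hypothetical names; a real app would query its database where this example filters a list.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Message:
    id: int    # assumed: monotonically increasing, assigned by the database
    text: str


# Stand-in for the table that holds the canonical messages.
MESSAGES: list[Message] = [
    Message(1, "hello"),
    Message(2, "anyone there?"),
    Message(3, "deploy finished"),
]


def messages_since(last_seen_id: int, limit: int = 100) -> list[Message]:
    """Return stored messages newer than last_seen_id, oldest first."""
    newer = [m for m in MESSAGES if m.id > last_seen_id]
    return sorted(newer, key=lambda m: m.id)[:limit]
```

A client that reconnects after a network blip sends the last id it rendered and receives only what it missed, regardless of which pushes were dropped in between.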
For events that must be delivered, you want a real queue (Kafka, NATS JetStream, Redis Streams with consumer groups). At that point you've left "WebSockets with a sprinkle of Redis" territory and are doing real distributed messaging, which is a different kind of project.
A small sanity table
| Scope | One worker | Many workers |
|---|---|---|
| Send to one connection | ws.send_json(...) directly | Same - connection is local |
| Send to one user | Loop manager.for_user(user_id) | publish_user(user_id, ...) |
| Send to a room | Loop room members | publish_room(room, ...) |
| Send to everyone | Loop all_connections | publish(...) to broadcast channel |
| Receive from a client | Standard receive loop | Standard receive loop |
The "many workers" column is the version you should write now, even if you only run one worker today. The difference is one call site; the upside is your app survives the day you scale up.
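The table names `publish_user` and `publish_room` without defining them. One possible shape is sketched below, assuming the per-scope channel names from the routing section and JSON payloads. The `Publish` type and `fake_publish` are assumptions made so the example runs without a Redis server; the publisher is passed in as the first argument, which differs slightly from the two-argument call shape in the table.

```python
import asyncio
import json
from typing import Awaitable, Callable

# Assumed signature: (channel, serialized message) -> awaitable.
Publish = Callable[[str, str], Awaitable[object]]


def channel_for_room(room: str) -> str:
    return f"ws.room.{room}"


def channel_for_user(user_id: str) -> str:
    return f"ws.user.{user_id}"


async def publish_room(publish: Publish, room: str, payload: dict) -> None:
    """Publish payload on the channel for one room."""
    await publish(channel_for_room(room), json.dumps(payload))


async def publish_user(publish: Publish, user_id: str, payload: dict) -> None:
    """Publish payload on the channel for one user."""
    await publish(channel_for_user(user_id), json.dumps(payload))


# In-memory publisher used only to exercise the helpers.
sent: list[tuple[str, str]] = []


async def fake_publish(channel: str, message: str) -> int:
    sent.append((channel, message))
    return 0


async def demo() -> None:
    await publish_room(fake_publish, "general", {"type": "msg", "text": "hi"})
    await publish_user(fake_publish, "u42", {"type": "notify"})


if __name__ == "__main__":
    asyncio.run(demo())
    print(sent)
```

In the app, `redis_client.publish` from the Redis section takes a channel and a message, so it could be passed where `fake_publish` is used here. Binding it once with `functools.partial` would recover the shorter call shape shown in the table.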
What we have so far
Three pages in, we've got:
- A working chat endpoint with multiple clients (page 2).
- A robust connection manager with cleanup, pings, and limits (page 3).
- A broadcast layer that works across one worker or many (this page).
The next page steps back from chat specifically and looks at the broader pattern these tools enable: real-time notifications - the kind of "ping the user when something happens" feature most apps need at some point, chat or no chat.
