WebSockets
After mastering Server-Sent Events for one-way communication, Batman realized he needed something more powerful. When Commissioner Gordon wanted to chat with him in real-time during crisis situations, Batman needed bidirectional communication.
"SSE is great for pushing updates to my dashboard," Batman thought, "but I need two-way communication for coordinating with my allies!"
To handle real-time bidirectional communication, Batman learned how to work with WebSockets using Robyn's modern decorator-based API:
from robyn import Robyn

app = Robyn(__file__)


@app.websocket("/web_socket")
async def websocket_endpoint(websocket):
    await websocket.accept()
    try:
        while True:
            message = await websocket.receive_text()
            websocket_id = websocket.id
            await websocket.send_text(f"Echo from {websocket_id}: {message}")
    except Exception as e:
        print(f"WebSocket {websocket.id} error: {e}")


@websocket_endpoint.on_connect
async def on_connect(websocket):
    await websocket.send_text("Connected!")
    return "Connection established"


@websocket_endpoint.on_close
async def on_close(websocket):
    print(f"WebSocket {websocket.id} disconnected")
    return "Disconnected"
WebSocket Methods
The modern WebSocket API provides clean, intuitive methods for handling real-time communication:
websocket.id - Get the unique connection ID
websocket.send_text(data) - Send text messages
websocket.send_json(data) - Send JSON messages
websocket.receive_text() - Receive text messages
websocket.receive_json() - Receive JSON messages
websocket.broadcast(data) - Broadcast to all connected clients
websocket.close() - Close the connection
@app.websocket("/chat")
async def chat_handler(websocket):
    await websocket.accept()

    while True:
        message = await websocket.receive_text()
        websocket_id = websocket.id

        # Send to this client
        await websocket.send_text(f"You said: {message}")

        # Broadcast to all clients
        await websocket.broadcast(f"User {websocket_id}: {message}")
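The JSON helpers in the method list can be used the same way as the text helpers. The following is a minimal sketch, assuming `receive_json()` yields a parsed dict and `send_json()` accepts one. The `build_reply` helper, the `type` and `text` message fields, and the `/json_chat` route are hypothetical names chosen for illustration.

```python
# Sketch only: build_reply, the "type"/"text" fields, and the /json_chat
# route are illustrative assumptions, not part of Robyn itself.


def build_reply(sender_id, payload: dict) -> dict:
    """Turn an incoming JSON payload into the payload to send back."""
    if payload.get("type") == "ping":
        return {"type": "pong", "to": sender_id}
    return {"type": "echo", "from": sender_id, "text": payload.get("text", "")}


async def json_chat(websocket):
    """Handler body; it would be registered with @app.websocket("/json_chat")."""
    await websocket.accept()
    while True:
        payload = await websocket.receive_json()  # assumed to yield a dict
        await websocket.send_json(build_reply(websocket.id, payload))
```

Keeping the reply logic in a plain function means it can be checked without opening a connection.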
Dependency Injection
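WebSockets support the same dependency injection system as HTTP routes. Dependencies are registered with app.inject_global() and app.inject(), and handlers receive them through the global_dependencies and router_dependencies parameters: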
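from robyn import Robyn
import logging

app = Robyn(__file__)

# DatabaseConnection, MetricsCollector, RedisCache, and JWTAuthService are
# placeholders for your own classes; define or import them before this point.

# Configure global dependencies
app.inject_global(
    logger=logging.getLogger(__name__),
    database=DatabaseConnection(),
    metrics=MetricsCollector(),
)

# Configure router-level dependencies
app.inject(
    cache=RedisCache(),
    auth_service=JWTAuthService(),
)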
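The registration code does not show a handler consuming the injected values. One way that might look is sketched below. It assumes that, as with HTTP routes, the injected values arrive as dict-like `global_dependencies` and `router_dependencies` arguments keyed by the names passed to `inject_global()` and `inject()`. The `describe_message` helper and the dict-like `cache` are hypothetical stand-ins.

```python
# Sketch only: describe_message and the dict-like cache are assumptions made
# for illustration; the exact way Robyn passes dependencies may differ.
import logging


def describe_message(message, global_dependencies, router_dependencies) -> str:
    """Count how often a message was seen and log it via the injected logger."""
    logger = global_dependencies["logger"]
    cache = router_dependencies["cache"]  # assumed to behave like a dict
    count = cache.get(message, 0) + 1
    cache[message] = count
    logger.info("message %r seen %d time(s)", message, count)
    return f"{message} (seen {count}x)"


async def audited_echo(websocket, global_dependencies, router_dependencies):
    """Handler body; it would be registered with @app.websocket(...)."""
    await websocket.accept()
    while True:
        message = await websocket.receive_text()
        reply = describe_message(message, global_dependencies, router_dependencies)
        await websocket.send_text(reply)
```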
Broadcasting Messages
Batman learned to send messages to all connected clients using the clean broadcast() method:
Broadcasting
@app.websocket("/notifications")
async def notification_handler(websocket):
    await websocket.accept()

    while True:
        message = await websocket.receive_text()

        # Broadcast to all connected clients
        await websocket.broadcast(f"Notification: {message}")

        # Confirm to sender
        await websocket.send_text("Notification sent!")
Query Parameters and Headers
WebSockets can access query parameters and headers for authentication and configuration:
Query Params
@app.websocket("/secure_chat")
async def secure_chat(websocket):
    # Access query parameters
    token = websocket.query_params.get("token")
    room_id = websocket.query_params.get("room")

    # authenticate_token is a placeholder for your own validation function
    if not token or not authenticate_token(token):
        await websocket.close()
        return

    await websocket.accept()
    await websocket.send_text(f"Joined room: {room_id}")

    while True:
        message = await websocket.receive_text()
        # broadcast() reaches every connected client, so the room ID is only a
        # label in the message; per-room delivery needs your own tracking
        await websocket.broadcast(f"Room {room_id} - {websocket.id}: {message}")
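Because `broadcast()` is described as sending to all connected clients, delivering a message to a single room takes some bookkeeping of your own. The sketch below shows one possible approach: a small in-memory registry that maps room IDs to connections and sends with `send_text()`. `RoomRegistry` and its methods are hypothetical, and the registry only works within a single process.

```python
# Sketch only: RoomRegistry is a hypothetical helper, not part of Robyn.
import asyncio
from collections import defaultdict


class RoomRegistry:
    """Tracks which connections belong to which room."""

    def __init__(self):
        self._rooms = defaultdict(dict)  # room_id -> {connection id: websocket}

    def join(self, room_id, websocket):
        self._rooms[room_id][websocket.id] = websocket

    def leave(self, room_id, websocket):
        members = self._rooms.get(room_id, {})
        members.pop(websocket.id, None)
        if not members:
            self._rooms.pop(room_id, None)  # drop empty rooms

    def members(self, room_id):
        return list(self._rooms.get(room_id, {}))

    async def send_to_room(self, room_id, text):
        """Send text to every connection in one room; return how many were sent."""
        targets = list(self._rooms.get(room_id, {}).values())
        await asyncio.gather(*(ws.send_text(text) for ws in targets))
        return len(targets)
```

A handler would call `join()` after `accept()`, use `send_to_room(room_id, ...)` in place of `broadcast(...)`, and call `leave()` when the connection ends.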
Connection Management
Batman learned to programmatically manage WebSocket connections with the close() method. This is useful for enforcing business rules, handling authentication failures, or managing resource limits:
@app.websocket("/admin_panel")
async def admin_handler(websocket):
    await websocket.accept()

    while True:
        command = await websocket.receive_text()

        if command == "shutdown":
            await websocket.send_text("Shutting down connection...")
            await websocket.close()
            break
        elif command.startswith("kick_user:"):
            # Expected format: "kick_user:<user_id>"
            user_id = command.split(":", 1)[1]
            await websocket.broadcast(f"User {user_id} has been kicked")
            # Close specific user's connection (implementation depends on your user tracking)
        else:
            await websocket.send_text(f"Command executed: {command}")
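The kick step depends on knowing which connection belongs to which user. The following is a minimal sketch of that tracking, paired with a small parser for the `name:argument` command format used above. `parse_command` and `ConnectionTracker` are hypothetical helpers, and how a user ID gets associated with a connection (for example at login) is left as an assumption.

```python
# Sketch only: parse_command and ConnectionTracker are hypothetical helpers.


def parse_command(command: str):
    """Split "name:argument" into (name, argument); argument is None if absent."""
    name, sep, argument = command.partition(":")
    argument = argument.strip()
    return name.strip(), (argument if sep and argument else None)


class ConnectionTracker:
    """Remembers which websocket belongs to which user ID."""

    def __init__(self):
        self._by_user = {}  # user_id -> websocket

    def register(self, user_id, websocket):
        self._by_user[user_id] = websocket

    def remove(self, user_id):
        """Forget a user and return their websocket, or None if unknown."""
        return self._by_user.pop(user_id, None)
```

Inside the admin handler, a kick could then be `target = tracker.remove(user_id)` followed by `await target.close()` when `target` is not `None`.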
Error Handling
Robust WebSocket applications need proper error handling for network issues, client disconnections, and application errors:
from robyn import WebSocketDisconnect


@app.websocket("/robust_chat")
async def robust_chat(websocket):
    await websocket.accept()

    try:
        while True:
            message = await websocket.receive_text()
            await websocket.send_text(f"Echo: {message}")
    except WebSocketDisconnect:
        print(f"Client {websocket.id} disconnected gracefully")
    except Exception as e:
        print(f"Unexpected error for {websocket.id}: {e}")
        await websocket.close()
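When a handler keeps per-connection state, a `finally` clause is a common way to make sure that state is released however the loop ends. The sketch below illustrates the pattern without importing Robyn: `ClientGone` is a local stand-in for a disconnect exception such as `WebSocketDisconnect`, and `active_ids` is a hypothetical record of open connections.

```python
# Sketch only: ClientGone and active_ids are illustrative stand-ins.


class ClientGone(Exception):
    """Local stand-in for a disconnect exception such as WebSocketDisconnect."""


active_ids = set()  # hypothetical per-process record of open connections


async def tracked_echo(websocket, disconnect_error=ClientGone):
    """Echo loop that always forgets the connection, however the loop ends."""
    await websocket.accept()
    active_ids.add(websocket.id)
    try:
        while True:
            message = await websocket.receive_text()
            await websocket.send_text(f"Echo: {message}")
    except disconnect_error:
        pass  # normal disconnect: nothing to report
    finally:
        # Runs on disconnect, unexpected error, or task cancellation
        active_ids.discard(websocket.id)
```

In a real handler, `disconnect_error` would be the framework's own disconnect exception, and unexpected errors would still propagate after the cleanup runs.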
What's next?
With real-time communication mastered, Batman was ready to scale his application. He learned about organizing his growing codebase with views and subrouters to keep everything maintainable as the Justice League joined his mission.
