WebSocket Reconnection Best Practices for Quantitative Trading Data Streams
A WebSocket drop during a live trading session can mean missed signals, stale data, and bad fills. Here's how to build a connection manager that detects dropped or stale connections and recovers from them automatically.
The Problem
WebSocket connections die for many reasons:
- Exchange maintenance windows
- Network flaps
- Idle timeouts and maximum connection lifetimes (some exchanges close every connection after 24h)
- Server-side rate limiting
- TCP RST from intermediate proxies
A production trading system needs to handle all of these automatically.
The Reconnection Manager
import asyncio
import websockets
import json
import time
from enum import Enum
from typing import Callable, Optional, List
class ConnectionState(Enum):
    """Lifecycle states reported by ``WebSocketManager.state``."""

    DISCONNECTED = "disconnected"  # no socket and no connection loop running
    CONNECTING = "connecting"  # opening handshake in progress
    CONNECTED = "connected"  # socket open, messages being consumed
    RECONNECTING = "reconnecting"  # connection lost, waiting to retry
class WebSocketManager:
    """Keep a WebSocket stream alive: reconnect, resubscribe, detect silence.

    ``connect()`` runs until ``stop()`` is called or ``max_retries``
    consecutive reconnection attempts have failed. After every successful
    connection each payload in ``subscriptions`` is sent again, and a
    watchdog closes the socket when no message has arrived for more than
    ``2 * heartbeat_interval`` seconds.

    Args:
        url: WebSocket endpoint to connect to.
        on_message: Coroutine function awaited with each JSON-decoded message.
        subscriptions: JSON-serialisable payloads sent after each connect.
        max_retries: Consecutive failed attempts tolerated before giving up.
        base_delay: First backoff delay in seconds; doubles on every retry.
        max_delay: Upper bound (before jitter) for the backoff delay.
        heartbeat_interval: Seconds between watchdog checks.
    """

    # ``float * 2 ** n`` raises OverflowError once 2 ** n no longer fits in a
    # float (n >= 1024), so the exponent is capped below that limit.
    _MAX_BACKOFF_EXPONENT = 1000

    def __init__(
        self,
        url: str,
        on_message: Callable,
        subscriptions: Optional[List[dict]] = None,
        max_retries: int = 50,
        base_delay: float = 1.0,
        max_delay: float = 60.0,
        heartbeat_interval: float = 30.0,
    ):
        self.url = url
        self.on_message = on_message
        self.subscriptions = subscriptions or []
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.heartbeat_interval = heartbeat_interval
        self.state = ConnectionState.DISCONNECTED
        self.ws: Optional[websockets.WebSocketClientProtocol] = None
        self.retry_count = 0
        self.last_message_time = 0
        self._running = False

    async def connect(self):
        """Run the connection loop until stopped or out of retries.

        Every loss of connection, including a clean close by the server, is
        followed by a backoff delay before the next attempt. Exceptions
        raised by ``on_message`` are not handled here and propagate to the
        caller, except ``OSError``, which is indistinguishable from a network
        failure at this level and triggers a reconnect.
        """
        self._running = True
        try:
            while self._running:
                # Reported when the stream ends without raising (clean close).
                reason = "connection closed"
                try:
                    self.state = ConnectionState.CONNECTING
                    async with websockets.connect(
                        self.url,
                        ping_interval=20,
                        ping_timeout=10,
                        close_timeout=5,
                    ) as ws:
                        await self._run_session(ws)
                except (
                    websockets.ConnectionClosed,
                    # Handshake rejected, e.g. HTTP 429/503 from the server.
                    websockets.InvalidHandshake,
                    # Includes ConnectionRefusedError and DNS failures.
                    OSError,
                    # Not an OSError subclass before Python 3.11.
                    asyncio.TimeoutError,
                ) as e:
                    reason = e
                finally:
                    self.ws = None

                if not self._running:
                    break  # stop() was called; do not reconnect

                self.state = ConnectionState.RECONNECTING
                if self.retry_count >= self.max_retries:
                    print(
                        f"Connection lost: {reason}. "
                        "Max retries exceeded. Giving up."
                    )
                    break

                delay = self._get_backoff_delay()
                print(
                    f"Connection lost: {reason}. "
                    f"Reconnecting in {delay:.1f}s "
                    f"(attempt {self.retry_count + 1})"
                )
                await asyncio.sleep(delay)
                self.retry_count += 1
        finally:
            self._running = False
            self.state = ConnectionState.DISCONNECTED

    async def _run_session(self, ws):
        """Consume messages from one open connection until it ends."""
        self.ws = ws
        self.state = ConnectionState.CONNECTED
        # Start the silence clock at connect time so a timestamp left over
        # from the previous connection cannot trigger the watchdog, and a
        # connection that never delivers anything is still detected.
        self.last_message_time = time.time()
        print(f"Connected to {self.url}")

        await self._resubscribe()
        heartbeat_task = asyncio.create_task(self._heartbeat_monitor())
        try:
            async for message in ws:
                self.last_message_time = time.time()
                # Only received data proves the connection is healthy; a
                # server that accepts and immediately drops the socket must
                # keep climbing the backoff ladder.
                self.retry_count = 0
                try:
                    data = json.loads(message)
                except ValueError as e:
                    # One malformed frame must not take the whole feed down.
                    print(f"Dropping malformed message: {e}")
                    continue
                await self.on_message(data)
        finally:
            heartbeat_task.cancel()
            # Wait for the watchdog to finish so it cannot outlive this
            # connection and its outcome is always retrieved.
            await asyncio.gather(heartbeat_task, return_exceptions=True)

    def _get_backoff_delay(self) -> float:
        """Return the next retry delay: exponential backoff plus jitter.

        The delay is ``base_delay * 2 ** retry_count`` capped at
        ``max_delay``, plus 0-25% random jitter.
        """
        import random

        exponent = min(self.retry_count, self._MAX_BACKOFF_EXPONENT)
        delay = min(self.base_delay * (2 ** exponent), self.max_delay)
        # Add 0-25% jitter to prevent thundering herd
        jitter = delay * random.uniform(0, 0.25)
        return delay + jitter

    async def _resubscribe(self):
        """Replay all subscriptions after (re)connect, best effort."""
        ws = self.ws
        if ws is None:
            return
        for sub in self.subscriptions:
            try:
                await ws.send(json.dumps(sub))
                print(f"Resubscribed: {sub.get('channel', sub)}")
            except Exception as e:
                # Keep going: a dead socket surfaces in the receive loop.
                print(f"Resubscribe failed: {e}")

    async def _heartbeat_monitor(self):
        """Close the socket when no message has arrived for too long.

        Checks every ``heartbeat_interval`` seconds and closes the current
        socket once the silence exceeds twice that interval.
        """
        while True:
            await asyncio.sleep(self.heartbeat_interval)
            if self.last_message_time == 0:
                continue
            silence = time.time() - self.last_message_time
            if silence > self.heartbeat_interval * 2:
                print(
                    f"No messages for {silence:.0f}s. "
                    "Forcing reconnect."
                )
                ws = self.ws
                if ws is not None:
                    await ws.close()
                break

    async def stop(self):
        """Stop reconnecting and close the current socket, if any."""
        self._running = False
        ws = self.ws
        if ws is not None:
            await ws.close()
Usage with Binance
async def handle_message(data: dict):
    """Print symbol, price and quantity for each trade event.

    Payloads whose ``'e'`` field is missing or is not ``'trade'`` are ignored.
    """
    if 'e' in data and data['e'] == 'trade':
        print(f"{data['s']} {data['p']} x{data['q']}")
async def main():
    """Stream BTC/USDT and ETH/USDT trades from Binance.

    Runs until the manager gives up reconnecting or the task is cancelled.
    """
    manager = WebSocketManager(
        url="wss://stream.binance.com:9443/ws",
        on_message=handle_message,
        subscriptions=[
            {
                "method": "SUBSCRIBE",
                "params": [
                    "btcusdt@trade",
                    "ethusdt@trade",
                ],
                "id": 1
            }
        ],
        heartbeat_interval=30.0,
    )
    try:
        await manager.connect()
    finally:
        # Close the socket cleanly even when the task is cancelled.
        await manager.stop()
# asyncio.run(main())
Checklist for Production
- Exponential backoff with jitter (prevent thundering herd)
- Heartbeat monitoring (detect silent disconnects)
- Automatic resubscription after reconnect
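- Max retry limit with alerting (the manager above only prints a message when it gives up; wire that into your alerting)
- Message sequence tracking to detect gaps (not implemented above)
- Buffer queuing during reconnection window (not implemented above)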
This reconnection pattern is battle-tested in the ClawDUX real-time data pipeline, where strategy verification requires uninterrupted data feeds across multiple exchanges.
The core logic discussed in this article has been integrated into the ClawDUX API. Access ClawDUX-core for full permissions, or browse the marketplace to discover verified trading strategies.