
WebSocket Reconnection Best Practices for Quantitative Trading Data Streams

ClawDUX Team · March 24, 2026 · 7 min read


A WebSocket drop during a live trading session can mean missed signals, stale data, and bad fills. Here's how to build a connection manager that recovers from the common failure modes automatically.

The Problem

WebSocket connections die for many reasons:

  • Exchange maintenance windows
  • Network flaps
  • Idle timeouts and maximum connection lifetimes (Binance, for example, drops every connection after 24 hours)
  • Server-side rate limiting
  • TCP RST from intermediate proxies

A production trading system needs to handle all of these automatically.
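Several of these failures reach the client as a WebSocket close code (RFC 6455, plus 1012 and 1013 from the IANA registry), and the code hints at whether reconnecting can help at all. A minimal sketch of one possible policy follows; `ReconnectPolicy`, `classify_close`, and the grouping of codes are hypothetical and should be adapted per exchange. Codes that point to a client-side problem, such as a rejected subscription or an oversized frame, go to alerting, because retrying would only repeat the failure:

```python
from enum import Enum
from typing import Optional


class ReconnectPolicy(Enum):
    RETRY = "retry"  # reconnect with exponential backoff and jitter
    ALERT = "alert"  # stop and page a human: retrying repeats the failure


# Close codes (RFC 6455, section 7.4.1) that usually mean the client sent
# something the server will reject again after a reconnect.
# This grouping is an illustrative assumption, not an exchange rule.
_ALERT_CODES = frozenset({
    1002,  # protocol error
    1003,  # unsupported data type
    1007,  # invalid payload data (e.g. malformed UTF-8)
    1008,  # policy violation (e.g. a rejected subscription)
    1009,  # message too big
})


def classify_close(code: Optional[int]) -> ReconnectPolicy:
    """Pick a policy for a close code.

    None means no close frame arrived (network flap, TCP RST), which is
    treated like 1006 (abnormal closure) and retried.
    """
    if code in _ALERT_CODES:
        return ReconnectPolicy.ALERT
    # 1000/1001 (server closed or going away), 1006 (abnormal closure),
    # 1011 (server error), 1012 (service restart), 1013 (try again later)
    # and unknown application codes all fall through to a retry.
    return ReconnectPolicy.RETRY
```

How the close code is obtained depends on the `websockets` version in use, so the caller passes it in; the function itself only encodes the decision.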

The Reconnection Manager

```python
import asyncio
import json
import random
import time
from enum import Enum
from typing import Any, Awaitable, Callable, List, Optional

import websockets


class ConnectionState(Enum):
    DISCONNECTED = "disconnected"
    CONNECTING = "connecting"
    CONNECTED = "connected"
    RECONNECTING = "reconnecting"


class WebSocketManager:
    def __init__(
        self,
        url: str,
        on_message: Callable[[Any], Awaitable[None]],
        subscriptions: Optional[List[dict]] = None,
        max_retries: int = 50,
        base_delay: float = 1.0,
        max_delay: float = 60.0,
        heartbeat_interval: float = 30.0,
    ):
        self.url = url
        self.on_message = on_message
        self.subscriptions = subscriptions or []
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.heartbeat_interval = heartbeat_interval

        self.state = ConnectionState.DISCONNECTED
        self.ws: Optional[Any] = None  # live connection while CONNECTED
        self.retry_count = 0
        # time.monotonic() of the last message; immune to wall-clock jumps
        self.last_message_time = 0.0
        self._running = False

    async def connect(self):
        """Main connection loop with automatic reconnection."""
        self._running = True
        while self._running:
            try:
                self.state = ConnectionState.CONNECTING
                async with websockets.connect(
                    self.url,
                    ping_interval=20,  # protocol-level keepalive pings
                    ping_timeout=10,
                    close_timeout=5,
                ) as ws:
                    self.ws = ws
                    self.state = ConnectionState.CONNECTED
                    self.retry_count = 0
                    # Restart the silence clock so the monitor does not count
                    # time spent on the previous connection or in backoff.
                    self.last_message_time = time.monotonic()
                    print(f"Connected to {self.url}")

                    # Resubscribe to channels
                    await self._resubscribe()

                    # Start heartbeat monitor
                    heartbeat_task = asyncio.create_task(
                        self._heartbeat_monitor()
                    )

                    try:
                        async for message in ws:
                            self.last_message_time = time.monotonic()
                            await self._dispatch(message)
                    finally:
                        heartbeat_task.cancel()
                # A normal close (codes 1000/1001, including our own watchdog
                # close) ends the loop without an exception; fall through to
                # the backoff instead of reconnecting in a tight loop.
            except (
                websockets.ConnectionClosed,  # includes ConnectionClosedError
                websockets.InvalidHandshake,  # e.g. HTTP 429/503 on upgrade
                asyncio.TimeoutError,  # opening handshake timed out
                OSError,  # includes ConnectionRefusedError and DNS errors
            ) as e:
                print(f"Connection lost: {e!r}")
            finally:
                self.ws = None

            if not self._running:
                break
            if self.retry_count >= self.max_retries:
                print("Max retries exceeded. Giving up.")  # alert here
                break

            self.state = ConnectionState.RECONNECTING
            delay = self._get_backoff_delay()
            self.retry_count += 1
            print(
                f"Reconnecting in {delay:.1f}s "
                f"(attempt {self.retry_count})"
            )
            await asyncio.sleep(delay)

        self.state = ConnectionState.DISCONNECTED

    def _get_backoff_delay(self) -> float:
        """Exponential backoff with jitter."""
        delay = min(
            self.base_delay * (2 ** self.retry_count),
            self.max_delay
        )
        # Add 0-25% jitter to prevent thundering herd
        jitter = delay * random.uniform(0, 0.25)
        return delay + jitter

    async def _dispatch(self, raw):
        """Decode one frame and pass it on without killing the read loop."""
        try:
            data = json.loads(raw)
        except ValueError as e:  # JSONDecodeError or bad UTF-8
            print(f"Dropping malformed message: {e}")
            return
        try:
            await self.on_message(data)
        except Exception as e:  # a handler bug should not tear down the feed
            print(f"on_message raised: {e!r}")

    async def _resubscribe(self):
        """Replay all subscriptions after reconnect."""
        for sub in self.subscriptions:
            # A failed send means the socket is already dead; let
            # ConnectionClosed propagate so the outer loop reconnects.
            await self.ws.send(json.dumps(sub))
            print(f"Resubscribed: {sub.get('channel', sub)}")

    async def _heartbeat_monitor(self):
        """Force a reconnect after 2x heartbeat_interval without data.

        Protocol pings only prove the socket is alive; this catches a feed
        that stays connected but stops delivering messages.
        """
        while True:
            await asyncio.sleep(self.heartbeat_interval)
            silence = time.monotonic() - self.last_message_time
            if silence > self.heartbeat_interval * 2:
                print(
                    f"No messages for {silence:.0f}s. "
                    "Forcing reconnect."
                )
                ws = self.ws
                if ws is not None:
                    await ws.close()
                break

    async def stop(self):
        """Stop the loop; a pending backoff sleep still finishes first."""
        self._running = False
        if self.ws:
            await self.ws.close()
```
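The backoff in `_get_backoff_delay` is easiest to reason about as a schedule. The sketch below restates the same formula (capped exponential delay plus 0-25% additive jitter) as a standalone function with an injectable random source, so the schedule can be checked deterministically. `backoff_delay` and `schedule` are illustrative names, and the exponent clamp is an addition that keeps huge attempt counts from overflowing the float conversion:

```python
import random
from typing import Callable, List


def backoff_delay(
    attempt: int,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    rand: Callable[[float, float], float] = random.uniform,
) -> float:
    """Capped exponential backoff plus 0-25% additive jitter."""
    # Clamp the exponent: the cap is reached long before 2**32 anyway.
    delay = min(base_delay * (2 ** min(attempt, 32)), max_delay)
    return delay + delay * rand(0, 0.25)


def schedule(n: int, **kwargs) -> List[float]:
    """Delays for the first n reconnect attempts."""
    return [backoff_delay(i, **kwargs) for i in range(n)]


if __name__ == "__main__":
    # Jitter disabled to expose the shape of the curve.
    # prints [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
    print(schedule(8, rand=lambda lo, hi: 0.0))
```

With the defaults the 60 s cap is reached on the seventh attempt, so `max_retries=50` amounts to roughly 45 to 56 minutes of sleeping before the manager gives up. A common alternative, "full jitter", draws the whole delay uniformly from zero to the capped value; it spreads reconnecting clients further apart at the cost of occasionally retrying almost immediately.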

Usage with Binance

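```python
import asyncio

# WebSocketManager is the class defined in the previous listing.


async def handle_message(data: dict):
    # Ignore non-trade payloads, e.g. the {"result": null, "id": 1}
    # acknowledgement Binance sends for a SUBSCRIBE request.
    if data.get("e") == "trade":
        print(f"{data['s']} {data['p']} x{data['q']}")


async def main():
    manager = WebSocketManager(
        url="wss://stream.binance.com:9443/ws",
        on_message=handle_message,
        subscriptions=[
            {
                "method": "SUBSCRIBE",
                "params": [
                    "btcusdt@trade",
                    "ethusdt@trade",
                ],
                "id": 1,
            }
        ],
        heartbeat_interval=30.0,
    )
    await manager.connect()


if __name__ == "__main__":
    asyncio.run(main())
```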
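Binance sends price and quantity as decimal strings, and a handler that feeds a strategy will usually want them as exact numbers rather than floats. A minimal sketch, assuming only the `e`, `s`, `t` (trade ID), `p`, `q`, and `T` (trade time in ms) fields of the trade event; `Trade` and `parse_trade` are hypothetical names:

```python
from dataclasses import dataclass
from decimal import Decimal
from typing import Optional


@dataclass(frozen=True)
class Trade:
    symbol: str
    trade_id: int
    price: Decimal
    quantity: Decimal
    trade_time_ms: int


def parse_trade(data: dict) -> Optional[Trade]:
    """Return a Trade for a trade event, or None for anything else
    (such as the {"result": null, "id": 1} reply to SUBSCRIBE)."""
    if data.get("e") != "trade":
        return None
    return Trade(
        symbol=data["s"],
        trade_id=int(data["t"]),
        # Decimal keeps the exchange's string values exact.
        price=Decimal(data["p"]),
        quantity=Decimal(data["q"]),
        trade_time_ms=int(data["T"]),
    )
```

An async `handle_message` can then call `parse_trade(data)` and simply skip `None` results.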

Checklist for Production

  • Exponential backoff with jitter (prevent thundering herd)
  • Heartbeat monitoring (detect silent disconnects)
  • Automatic resubscription after reconnect
  • Max retry limit with alerting
  • Message sequence tracking to detect gaps
  • Buffer queuing during reconnection window
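The manager above does not track sequences. Gap detection needs a number the feed promises is consecutive, for example a per-symbol trade or update ID, if the exchange documents it that way. A minimal sketch under that assumption (`SequenceTracker` is a hypothetical helper): it records each missing range so the application can backfill over REST or resynchronize from a snapshot, and it drops duplicate or late messages:

```python
from typing import Dict, List, Tuple


class SequenceTracker:
    """Detect gaps in per-stream sequence numbers.

    Assumes every message carries a stream key and an integer sequence
    that should increase by exactly 1 (a hypothetical feed contract).
    """

    def __init__(self) -> None:
        self._last: Dict[str, int] = {}
        # (stream, first missing seq, last missing seq)
        self.gaps: List[Tuple[str, int, int]] = []

    def observe(self, stream: str, seq: int) -> bool:
        """Record seq; return False if the message should be dropped."""
        last = self._last.get(stream)
        if last is not None:
            if seq <= last:
                return False  # duplicate or late arrival
            if seq > last + 1:
                self.gaps.append((stream, last + 1, seq - 1))
        self._last[stream] = seq
        return True

    def reset(self, stream: str) -> None:
        """Forget a stream, e.g. after resyncing it from a snapshot."""
        self._last.pop(stream, None)
```

After a reconnect, the first message on each stream is compared against the last one seen before the drop, so any loss during the reconnection window shows up as a recorded gap.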

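"Buffer queuing" can mean several things. For a connection manager, one plausible reading is to hold messages the application tries to send while the socket is down and flush them in order once the connection is back. The sketch assumes that reading; `OutboundBuffer`, `attach`, and `detach` are hypothetical names, and whether stale outbound messages (orders in particular) should be replayed at all is an application decision:

```python
import asyncio
from collections import deque
from typing import Awaitable, Callable, Deque, Optional

SendFn = Callable[[str], Awaitable[None]]


class OutboundBuffer:
    """Queue outgoing messages while disconnected; flush on reconnect."""

    def __init__(self, maxlen: int = 1000) -> None:
        # Bounded: once full, the oldest queued message is discarded so a
        # long outage cannot grow memory without limit.
        self._pending: Deque[str] = deque(maxlen=maxlen)
        self._send: Optional[SendFn] = None

    @property
    def pending(self) -> int:
        return len(self._pending)

    async def send(self, message: str) -> None:
        if self._send is None:
            self._pending.append(message)  # offline: hold it
        else:
            await self._send(message)

    async def attach(self, send_fn: SendFn) -> None:
        """Flush queued messages in FIFO order, then send directly."""
        while self._pending:
            message = self._pending.popleft()
            try:
                await send_fn(message)
            except BaseException:
                # Includes cancellation: put it back so order survives.
                self._pending.appendleft(message)
                raise
        self._send = send_fn

    def detach(self) -> None:
        """Call as soon as the connection is lost."""
        self._send = None
```

In `WebSocketManager`, `attach(ws.send)` would fit right after `_resubscribe()`, and `detach()` wherever the connection is torn down.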
This reconnection pattern is battle-tested in the ClawDUX real-time data pipeline, where strategy verification requires uninterrupted data feeds across multiple exchanges.

The core logic discussed in this article has been integrated into the ClawDUX API. Access ClawDUX-core for full permissions, or browse the marketplace to discover verified trading strategies.

