Python asyncio for High-Frequency Trading Data Collection
When you need tick-level data from 5+ exchanges simultaneously, synchronous code won't cut it. asyncio lets you handle hundreds of concurrent connections without threads.
Why asyncio for Trading
- Non-blocking I/O: One process handles 100+ WebSocket connections
- Low latency: No thread switching overhead
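- Cooperative scheduling: coroutines only switch at `await`, so code between two awaits runs without interruption and needs no locks (shared state can still change across an `await`)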
- Memory efficient: a coroutine costs on the order of 1KB, versus a thread's stack reservation (commonly ~8MB of virtual memory on Linux)
The Data Collection Pipeline
import asyncio
import json
import time
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, List

import aiohttp
@dataclass
class TickData:
    """One executed trade as reported by an exchange feed.

    Instances are produced by the collector's stream handlers and
    accumulated in its tick buffer until the next flush.
    """

    exchange: str  # feed the trade came from, e.g. 'binance' or 'coinbase'
    symbol: str  # instrument name in the exchange's own notation
    price: float  # execution price
    volume: float  # traded quantity
    timestamp: float  # seconds since the Unix epoch
    side: str  # 'buy' or 'sell'
class AsyncDataCollector:
    """Collect live trades from several exchanges into one in-memory buffer.

    Each exchange feed runs as its own task on the event loop, and a third
    task periodically drains ``tick_buffer``. The tasks share state without
    locks; this is safe because nothing awaits between reading and
    mutating the buffer.
    """

    # Seconds to wait before reconnecting after a feed drops.
    RECONNECT_DELAY = 2.0
    # WebSocket ping interval (seconds) used to detect dead connections.
    HEARTBEAT = 20.0

    def __init__(self, symbols: List[str]):
        """Create a collector for *symbols*, given as 'BASE/QUOTE' pairs."""
        self.symbols = symbols
        self.tick_buffer: List[TickData] = []
        self.stats: Dict[str, int] = defaultdict(int)
        self._running = False
        # Tasks spawned by start(); kept so stop() can cancel them.
        self._tasks: List[asyncio.Task] = []

    async def start(self):
        """Run every exchange feed and the flush loop until stopped.

        Returns once all tasks have finished (normally after ``stop()``).
        Whatever is still buffered is flushed on the way out. A task that
        died with an unexpected exception is reported instead of being
        silently dropped.
        """
        self._running = True
        self._tasks = [
            asyncio.create_task(self._binance_stream()),
            asyncio.create_task(self._coinbase_stream()),
            asyncio.create_task(self._flush_buffer_loop()),
        ]
        try:
            results = await asyncio.gather(*self._tasks, return_exceptions=True)
        finally:
            self._running = False
            self._tasks = []
            self._flush()
        for result in results:
            if isinstance(result, Exception) and not isinstance(
                result, asyncio.CancelledError
            ):
                print(f"Collector task failed: {result!r}")

    async def _binance_stream(self):
        """Stream trades for all symbols from Binance's combined trade stream."""
        streams = "/".join(
            f"{s.lower().replace('/', '')}@trade" for s in self.symbols
        )
        url = (
            f"wss://stream.binance.com:9443"
            f"/stream?streams={streams}"
        )
        await self._run_stream('binance', url, self._parse_binance)

    async def _coinbase_stream(self):
        """Stream trades for all symbols from Coinbase's 'matches' channel."""
        url = "wss://ws-feed.exchange.coinbase.com"
        sub_msg = {
            "type": "subscribe",
            "channels": ["matches"],
            "product_ids": [s.replace('/', '-') for s in self.symbols],
        }
        await self._run_stream(
            'coinbase', url, self._parse_coinbase, subscribe=sub_msg
        )

    async def _run_stream(self, name, url, parse, subscribe=None):
        """Keep a WebSocket to *url* open while running, buffering parsed ticks.

        *name* keys ``self.stats`` and prefixes log lines. *parse* maps one
        decoded JSON message to a TickData, or to None for non-trade
        messages. *subscribe*, if given, is sent as JSON right after
        connecting. A message that fails to parse is logged and skipped
        rather than tearing down a healthy connection.
        """
        label = name.capitalize()
        async with aiohttp.ClientSession() as session:
            while self._running:
                try:
                    async with session.ws_connect(
                        url, heartbeat=self.HEARTBEAT
                    ) as ws:
                        if subscribe is not None:
                            await ws.send_json(subscribe)
                        async for msg in ws:
                            if msg.type != aiohttp.WSMsgType.TEXT:
                                continue
                            try:
                                tick = parse(json.loads(msg.data))
                            except (AttributeError, KeyError,
                                    TypeError, ValueError) as e:
                                print(f"{label} bad message skipped: {e!r}")
                                continue
                            if tick is not None:
                                self.tick_buffer.append(tick)
                                self.stats[name] += 1
                except Exception as e:
                    # Top-level boundary for one connection: log and reconnect.
                    print(f"{label} error: {e}")
                # Back off whether the socket errored or closed cleanly, so a
                # flapping feed cannot spin in a tight reconnect loop.
                if self._running:
                    await asyncio.sleep(self.RECONNECT_DELAY)

    @staticmethod
    def _parse_binance(data):
        """Convert a Binance combined-stream trade message to a TickData."""
        if 'data' not in data:
            return None
        trade = data['data']
        return TickData(
            exchange='binance',
            symbol=trade['s'],
            price=float(trade['p']),
            volume=float(trade['q']),
            timestamp=trade['T'] / 1000,  # exchange trade time, ms -> s
            # 'm' is true when the buyer was the maker, i.e. the aggressor sold.
            side='sell' if trade['m'] else 'buy',
        )

    @classmethod
    def _parse_coinbase(cls, data):
        """Convert a Coinbase 'match' message to a TickData."""
        if data.get('type') != 'match':
            return None
        # Coinbase reports the *maker* order's side; invert it so 'side' is
        # the aggressor side, matching the Binance mapping above.
        maker_side = data['side']
        return TickData(
            exchange='coinbase',
            symbol=data['product_id'],
            price=float(data['price']),
            volume=float(data['size']),
            timestamp=cls._parse_coinbase_time(data.get('time')),
            side='buy' if maker_side == 'sell' else 'sell',
        )

    @staticmethod
    def _parse_coinbase_time(value):
        """Return Coinbase's ISO-8601 trade time as epoch seconds.

        Falls back to the local clock when the field is missing or cannot
        be parsed. Before Python 3.11, ``fromisoformat`` rejects a 'Z'
        suffix, so it is rewritten as an explicit UTC offset.
        """
        if not value:
            return time.time()
        try:
            return datetime.fromisoformat(value.replace('Z', '+00:00')).timestamp()
        except (AttributeError, TypeError, ValueError):
            return time.time()

    def _flush(self):
        """Drain the tick buffer in one synchronous step and report it."""
        if not self.tick_buffer:
            return
        batch = self.tick_buffer.copy()
        self.tick_buffer.clear()
        # In production: write to database or parquet
        total = sum(self.stats.values())
        print(
            f"Flushed {len(batch)} ticks "
            f"(total: {total:,})"
        )

    async def _flush_buffer_loop(self, interval=5.0):
        """Flush the buffer every *interval* seconds while running.

        Performs one last flush when the loop ends, so ticks collected
        after the previous flush are not lost.
        """
        while self._running:
            await asyncio.sleep(interval)
            self._flush()
        self._flush()

    def stop(self):
        """Shut the collector down.

        Clearing ``_running`` alone is not enough. The feeds sit inside
        ``async for msg in ws`` and only re-check the flag on reconnect, so
        the running tasks are cancelled as well. ``start()`` flushes any
        buffered ticks as it unwinds.
        """
        self._running = False
        for task in self._tasks:
            task.cancel()
# Usage
async def main():
    """Run a collector for BTC/USDT and ETH/USDT until it is cancelled.

    ``stop()`` is called from ``finally`` rather than from an
    ``except KeyboardInterrupt`` clause. Under ``asyncio.run``, Ctrl-C
    reaches this coroutine as a task cancellation, not as a
    ``KeyboardInterrupt``, so such a clause would never run.
    """
    collector = AsyncDataCollector(
        symbols=['BTC/USDT', 'ETH/USDT']
    )
    try:
        await collector.start()
    finally:
        collector.stop()


# asyncio.run(main())
Performance Numbers (rough, illustrative figures — benchmark your own setup)
| Metric | Sync (requests) | Async (aiohttp) |
|---|---|---|
| Connections | 1 at a time | 100+ concurrent |
| Latency per call | 50-200ms | 1-5ms overhead |
| Memory (100 feeds) | ~800MB (threads) | ~50MB |
| Throughput | ~20 req/s | ~5000 msg/s |
This async data collection architecture is the backbone of the ClawDUX strategy verification system, where strategies are tested against real-time data from multiple exchanges simultaneously.
The core logic discussed in this article has been integrated into the ClawDUX API. Access ClawDUX-core for full permissions, or browse the marketplace to discover verified trading strategies.