API & Python

Python asyncio for High-Frequency Trading Data Collection

ClawDUX Team · March 26, 2026 · 7 min read · 0 views

Python asyncio for High-Frequency Trading Data Collection

When you need tick-level data from 5+ exchanges simultaneously, synchronous code won't cut it. asyncio lets you handle hundreds of concurrent connections without threads.

Why asyncio for Trading

  • Non-blocking I/O: One process handles 100+ WebSocket connections
  • Low latency: No thread switching overhead
  • Predictable concurrency: Single-threaded, so tasks only switch at `await` points and shared state needs no locks between them
  • Memory efficient: Coroutines use ~1KB vs ~8MB per thread

The Data Collection Pipeline

python
import asyncio
import aiohttp
import json
import time
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Dict, List

@dataclass
class TickData:
    """One executed trade (tick) as recorded by the collector."""
    exchange: str  # source exchange label, e.g. 'binance' or 'coinbase'
    symbol: str  # instrument identifier exactly as reported by the exchange
    price: float  # trade price, converted from the feed's value with float()
    volume: float  # traded quantity, converted from the feed's value with float()
    timestamp: float  # Unix time in seconds
    side: str  # 'buy' or 'sell'

class AsyncDataCollector:
    """Collect trade ticks from Binance and Coinbase over WebSockets.

    Each exchange feed runs as its own task and appends ``TickData`` records
    to ``tick_buffer``; a third task drains the buffer every
    ``FLUSH_INTERVAL`` seconds. ``stats`` counts accepted ticks per exchange.

    Conventions applied to every tick, regardless of exchange:

    * ``side`` is the *taker* (aggressor) side of the trade.
    * ``timestamp`` is the exchange-reported trade time in Unix seconds
      (local receive time is used only when the feed's time is unusable).
    """

    FLUSH_INTERVAL = 5    # seconds between periodic buffer flushes
    RECONNECT_DELAY = 2   # seconds to wait before reconnecting a feed

    def __init__(self, symbols: List[str]):
        """Create a collector for ``symbols`` given as 'BASE/QUOTE' strings."""
        self.symbols = symbols
        self.tick_buffer: List[TickData] = []
        self.stats: Dict[str, int] = defaultdict(int)
        self._running = False
        # Tasks created by start(); kept so stop() can cancel them.
        self._tasks: list = []

    async def start(self):
        """Run all feeds and the flush loop until stopped or cancelled.

        Returns once every task has finished. Unexpected task failures are
        reported instead of being silently discarded.
        """
        self._running = True
        names = ("Binance stream", "Coinbase stream", "Flush loop")
        self._tasks = [
            asyncio.create_task(self._binance_stream()),
            asyncio.create_task(self._coinbase_stream()),
            asyncio.create_task(self._flush_buffer_loop()),
        ]
        try:
            results = await asyncio.gather(
                *self._tasks, return_exceptions=True
            )
        finally:
            self._running = False
            self._tasks = []

        for name, result in zip(names, results):
            # Cancellation is the normal outcome of stop(); anything else
            # is a real failure worth surfacing.
            if isinstance(result, BaseException) and not isinstance(
                result, asyncio.CancelledError
            ):
                print(f"{name} failed: {result!r}")

    async def _binance_stream(self):
        """Connect to Binance trade stream."""
        streams = "/".join(
            f"{s.lower().replace('/', '')}@trade"
            for s in self.symbols
        )
        url = (
            f"wss://stream.binance.com:9443"
            f"/stream?streams={streams}"
        )
        await self._consume(
            'binance', 'Binance', url, self._parse_binance, heartbeat=20
        )

    async def _coinbase_stream(self):
        """Connect to Coinbase WebSocket feed."""
        url = "wss://ws-feed.exchange.coinbase.com"
        sub_msg = {
            "type": "subscribe",
            "channels": ["matches"],
            "product_ids": [
                s.replace('/', '-') for s in self.symbols
            ],
        }
        await self._consume(
            'coinbase', 'Coinbase', url, self._parse_coinbase,
            subscribe=sub_msg,
        )

    async def _consume(self, exchange, label, url, parse,
                       subscribe=None, **ws_options):
        """Read one WebSocket feed, reconnecting until the collector stops.

        Args:
            exchange: key under which accepted ticks are counted in ``stats``.
            label: human-readable name used in log output.
            url: WebSocket endpoint.
            parse: callable turning a decoded JSON payload into a
                ``TickData``, or ``None`` for messages that are not trades.
            subscribe: optional JSON message sent right after connecting.
            **ws_options: extra keyword arguments for ``ws_connect``.
        """
        async with aiohttp.ClientSession() as session:
            while self._running:
                try:
                    async with session.ws_connect(url, **ws_options) as ws:
                        if subscribe is not None:
                            await ws.send_json(subscribe)
                        async for msg in ws:
                            if not self._running:
                                break
                            if msg.type != aiohttp.WSMsgType.TEXT:
                                continue
                            try:
                                tick = parse(json.loads(msg.data))
                            except (KeyError, TypeError, ValueError) as e:
                                # One malformed message must not tear down
                                # a healthy connection.
                                print(f"{label} skipped bad message: {e!r}")
                                continue
                            if tick is not None:
                                self.tick_buffer.append(tick)
                                self.stats[exchange] += 1
                except asyncio.CancelledError:
                    # Never swallow cancellation (it is an Exception
                    # subclass on Python 3.7).
                    raise
                except Exception as e:
                    print(f"{label} error: {e}")
                # Back off after errors AND after clean server-side closes,
                # so a closing server cannot cause a tight reconnect loop.
                if self._running:
                    await asyncio.sleep(self.RECONNECT_DELAY)

    @staticmethod
    def _parse_binance(payload):
        """Build a tick from a Binance combined-stream trade payload."""
        if not isinstance(payload, dict) or 'data' not in payload:
            return None
        trade = payload['data']
        return TickData(
            exchange='binance',
            symbol=trade['s'],
            price=float(trade['p']),
            volume=float(trade['q']),
            timestamp=trade['T'] / 1000,
            # 'm' is true when the buyer is the maker, i.e. the taker sold.
            side='sell' if trade['m'] else 'buy',
        )

    @staticmethod
    def _parse_coinbase(payload):
        """Build a tick from a Coinbase ``match`` message."""
        if not isinstance(payload, dict) or payload.get('type') != 'match':
            return None
        # Coinbase reports the MAKER order side; invert it so 'side' means
        # the taker side, consistent with the Binance feed. An unexpected
        # value raises KeyError and the message is skipped.
        taker_side = {'buy': 'sell', 'sell': 'buy'}[payload['side']]
        return TickData(
            exchange='coinbase',
            symbol=payload['product_id'],
            price=float(payload['price']),
            volume=float(payload['size']),
            timestamp=AsyncDataCollector._coinbase_timestamp(
                payload.get('time')
            ),
            side=taker_side,
        )

    @staticmethod
    def _coinbase_timestamp(raw):
        """Convert Coinbase's ISO-8601 ``time`` string to Unix seconds.

        Falls back to the local receive time when the field is missing or
        cannot be parsed.
        """
        from datetime import datetime

        try:
            # fromisoformat() only accepts a trailing 'Z' from Python 3.11.
            parsed = datetime.fromisoformat(raw.replace('Z', '+00:00'))
        except (AttributeError, ValueError):
            return time.time()
        return parsed.timestamp()

    def _flush_buffer(self):
        """Drain ``tick_buffer`` and return the drained batch (may be empty)."""
        if not self.tick_buffer:
            return []
        batch = self.tick_buffer.copy()
        self.tick_buffer.clear()
        # In production: write to database or parquet
        total = sum(self.stats.values())
        print(
            f"Flushed {len(batch)} ticks "
            f"(total: {total:,})"
        )
        return batch

    async def _flush_buffer_loop(self):
        """Periodically flush tick buffer to storage.

        Always performs one final flush on exit (normal stop or
        cancellation) so ticks received since the last interval are kept.
        """
        try:
            while self._running:
                await asyncio.sleep(self.FLUSH_INTERVAL)
                self._flush_buffer()
        finally:
            self._flush_buffer()

    def stop(self):
        """Request shutdown and wake every task immediately.

        Clearing the flag alone is not enough: a feed blocked waiting for
        the next message (or the flush loop mid-sleep) would not notice it.
        Call this from the event-loop thread.
        """
        self._running = False
        for task in self._tasks:
            task.cancel()

# Usage
async def main():
    """Run a collector for BTC/USDT and ETH/USDT until interrupted.

    ``stop()`` is called on every exit path. On Python 3.11+
    ``asyncio.run`` turns Ctrl+C into cancellation of this coroutine rather
    than raising ``KeyboardInterrupt`` inside it, so an ``except
    KeyboardInterrupt`` alone would never stop the collector.
    """
    collector = AsyncDataCollector(
        symbols=['BTC/USDT', 'ETH/USDT']
    )
    try:
        await collector.start()
    except KeyboardInterrupt:
        # Still treated as a normal shutdown request, as before.
        pass
    finally:
        collector.stop()

# asyncio.run(main())

Performance Numbers

| Metric | Sync (requests) | Async (aiohttp) |
| --- | --- | --- |
| Connections | 1 at a time | 100+ concurrent |
| Latency per call | 50-200ms | 1-5ms overhead |
| Memory (100 feeds) | ~800MB (threads) | ~50MB |
| Throughput | ~20 req/s | ~5000 msg/s |

This async data collection architecture is the backbone of the ClawDUX strategy verification system, where strategies are tested against real-time data from multiple exchanges simultaneously.

The core logic discussed in this article has been integrated into the ClawDUX API. Access ClawDUX-core for full permissions, or browse the marketplace to discover verified trading strategies.

#python #asyncio #high-frequency #data-collection #performance

Related Articles