API & Python

How to Build a Lightweight Crypto Real-Time Price Scraping Script in Python

ClawDUX Team · March 23, 2026 · 6 min read · 0 views

How to Build a Lightweight Crypto Real-Time Price Scraping Script in Python

You don't need a Bloomberg terminal or expensive data feeds to get real-time crypto prices. With Python and the ccxt library, you can aggregate live data from 100+ exchanges in under 100 lines of code.

The Stack

  • ccxt: Unified API for 100+ crypto exchanges
  • asyncio: Non-blocking concurrent data fetching
  • pandas: Optional, for data analysis
bash
pip install ccxt pandas

Basic: Fetch Spot Prices from Multiple Exchanges

python
import ccxt
import time
from typing import Dict, List

def fetch_prices(symbols: List[str],
                 exchanges: List[str] = None) -> Dict:
    """Take one snapshot of ticker quotes from each requested exchange.

    Args:
        symbols: Market symbols passed to each exchange's ``fetch_tickers``.
        exchanges: Names of exchange classes to look up on the ``ccxt``
            module. When ``None``, binance, coinbasepro and kraken are used.

    Returns:
        A dict keyed by exchange id. Each value maps a symbol to a dict
        with ``bid``, ``ask``, ``last`` and ``volume`` (the ticker's
        ``baseVolume``). Tickers whose bid is ``None`` are left out, and an
        exchange that raised any exception is absent from the result.
    """
    if exchanges is None:
        exchanges = ['binance', 'coinbasepro', 'kraken']

    snapshot = {}
    for exchange_id in exchanges:
        try:
            exchange_cls = getattr(ccxt, exchange_id)
            client = exchange_cls({
                'enableRateLimit': True,  # built-in rate limiting
                'timeout': 10000,
            })

            quotes = {}
            for market, tick in client.fetch_tickers(symbols).items():
                # Only the bid is checked here; other fields may be None.
                if tick['bid'] is None:
                    continue
                quotes[market] = {
                    'bid': tick['bid'],
                    'ask': tick['ask'],
                    'last': tick['last'],
                    'volume': tick['baseVolume'],
                }

            # Record the exchange only once every ticker was processed.
            snapshot[exchange_id] = quotes
        except Exception as e:
            # Best effort: report the failure and move on to the next one.
            print(f"Error fetching from {exchange_id}: {e}")

    return snapshot

# Example: print one aligned line per (exchange, symbol) quote.
prices = fetch_prices(['BTC/USDT', 'ETH/USDT'])
for exchange, data in prices.items():
    for symbol, info in data.items():
        # fetch_prices() only filters out tickers with a None bid, so the
        # ask can still be None; formatting None with ".2f" would raise
        # TypeError, so fall back to a placeholder of the same width.
        ask = info['ask']
        ask_text = f"{ask:<12.2f}" if ask is not None else f"{'n/a':<12}"
        print(f"{exchange:15} {symbol:10} "
              f"bid={info['bid']:<12.2f} "
              f"ask={ask_text}")

Advanced: Async Streaming with Spread Detection

python
import ccxt.async_support as ccxt_async
import asyncio

class CryptoStreamer:
    """Poll several exchanges concurrently and print cross-exchange spreads.

    One polling coroutine runs per exchange id. Each poll stores the latest
    quote per ``"<exchange_id>:<symbol>"`` key in ``latest_prices`` and then
    re-evaluates the spread for every configured symbol.
    """

    # Seconds slept between two polls of the same exchange.
    POLL_INTERVAL_SECONDS = 2
    # Minimum spread, in percent of the best ask, that is reported.
    SPREAD_THRESHOLD_PCT = 0.1

    def __init__(self, symbols: list, exchanges: list = None):
        """Store the configuration; no exchange client is created yet.

        Args:
            symbols: Market symbols requested from every exchange.
            exchanges: Exchange ids looked up on ``ccxt_async``. When
                ``None`` or empty, binance, coinbasepro and kraken are used.
        """
        self.symbols = symbols
        self.exchange_ids = exchanges or [
            'binance', 'coinbasepro', 'kraken'
        ]
        # "<exchange_id>:<symbol>" -> {'bid', 'ask', 'last', 'ts'}
        self.latest_prices: dict = {}
        # exchange id -> client instance, populated by start()
        self.exchanges = {}

    async def start(self):
        """Create one client per exchange and poll them all concurrently.

        The pollers loop forever, so this only finishes through an exception
        or cancellation. Clients are closed on every exit path, including a
        failure while the clients are still being created.
        """
        try:
            for eid in self.exchange_ids:
                self.exchanges[eid] = getattr(ccxt_async, eid)({
                    'enableRateLimit': True
                })

            tasks = [
                self._poll_exchange(eid)
                for eid in self.exchange_ids
            ]
            await asyncio.gather(*tasks)
        finally:
            await self.shutdown()

    async def _poll_exchange(self, exchange_id: str):
        """Continuously poll one exchange and refresh ``latest_prices``."""
        exchange = self.exchanges[exchange_id]
        while True:
            try:
                tickers = await exchange.fetch_tickers(self.symbols)
                for symbol, ticker in tickers.items():
                    if ticker['bid'] is None:
                        continue
                    key = f"{exchange_id}:{symbol}"
                    self.latest_prices[key] = {
                        'bid': ticker['bid'],
                        'ask': ticker['ask'],
                        'last': ticker['last'],
                        'ts': ticker['timestamp'],
                    }
                self._check_arbitrage()
            except Exception as e:
                # Deliberate best effort: one failed poll must not stop the
                # loop. asyncio.CancelledError derives from BaseException
                # (Python 3.8+), so cancellation still ends the poller.
                print(f"[{exchange_id}] Error: {e}")
            await asyncio.sleep(self.POLL_INTERVAL_SECONDS)

    def _check_arbitrage(self):
        """Print a line for each symbol whose best bid exceeds the best ask.

        The best bid and best ask must come from different exchanges and the
        spread, as a percentage of the best ask, must be above
        ``SPREAD_THRESHOLD_PCT``.
        """
        for symbol in self.symbols:
            bids, asks = [], []
            for eid in self.exchange_ids:
                quote = self.latest_prices.get(f"{eid}:{symbol}")
                if quote is None:
                    continue
                bid, ask = quote['bid'], quote['ask']
                if bid is not None:
                    bids.append((eid, bid))
                # Only the bid is checked when a quote is stored, so the ask
                # may be None; comparing None in min() raises TypeError. A
                # non-positive ask is also dropped because it is the divisor
                # of the spread calculation below.
                if ask is not None and ask > 0:
                    asks.append((eid, ask))

            if not bids or not asks:
                continue

            best_bid = max(bids, key=lambda x: x[1])
            best_ask = min(asks, key=lambda x: x[1])

            # Same exchange on both sides is that venue's own book, not a
            # cross-exchange opportunity.
            if best_bid[0] == best_ask[0]:
                continue

            spread_pct = (
                (best_bid[1] - best_ask[1]) / best_ask[1]
            ) * 100
            if spread_pct > self.SPREAD_THRESHOLD_PCT:
                print(
                    f"SPREAD {symbol}: "
                    f"Buy@{best_ask[0]} ${best_ask[1]:.2f} -> "
                    f"Sell@{best_bid[0]} ${best_bid[1]:.2f} "
                    f"({spread_pct:.3f}%)"
                )

    async def shutdown(self):
        """Close every open exchange client; safe to call more than once."""
        # Pop before closing so a repeated call never closes a client twice.
        while self.exchanges:
            _, exchange = self.exchanges.popitem()
            await exchange.close()

# Run the streamer
async def main():
    """Stream three USDT pairs from three exchanges until interrupted.

    The exchange clients are closed however streaming ends: an error,
    cancellation, or Ctrl+C.
    """
    streamer = CryptoStreamer(
        symbols=['BTC/USDT', 'ETH/USDT', 'SOL/USDT'],
        exchanges=['binance', 'coinbasepro', 'kraken']
    )
    try:
        await streamer.start()
    finally:
        # When run through asyncio.run(), Ctrl+C normally reaches this
        # coroutine as task cancellation (CancelledError), not as
        # KeyboardInterrupt, so an `except KeyboardInterrupt` branch would
        # not run. `finally` closes the clients on every exit path.
        await streamer.shutdown()

# asyncio.run(main())

Output Example

plaintext
binance         BTC/USDT   bid=67234.50     ask=67235.00
coinbasepro     BTC/USDT   bid=67238.20     ask=67239.10
kraken          BTC/USDT   bid=67231.00     ask=67233.00
SPREAD BTC/USDT: Buy@kraken $67233.00 -> Sell@coinbasepro $67238.20 (0.008%)

This multi-exchange data aggregation pattern is one of the building blocks behind the strategy verification engine on ClawDUX, where we validate strategy performance against real market data from multiple sources.

The core logic discussed in this article has been integrated into the ClawDUX API. Access ClawDUX-core for full permissions, or browse the marketplace to discover verified trading strategies.

#python #ccxt #websocket #crypto #real-time-data

Related Articles