API & Python

Polygon.io API: Real-Time US Stock Data Ingestion and Cleaning in Python

ClawDUX Team · March 27, 2026 · 6 min read · 0 views

Polygon.io API: Real-Time US Stock Data Ingestion and Cleaning in Python

Polygon.io provides institutional-grade US equity data at a fraction of Bloomberg's cost. Here's how to build a clean data pipeline with it.

Setup

bash
pip install polygon-api-client pandas websocket-client

REST API: Historical Data

python
from polygon import RESTClient
import pandas as pd
from datetime import datetime, timedelta

# Module-level Polygon REST client, shared by the fetch helpers below.
# NOTE(review): key is hard-coded for the article; in real code load it
# from an environment variable or a secrets manager instead.
client = RESTClient(api_key="your_polygon_key")

def get_aggregates(
    symbol: str,
    days: int = 90,
    timespan: str = "day",
    multiplier: int = 1,
) -> pd.DataFrame:
    """Fetch aggregate (OHLCV) bars from Polygon.

    Args:
        symbol: Ticker symbol, e.g. 'AAPL'.
        days: Calendar-day lookback window ending now.
        timespan: Bar resolution ('minute', 'hour', 'day', ...).
        multiplier: Number of timespan units per bar.

    Returns:
        DataFrame indexed by bar timestamp with columns
        open/high/low/close/volume/vwap/trades. When Polygon returns
        no bars, an empty frame with the same columns and an empty
        DatetimeIndex is returned instead of raising.
    """
    end = datetime.now()
    start = end - timedelta(days=days)

    aggs = client.get_aggs(
        ticker=symbol,
        multiplier=multiplier,
        timespan=timespan,
        from_=start.strftime('%Y-%m-%d'),
        to=end.strftime('%Y-%m-%d'),
        limit=50000,  # Polygon's per-request maximum
    )

    rows = [{
        'timestamp': pd.Timestamp(a.timestamp, unit='ms'),
        'open': a.open,
        'high': a.high,
        'low': a.low,
        'close': a.close,
        'volume': a.volume,
        'vwap': a.vwap,
        'trades': a.transactions,
    } for a in aggs]

    if not rows:
        # The original crashed here: pd.DataFrame([]) has no 'timestamp'
        # column, so set_index raised KeyError for empty result sets
        # (bad symbol, market holiday range, etc.).
        return pd.DataFrame(
            columns=['open', 'high', 'low', 'close', 'volume', 'vwap', 'trades'],
            index=pd.DatetimeIndex([], name='timestamp'),
        )

    return pd.DataFrame(rows).set_index('timestamp')

# Pull 90 days of minute-resolution bars for AAPL.
df = get_aggregates('AAPL', days=90, timespan='minute')
print("Loaded {:,} minute bars".format(len(df)))

Data Cleaning Pipeline

python
def clean_market_data(df: pd.DataFrame) -> pd.DataFrame:
    """Clean and normalize market data.

    Steps: restrict to regular trading hours, drop zero-volume bars,
    backfill missing VWAP with the typical price, remove outlier ticks,
    and forward-fill small gaps on a regular time grid.

    Args:
        df: Bar data indexed by timestamp with open/high/low/close/
            volume/vwap columns.

    Returns:
        Cleaned copy of the input; the original frame is not mutated.
    """
    df = df.copy()

    # Restrict to regular US session. isinstance handles tz-aware
    # indexes too — the original string compare against
    # 'datetime64[ns]' silently skipped tz-aware data.
    if isinstance(df.index, pd.DatetimeIndex):
        df = df.between_time('09:30', '16:00')

    # Remove zero-volume bars (market closed)
    df = df[df['volume'] > 0]

    # Handle missing VWAP with the typical price (H+L+C)/3
    df['vwap'] = df['vwap'].fillna(
        (df['high'] + df['low'] + df['close']) / 3
    )

    # Remove obvious bad ticks (price > 3 std from rolling mean).
    # Rows inside the 20-bar warm-up window have NaN stats; NaN
    # comparisons are False, so the original silently dropped the
    # first 19 rows. Keep rows whose stats are undefined instead.
    rolling_mean = df['close'].rolling(20).mean()
    rolling_std = df['close'].rolling(20).std()
    in_band = (
        (df['close'] > rolling_mean - 3 * rolling_std) &
        (df['close'] < rolling_mean + 3 * rolling_std)
    )
    df = df[in_band | rolling_mean.isna() | rolling_std.isna()]

    # Forward-fill small gaps (up to 5 bars) on a regular grid.
    # inferred_freq is None for irregular/short indexes; the original
    # passed that None straight to asfreq and crashed, so guard it.
    freq = (
        df.index.inferred_freq
        if isinstance(df.index, pd.DatetimeIndex) else None
    )
    if freq is not None:
        df = df.asfreq(freq)
        df = df.ffill(limit=5)
        df = df.dropna()

    return df

# Run the raw bars through the cleaning pipeline and report the delta.
clean_df = clean_market_data(df)
removed = len(df) - len(clean_df)
print("After cleaning: {:,} bars (removed {:,})".format(len(clean_df), removed))

WebSocket: Real-Time Streaming

python
import websocket
import json

def on_message(ws, message):
    """Print each trade ('T') event in the incoming message batch."""
    events = json.loads(message)
    for ev in events:
        # Polygon batches multiple event types; only trades here.
        if ev.get('ev') != 'T':
            continue
        line = (
            f"{ev['sym']} "
            f"${ev['p']:.2f} "
            f"x{ev['s']} "
            f"({ev['x']})"  # exchange ID
        )
        print(line)

def on_open(ws):
    """Authenticate, then subscribe to trade streams, on connect."""
    auth_msg = {
        "action": "auth",
        "params": "your_polygon_key"
    }
    subscribe_msg = {
        "action": "subscribe",
        "params": "T.AAPL,T.MSFT,T.GOOGL"
    }
    # Order matters: Polygon requires auth before any subscription.
    for msg in (auth_msg, subscribe_msg):
        ws.send(json.dumps(msg))

# Live stream client for the US stocks cluster; on_open authenticates
# and subscribes, on_message prints each trade.
# NOTE(review): no on_error/on_close handlers are wired up — consider
# adding them for reconnect/diagnostics in production.
ws = websocket.WebSocketApp(
    "wss://socket.polygon.io/stocks",
    on_open=on_open,
    on_message=on_message,
)
# ws.run_forever()  # blocking; left commented so the snippet imports cleanly

This data pipeline architecture — REST for historical backtesting, WebSocket for live execution — is the same dual-mode pattern used in ClawDUX's strategy verification engine.

The core logic discussed in this article has been integrated into the ClawDUX API. Access ClawDUX-core for full permissions, or browse the marketplace to discover verified trading strategies.

#polygon #stocks #real-time-data #python #market-data

Related Articles