Polygon.io API: Real-Time US Stock Data Ingestion and Cleaning in Python
Polygon.io provides institutional-grade US equity data at a fraction of Bloomberg's cost. Here's how to build a clean data pipeline with it.
Setup
pip install polygon-api-client pandas websocket-client
REST API: Historical Data
from polygon import RESTClient
import pandas as pd
from datetime import datetime, timedelta
# Single shared REST client. Replace the placeholder with a real Polygon
# API key (or load it from the environment) before running.
client = RESTClient(api_key="your_polygon_key")
def get_aggregates(
    symbol: str,
    days: int = 90,
    timespan: str = "day",
    multiplier: int = 1,
) -> pd.DataFrame:
    """Fetch aggregate (OHLCV) bars from Polygon.

    Args:
        symbol: Ticker symbol, e.g. 'AAPL'.
        days: Calendar-day look-back window ending now.
        timespan: Bar unit ('minute', 'hour', 'day', ...).
        multiplier: Bar size multiplier (5 + 'minute' = 5-minute bars).

    Returns:
        DataFrame indexed by bar timestamp (Polygon returns epoch
        milliseconds; the resulting index is naive UTC) with
        open/high/low/close/volume/vwap/trades columns, sorted ascending.
        Returns an empty frame with the same columns if no bars come back.
    """
    end = datetime.now()
    start = end - timedelta(days=days)
    aggs = client.get_aggs(
        ticker=symbol,
        multiplier=multiplier,
        timespan=timespan,
        from_=start.strftime('%Y-%m-%d'),
        to=end.strftime('%Y-%m-%d'),
        limit=50000,  # max page size; ~90 days of minute bars fits in one page
    )
    rows = [{
        'timestamp': pd.Timestamp(a.timestamp, unit='ms'),
        'open': a.open,
        'high': a.high,
        'low': a.low,
        'close': a.close,
        'volume': a.volume,
        'vwap': a.vwap,
        'trades': a.transactions,
    } for a in aggs]
    # Passing explicit columns keeps the schema stable when `rows` is
    # empty; otherwise set_index('timestamp') raises KeyError on a
    # column-less DataFrame.
    df = pd.DataFrame(rows, columns=[
        'timestamp', 'open', 'high', 'low', 'close',
        'volume', 'vwap', 'trades',
    ])
    df.set_index('timestamp', inplace=True)
    return df.sort_index()
# Get 90 days of minute bars for AAPL.
# NOTE(review): requires a valid Polygon API key and network access;
# free-tier plans may not include minute aggregates — confirm your plan.
df = get_aggregates('AAPL', days=90, timespan='minute')
print(f"Loaded {len(df):,} minute bars")
Data Cleaning Pipeline
def clean_market_data(df: pd.DataFrame) -> pd.DataFrame:
    """Clean and normalize market bar data.

    Steps: restrict to regular trading hours, drop zero-volume bars,
    backfill missing VWAP with the typical price, drop 3-sigma outlier
    closes, and forward-fill small gaps when the index has a regular
    frequency.

    Args:
        df: Bar data with open/high/low/close/volume/vwap columns,
            ideally indexed by a DatetimeIndex.

    Returns:
        A cleaned copy; the input frame is not modified.
    """
    df = df.copy()
    # Keep regular US session hours only.
    # NOTE(review): between_time uses the index's own clock — Polygon
    # timestamps are UTC, so convert to US/Eastern upstream if this
    # window is meant to be exchange-local. TODO confirm with caller.
    # isinstance covers tz-aware indexes, which the old
    # dtype == 'datetime64[ns]' string comparison missed.
    if isinstance(df.index, pd.DatetimeIndex):
        df = df.between_time('09:30', '16:00')
    # Remove zero-volume bars (market closed).
    df = df[df['volume'] > 0]
    # Handle missing VWAP: fall back to the typical price.
    df['vwap'] = df['vwap'].fillna(
        (df['high'] + df['low'] + df['close']) / 3
    )
    # Remove obvious bad ticks (close > 3 rolling stds from the 20-bar
    # rolling mean). Bars without 20 bars of history have NaN rolling
    # stats; `NaN > x` is False, so `~(...)` KEEPS them — the original
    # two-sided comparison silently dropped the first 19 bars (and
    # everything, for frames shorter than 20 rows).
    rolling_mean = df['close'].rolling(20).mean()
    rolling_std = df['close'].rolling(20).std()
    deviation = (df['close'] - rolling_mean).abs()
    df = df[~(deviation > 3 * rolling_std)]
    # Forward-fill small gaps (up to 5 bars). asfreq(None) raises, so
    # only regularize when a frequency is actually inferable — it is
    # not once session filtering leaves overnight gaps.
    freq = (
        df.index.inferred_freq
        if isinstance(df.index, pd.DatetimeIndex) else None
    )
    if freq:
        df = df.asfreq(freq)
    df = df.ffill(limit=5)
    return df.dropna()
# Run the cleaning pass and report how many bars were filtered out.
clean_df = clean_market_data(df)
print(f"After cleaning: {len(clean_df):,} bars "
      f"(removed {len(df) - len(clean_df):,})")
WebSocket: Real-Time Streaming
import websocket
import json
def on_message(ws, message):
    """Print every trade ('T') event found in a WebSocket message.

    Polygon delivers each message as a JSON array of event objects;
    non-trade events (quotes, aggregates, status) are ignored.
    """
    events = json.loads(message)
    for evt in events:
        if evt.get('ev') != 'T':
            continue
        symbol = evt['sym']
        price = evt['p']
        size = evt['s']
        exchange_id = evt['x']
        print(f"{symbol} ${price:.2f} x{size} ({exchange_id})")
def on_open(ws):
    """Authenticate with Polygon, then subscribe to trade channels.

    Polygon requires an auth message before any subscription is
    accepted, so the two sends happen in this exact order.
    """
    payloads = (
        {"action": "auth", "params": "your_polygon_key"},
        {"action": "subscribe", "params": "T.AAPL,T.MSFT,T.GOOGL"},
    )
    for payload in payloads:
        ws.send(json.dumps(payload))
# Live trade stream for the subscribed tickers. The callbacks above
# handle authentication/subscription (on_open) and incoming trades
# (on_message).
ws = websocket.WebSocketApp(
    "wss://socket.polygon.io/stocks",
    on_open=on_open,
    on_message=on_message,
)
# ws.run_forever()  # blocking loop; uncomment to start streaming
This data pipeline architecture — REST for historical backtesting, WebSocket for live execution — is the same dual-mode pattern used in ClawDUX's strategy verification engine.
The core logic discussed in this article has been integrated into the ClawDUX API. Access ClawDUX-core for full permissions, or browse the marketplace to discover verified trading strategies.