使用WebSocket实时获取印度股票数据源(无调用次数限制)实战
一、前置准备
1. 获取API密钥
登录 StockTV开发者平台 → 联系客服获取测试Key(格式MY4b781f618e3f43c4b055f25fa61941ad
),该密钥无调用次数限制且支持实时数据持续订阅。
2. 安装Python依赖
pip install websocket-client json pandas
二、核心代码实现
1. 建立WebSocket连接
import websocket
import json
import time
API_KEY = "YOUR_API_KEY"
WS_URL = f"wss://ws-api.stocktv.top/connect?key={API_KEY}"
def on_message(ws, message):
"""处理实时行情推送"""
data = json.loads(message)
if data.get('type') == 'stock':
print(f"[{data['symbol']}] 价格: {data['last']} 涨跌幅: {data['pcp']}%")
def on_error(ws, error):
print(f"连接异常: {error}")
def on_close(ws, close_status_code, close_msg):
print(f"连接关闭: {close_msg}")
def on_open(ws):
"""连接成功后订阅股票"""
subscribe_msg = json.dumps({
"action": "subscribe",
"symbols": ["RELIANCE", "NSEI"] # 印度信实工业/Nifty50指数
})
ws.send(subscribe_msg)
print("订阅成功,开始接收实时数据...")
2. 启动实时监听(含自动重连)
def start_websocket():
while True:
try:
ws = websocket.WebSocketApp(
WS_URL,
on_message=on_message,
on_error=on_error,
on_close=on_close
)
ws.on_open = on_open
ws.run_forever()
except Exception as e:
print(f"连接异常,5秒后重连: {str(e)}")
time.sleep(5)
# 启动线程持续运行
import threading
threading.Thread(target=start_websocket, daemon=True).start()
3. 添加心跳机制(保持长连接)
def send_heartbeat(ws):
"""每30秒发送心跳包"""
while True:
try:
ws.send(json.dumps({"action": "ping"}))
time.sleep(30)
except Exception as e:
break
# 在on_open函数中启动心跳线程
def on_open(ws):
# ...原有订阅代码...
threading.Thread(target=send_heartbeat, args=(ws,), daemon=True).start()
三、实时数据示例输出
订阅成功,开始接收实时数据...
[RELIANCE] 价格: 2856.15 涨跌幅: +1.23%
[NSEI] 价格: 22985.40 涨跌幅: +0.75%
[RELIANCE] 价格: 2857.80 涨跌幅: +1.35%
四、关键参数说明
字段 | 说明 | 示例值 |
---|---|---|
symbol |
股票/指数代码 | RELIANCE, NSEI |
last |
最新成交价 | 2856.15 |
pcp |
涨跌幅百分比(自动带±号) | +1.23% |
volume |
成交量(股) | 1254875 |
timestamp |
数据时间戳(Unix毫秒级) | 1725002394123 |
五、注意事项
连接稳定性
通过自动重连机制+心跳包保障7×24小时持续运行数据时效性
印度市场交易时段为IST 9:15-15:30(北京时间11:45-18:00),非交易时段无实时数据推送性能优化
建议使用异步处理框架(如asyncio
)避免数据堆积,实测单连接可承载100+标的实时推送