💝💝💝欢迎莅临我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。
推荐:「stormsha的主页」👈,「stormsha的知识库」👈持续学习,不断总结,共同进步,为了踏实,做好当下事儿~
专栏导航
- Python系列: Python面试题合集,剑指大厂
- Git系列: Git操作技巧
- GO系列: 记录博主学习GO语言的笔记,该笔记专栏尽量写的试用所有入门GO语言的初学者
- 数据库系列: 详细总结了常用数据库 mysql 技术点,以及工作中遇到的 mysql 问题等
- 运维系列: 总结好用的命令,高效开发
- 算法与数据结构系列: 总结数据结构和算法,不同类型针对性训练,提升编程思维
非常期待和您一起在这个小小的网络世界里共同探索、学习和成长。💝💝💝 ✨✨ 欢迎订阅本专栏 ✨✨
💖The Start💖点点关注,收藏不迷路💖
1、Sanic WebSocket 实现股票实时推送服务
在本文中,我们将详细介绍如何使用 Sanic 框架来实现一个股票实时推送服务,并且展示如何在客户端通过 HTML 和 JavaScript 代码接收这些实时更新。Sanic 是一个异步的 Python 框架,非常适合构建高性能的 Web 应用程序和 API。我们将通过以下几个步骤来实现这个系统:
- 设置 Sanic WebSocket 服务端
- 创建 HTML 客户端代码
- 实现客户端与服务端的交互
1.1. 设置 Sanic WebSocket 服务端
首先,我们需要安装 Sanic 和相关的 WebSocket 库。可以使用以下命令来安装:
pip install sanic sanic_cors websockets
接下来,我们创建一个简单的 Sanic 应用程序,并使用 WebSocket 实现实时的股票数据推送。
import json
import uuid
from json import JSONDecodeError
from sanic import Sanic, response
from sanic_cors import CORS, cross_origin
from sanic.websocket import WebSocketProtocol
from websockets import ConnectionClosedError, ConnectionClosedOK
import asyncio
import time
import random
app = Sanic("StockWebSocket")
CORS(app)
# 以股票代码为键的字典,存储订阅者列表
subscriptions = {}
# 模拟股票行情数据,实际开发中用真实行情源代替
stock_data = {
"a": {"stock_name": "阿里", "price": 150.00, "change": 0.25},
"b": {"stock_name": "百度", "price": 2750.00, "change": 0.25},
"t": {"stock_name": "腾讯", "price": 3400.00, "change": 0.25}
}
class SubscriptObject(object):
def __init__(self, register_id, ws):
self.register_id = register_id
self.ws = ws
async def recv(self):
try:
while True:
msg = await self.ws.recv()
if not msg:
continue
data = json.loads(msg)
action = data.get('action')
stock_symbol = data.get('stock_symbol')
if action == "subscribe":
if stock_symbol in stock_data:
if not subscriptions.get(stock_symbol):
subscriptions[stock_symbol] = set()
subscriptions[stock_symbol].add(self.ws)
else:
await self.ws.send(json.dumps({"error": "Unknown stock symbol"}))
else:
print(f"当前状态 {data}")
except ConnectionClosedError:
print(f'{self.register_id} 断开连接, 取消监听')
except ConnectionClosedOK:
print(f'{self.register_id} 断开连接, 取消监听')
except JSONDecodeError:
print(f'收到的数据解析失败')
finally:
# 清理客户端订阅
for symbol in subscriptions:
subscriptions[symbol].discard(self.ws)
print("Client disconnected")
async def stock_data_generator():
while True:
# 模拟股票数据更新
stock_code_list = list(subscriptions.keys())
for symbol in stock_code_list:
ws_list = subscriptions.get(symbol, []).copy()
dd = stock_data.get(symbol)
data = dict()
data['stock_name'] = dd['stock_name']
data['price'] = dd['price'] + random.uniform(-5, 5)
data['change'] = dd['change'] + random.uniform(-1, 1)
data['timestamp'] = time.time()
for ws in ws_list:
try:
await ws.send(json.dumps(data))
except ConnectionClosedOK:
print("连接取消前", subscriptions[symbol])
subscriptions[symbol].discard(ws)
print("连接取消后", subscriptions[symbol])
await asyncio.sleep(1) # 每2秒更新一次
@app.websocket("/ws/stock")
@cross_origin(app) # 为这个路由启用CORS
async def stock_websocket(request, ws):
# 模拟实时更新股票价格
register_id = uuid.uuid4().hex
sub_obj = SubscriptObject(register_id, ws)
await sub_obj.recv()
# 启动股票数据更新任务
app.add_task(stock_data_generator())
@app.route('/')
async def index(request):
return await response.file('index.html')
if __name__ == "__main__":
app.run(host="0.0.0.0", port=8000, protocol=WebSocketProtocol)
1.2. 创建 HTML 客户端代码
我们需要一个简单的 HTML 页面,允许用户选择股票符号,并通过 WebSocket 连接到服务端以接收实时更新。
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Stock Real-time Update</title>
<style>
body {
font-family: Arial, sans-serif;
}
.stock-data {
margin: 20px;
padding: 10px;
border: 1px solid #ddd;
}
select {
margin: 20px;
padding: 5px;
}
</style>
</head>
<body>
<h1>实时股票行情</h1>
<select id="stock-select">
<option value="a">阿里</option>
<option value="b">百度</option>
<option value="t">腾讯</option>
</select>
<div id="stock-container">
<!-- 初始化一个空的 stock-data 元素,用于动态添加新的 stock-data 元素 -->
<div class="stock-data" id="stock-data-template" style="display: none;">
股票名称: <span class="stock-name"></span><br>
涨跌幅: <span class="change"></span><br>
时间戳: <span class="timestamp"></span><br>
</div>
</div>
<script>
const stockSelect = document.getElementById('stock-select');
const stockContainer = document.getElementById('stock-container');
const stockDataTemplate = document.getElementById('stock-data-template');
function updateStock(symbol) {
const ws = new WebSocket(`ws://localhost:8000/ws/stock`);
const stockDataElement = stockDataTemplate.cloneNode(true); // 复制 stock-data 模板元素
stockDataElement.id = `stock-data-${symbol}`; // 根据股票代码设置 ID
stockDataElement.style.display = 'block'; // 显示元素
stockContainer.appendChild(stockDataElement); // 将元素添加到容器中
ws.onopen = function () {
console.log("WebSocket connection established.");
ws.send(JSON.stringify({
action: 'subscribe',
stock_symbol: symbol
}));
};
ws.onmessage = function (event) {
const data = JSON.parse(event.data);
const stockData = stockDataElement.getElementsByClassName('stock-data')[0];
if (data.error) {
stockData.innerHTML = `<p>${data.error}</p>`;
} else {
console.log("ddddddd", data)
stockDataElement.getElementsByClassName('stock-name')[0].textContent = data.stock_name;
stockDataElement.getElementsByClassName('change')[0].textContent = data.change;
stockDataElement.getElementsByClassName('timestamp')[0].textContent = data.timestamp;
}
};
ws.onerror = function (error) {
console.log("WebSocket Error: ", error);
};
}
stockSelect.addEventListener('change', function () {
const selectedSymbol = stockSelect.value;
updateStock(selectedSymbol);
});
// 初始化加载第一个股票的数据
updateStock(stockSelect.value);
</script>
</body>
</html>
2. 实现客户端与服务端的交互
在上面的代码中,客户端通过 WebSocket 连接到服务端,并根据输入的股票符号接收实时更新。服务端定期推送模拟的股票数据到所有订阅该股票符号的客户端。
3. 总结
通过以上步骤,我们实现了一个基于 Sanic 的 WebSocket 股票实时推送服务,并创建了一个简单的 HTML 客户端以接收这些实时更新。这个示例可以根据实际需求进行扩展和优化,如连接管理、数据来源的整合、错误处理等。希望这个示例能帮助你在实际项目中实现实时数据推送功能。
🔥🔥🔥道阻且长,行则将至,让我们一起加油吧!🌙🌙🌙
💖The End💖点点关注,收藏不迷路💖
|