sanic + webSocket:股票实时行情推送服务实现

发布于:2024-08-19 ⋅ 阅读:(78) ⋅ 点赞:(0)

💝💝💝欢迎莅临我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。
在这里插入图片描述


1、Sanic WebSocket 实现股票实时推送服务

在本文中,我们将详细介绍如何使用 Sanic 框架来实现一个股票实时推送服务,并且展示如何在客户端通过 HTML 和 JavaScript 代码接收这些实时更新。Sanic 是一个异步的 Python 框架,非常适合构建高性能的 Web 应用程序和 API。我们将通过以下几个步骤来实现这个系统:

  1. 设置 Sanic WebSocket 服务端
  2. 创建 HTML 客户端代码
  3. 实现客户端与服务端的交互

1.1. 设置 Sanic WebSocket 服务端

首先,我们需要安装 Sanic 和相关的 WebSocket 库。可以使用以下命令来安装:

pip install sanic sanic_cors websockets

接下来,我们创建一个简单的 Sanic 应用程序,并使用 WebSocket 实现实时的股票数据推送。

import json
import uuid
from json import JSONDecodeError

from sanic import Sanic, response
from sanic_cors import CORS, cross_origin
from sanic.websocket import WebSocketProtocol
from websockets import ConnectionClosedError, ConnectionClosedOK
import asyncio
import time
import random

app = Sanic("StockWebSocket")
CORS(app)

# 以股票代码为键的字典,存储订阅者列表
subscriptions = {}

# 模拟股票行情数据,实际开发中用真实行情源代替
stock_data = {
    "a": {"stock_name": "阿里", "price": 150.00, "change": 0.25},
    "b": {"stock_name": "百度", "price": 2750.00, "change": 0.25},
    "t": {"stock_name": "腾讯", "price": 3400.00, "change": 0.25}
}


class SubscriptObject(object):
    def __init__(self, register_id, ws):
        self.register_id = register_id
        self.ws = ws

    async def recv(self):
        try:
            while True:
                msg = await self.ws.recv()
                if not msg:
                    continue
                data = json.loads(msg)
                action = data.get('action')
                stock_symbol = data.get('stock_symbol')
                if action == "subscribe":
                    if stock_symbol in stock_data:
                        if not subscriptions.get(stock_symbol):
                            subscriptions[stock_symbol] = set()
                        subscriptions[stock_symbol].add(self.ws)
                    else:
                        await self.ws.send(json.dumps({"error": "Unknown stock symbol"}))
                else:
                    print(f"当前状态 {data}")
        except ConnectionClosedError:
            print(f'{self.register_id} 断开连接, 取消监听')
        except ConnectionClosedOK:
            print(f'{self.register_id} 断开连接, 取消监听')
        except JSONDecodeError:
            print(f'收到的数据解析失败')
        finally:
            # 清理客户端订阅
            for symbol in subscriptions:
                subscriptions[symbol].discard(self.ws)
            print("Client disconnected")


async def stock_data_generator():
    while True:
        # 模拟股票数据更新
        stock_code_list = list(subscriptions.keys())
        for symbol in stock_code_list:
            ws_list = subscriptions.get(symbol, []).copy()
            dd = stock_data.get(symbol)
            data = dict()
            data['stock_name'] = dd['stock_name']
            data['price'] = dd['price'] + random.uniform(-5, 5)
            data['change'] = dd['change'] + random.uniform(-1, 1)
            data['timestamp'] = time.time()
            for ws in ws_list:
                try:
                    await ws.send(json.dumps(data))
                except ConnectionClosedOK:
                    print("连接取消前", subscriptions[symbol])
                    subscriptions[symbol].discard(ws)
                    print("连接取消后", subscriptions[symbol])
        await asyncio.sleep(1)  # 每2秒更新一次


@app.websocket("/ws/stock")
@cross_origin(app)  # 为这个路由启用CORS
async def stock_websocket(request, ws):
    # 模拟实时更新股票价格
    register_id = uuid.uuid4().hex
    sub_obj = SubscriptObject(register_id, ws)
    await sub_obj.recv()


# 启动股票数据更新任务
app.add_task(stock_data_generator())


@app.route('/')
async def index(request):
    return await response.file('index.html')


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8000, protocol=WebSocketProtocol)

1.2. 创建 HTML 客户端代码

我们需要一个简单的 HTML 页面,允许用户选择股票符号,并通过 WebSocket 连接到服务端以接收实时更新。

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Stock Real-time Update</title>
    <style>
        body {
            font-family: Arial, sans-serif;
        }

        .stock-data {
            margin: 20px;
            padding: 10px;
            border: 1px solid #ddd;
        }

        select {
            margin: 20px;
            padding: 5px;
        }
    </style>
</head>
<body>
<h1>实时股票行情</h1>
<select id="stock-select">
    <option value="a">阿里</option>
    <option value="b">百度</option>
    <option value="t">腾讯</option>
</select>
<div id="stock-container">
    <!-- 初始化一个空的 stock-data 元素,用于动态添加新的 stock-data 元素 -->
    <div class="stock-data" id="stock-data-template" style="display: none;">
        股票名称: <span class="stock-name"></span><br>
        涨跌幅: <span class="change"></span><br>
        时间戳: <span class="timestamp"></span><br>
    </div>
</div>

<script>
    const stockSelect = document.getElementById('stock-select');
    const stockContainer = document.getElementById('stock-container');
    const stockDataTemplate = document.getElementById('stock-data-template');

    function updateStock(symbol) {
        const ws = new WebSocket(`ws://localhost:8000/ws/stock`);
        const stockDataElement = stockDataTemplate.cloneNode(true); // 复制 stock-data 模板元素
        stockDataElement.id = `stock-data-${symbol}`; // 根据股票代码设置 ID
        stockDataElement.style.display = 'block'; // 显示元素
        stockContainer.appendChild(stockDataElement); // 将元素添加到容器中

        ws.onopen = function () {
            console.log("WebSocket connection established.");
            ws.send(JSON.stringify({
                action: 'subscribe',
                stock_symbol: symbol
            }));
        };

        ws.onmessage = function (event) {
            const data = JSON.parse(event.data);
            const stockData = stockDataElement.getElementsByClassName('stock-data')[0];
            if (data.error) {
                stockData.innerHTML = `<p>${data.error}</p>`;
            } else {
                console.log("ddddddd", data)
                stockDataElement.getElementsByClassName('stock-name')[0].textContent = data.stock_name;
                stockDataElement.getElementsByClassName('change')[0].textContent = data.change;
                stockDataElement.getElementsByClassName('timestamp')[0].textContent = data.timestamp;
            }
        };

        ws.onerror = function (error) {
            console.log("WebSocket Error: ", error);
        };
    }

    stockSelect.addEventListener('change', function () {
        const selectedSymbol = stockSelect.value;
        updateStock(selectedSymbol);
    });

    // 初始化加载第一个股票的数据
    updateStock(stockSelect.value);
</script>
</body>
</html>

访问地址:http://localhost:8000

2. 实现客户端与服务端的交互

在上面的代码中,客户端通过 WebSocket 连接到服务端,并根据输入的股票符号接收实时更新。服务端定期推送模拟的股票数据到所有订阅该股票符号的客户端。

3. 总结

通过以上步骤,我们实现了一个基于 Sanic 的 WebSocket 股票实时推送服务,并创建了一个简单的 HTML 客户端以接收这些实时更新。这个示例可以根据实际需求进行扩展和优化,如连接管理、数据来源的整合、错误处理等。希望这个示例能帮助你在实际项目中实现实时数据推送功能。


🔥🔥🔥道阻且长,行则将至,让我们一起加油吧!🌙🌙🌙

💖The End💖点点关注,收藏不迷路💖