import tkinter as tk
from tkinter import ttk
import threading
import queue
import json
import time
import websocket
from SignUtil import SignUtil # 请确保SignUtil.py在同一目录
class WebSocketClient:
def init(self, params, msg_queue, stop_event):
self.params = params
self.msg_queue = msg_queue
self.stop_event = stop_event
self.ws = None
self.heartbeat_interval = 30
def on_message(self, message):
self.msg_queue.put(("info", f"接收消息: {message}"))
def run(self):
try:
# 生成签名
sign_util = SignUtil()
timestamp = str(int(time.time() * 1000))
sign = sign_util.get_sign(
self.params["system_id"],
self.params["system_key"],
timestamp
)
# 构造连接URL
url = f"{self.params['url']}systemId={self.params['system_id']}×tamp={timestamp}&sign={sign}"
self.ws = websocket.create_connection(url)
# 发送订阅请求
topics = [t.strip() for t in self.params["topics"].split(",")]
subscription = json.dumps({"cmd": "subscribe", "topics": topics})
self.ws.send(subscription)
self.msg_queue.put(("info", "订阅请求已发送"))
# 主循环
last_heartbeat = time.time()
while not self.stop_event.is_set():
try:
message = self.ws.recv()
self.on_message(message)
# 心跳处理
if time.time() - last_heartbeat > self.heartbeat_interval:
self.ws.send(json.dumps({"cmd": "keepAlive", "data": "heartbeat"}))
last_heartbeat = time.time()
except websocket.WebSocketConnectionClosedException:
self.msg_queue.put(("error", "连接已关闭"))
break
except Exception as e:
self.msg_queue.put(("error", f"接收错误: {str(e)}"))
break
except Exception as e:
self.msg_queue.put(("error", f"连接失败: {str(e)}"))
finally:
if self.ws:
self.ws.close()
self.stop_event.set()
class WebSocketGUI:
def init(self, root):
self.root = root
self.root.title(“WebSocket 自动化平台”)
# 控制变量
self.is_connected = False
self.stop_event = threading.Event()
self.msg_queue = queue.Queue()
# 创建UI
self.create_widgets()
# 启动消息检查
self.root.after(100, self.process_messages)
def create_widgets(self):
# 输入框框架
input_frame = ttk.LabelFrame(self.root, text="连接参数")
input_frame.pack(padx=10, pady=5, fill=tk.X)
# URL
ttk.Label(input_frame, text="URL:").grid(row=0, column=0, sticky=tk.W)
self.url_entry = ttk.Entry(input_frame, width=50)
self.url_entry.grid(row=0, column=1, padx=5, pady=2)
# System ID
ttk.Label(input_frame, text="System ID:").grid(row=1, column=0, sticky=tk.W)
self.system_id_entry = ttk.Entry(input_frame, width=50)
self.system_id_entry.grid(row=1, column=1, padx=5, pady=2)
# System Key
ttk.Label(input_frame, text="System Key:").grid(row=2, column=0, sticky=tk.W)
self.system_key_entry = ttk.Entry(input_frame, width=50)
self.system_key_entry.grid(row=2, column=1, padx=5, pady=2)
# Topics
ttk.Label(input_frame, text="Topics (逗号分隔):").grid(row=3, column=0, sticky=tk.W)
self.topics_entry = ttk.Entry(input_frame, width=50)
self.topics_entry.grid(row=3, column=1, padx=5, pady=2)
# 按钮框架
btn_frame = ttk.Frame(self.root)
btn_frame.pack(pady=5)
self.start_btn = ttk.Button(btn_frame, text="开始", command=self.start_connection)
self.start_btn.pack(side=tk.LEFT, padx=5)
self.stop_btn = ttk.Button(btn_frame, text="停止",
command=self.stop_connection,
state=tk.DISABLED)
self.stop_btn.pack(side=tk.LEFT, padx=5)
# 日志输出
log_frame = ttk.LabelFrame(self.root, text="日志输出")
log_frame.pack(padx=10, pady=5, fill=tk.BOTH, expand=True)
self.log_text = tk.Text(log_frame, height=15, state=tk.DISABLED)
self.log_text.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
def start_connection(self):
params = {
"url": self.url_entry.get(),
"system_id": self.system_id_entry.get(),
"system_key": self.system_key_entry.get(),
"topics": self.topics_entry.get()
}
if not all(params.values()):
self.display_message("error", "所有字段必须填写")
return
self.stop_event.clear()
self.client_thread = threading.Thread(
target=WebSocketClient(
params,
self.msg_queue,
self.stop_event
).run,
daemon=True
)
self.client_thread.start()
self.start_btn.config(state=tk.DISABLED)
self.stop_btn.config(state=tk.NORMAL)
self.display_message("info", "正在建立连接...")
def stop_connection(self):
self.stop_event.set()
self.start_btn.config(state=tk.NORMAL)
self.stop_btn.config(state=tk.DISABLED)
self.display_message("info", "正在断开连接...")
def process_messages(self):
while not self.msg_queue.empty():
msg_type, message = self.msg_queue.get()
self.display_message(msg_type, message)
self.root.after(100, self.process_messages)
def display_message(self, msg_type, message):
self.log_text.config(state=tk.NORMAL)
tag = "error" if msg_type == "error" else "info"
self.log_text.insert(tk.END, message + "\n", tag)
self.log_text.see(tk.END)
self.log_text.config(state=tk.DISABLED)
# 配置标签样式
self.log_text.tag_config("error", foreground="red")
self.log_text.tag_config("info", foreground="black")
if name == “main”:
root = tk.Tk()
app = WebSocketGUI(root)
root.geometry(“800x600”)
root.mainloop()
打包命令:pyinstaller --onefile --windowed WebsocketTest.py