tcp接发json字符串

发布于:2025-03-29 ⋅ 阅读:(21) ⋅ 点赞:(0)

因工作需要对接硬件设备,需要通过tcp协议接收发送字符串,而字符串里面全是json字符串,登陆用json对象发送,心跳也用json发送,设备检测到信号后自动推送的也是json字符串,只要登陆后心跳就要每过10秒发送一次,而信号的推送则是在登陆后的任意时间发生.每个json与json之间没有换行符分割,所以让deepseek帮着写了能读取嵌套json的解析代码.

因为是个小工具,因此选择python编写,核心代码如下:

import socket 
import json 
import time 
import threading 


class TCPClient:
    def __init__(self, host='127.0.0.1', port=60, fun_exec=None):
        print(f"host={host} port={port}")
        self.host  = host 
        self.port  = port 
        self.client_socket  = None 
        self.heartbeat_interval  = 10  # 心跳间隔10秒 
        self.is_running  = False 
        self.buffer  = "" 
        self.fun_exec = fun_exec
        
    def connect(self):
        """建立TCP连接并发送登录信息"""
        try:
            # 创建TCP套接字 
            self.client_socket  = socket.socket(socket.AF_INET,  socket.SOCK_STREAM)
            # 设置超时时间(单位:秒)
            self.client_socket.settimeout(100)
            # 连接服务器 
            try:
                print('开始连接')
                self.client_socket.connect((self.host,  self.port)) 
                print(f"已连接") 
            except socket.timeout: 
                print("连接超时,请检查网络或服务器状态")
                return False
            except Exception as e:
                print(f"连接失败: {str(e)}")
                return False
            
            # 发送登录信息 
            login_data = {
                "cmd": "1",
                "username": "账号",
                "password": "密码"
            }
            self._send_json(login_data)
            return True 
        except Exception as e:
            print(f"连接失败: {str(e)}")
            return False 
 
    def _send_json(self, data):
        """发送JSON数据"""
        try:
            json_str = json.dumps(data)
            # print(f"发送信息:{data}")
            self.client_socket.sendall(json_str.encode('utf-8')) 
        except Exception as e:
            print(f"发送数据失败: {str(e)}")
            self.reconnect() 
 
    def _heartbeat_task(self):
        """心跳维持线程"""
        if 'all' == global_leida_log:
            print('开始发送心跳')
        while self.is_running: 
            heartbeat_data = {
                "cmd": "2"
            }
            self._send_json(heartbeat_data)
            time.sleep(self.heartbeat_interval) 
 
    def _process_message(self, message):
        """处理接收到的JSON消息"""
        try:
            data = json.loads(message) 
            print(f"收到服务器响应: {data}")
            self.fun_exec(data)
        except json.JSONDecodeError:
            print("收到无效的JSON数据")
 
    def reconnect(self):
        """断线重连机制"""
        self.stop() 
        print("尝试重新连接...")
        while True:
            if self.connect(): 
                self.start() 
                break 
            time.sleep(5) 
 
    def start(self):
        """启动客户端服务"""
        self.is_running  = True 
        # 启动心跳线程 
        threading.Thread(target=self._heartbeat_task, daemon=True).start()
        # 启动接收线程 
        threading.Thread(target=self._receive_handler, daemon=True).start()
 
    def stop(self):
        """停止客户端"""
        self.is_running  = False 
        if self.client_socket: 
            try:
                self.client_socket.close() 
            except:
                pass 
 
    def _receive_handler(self): 
        """接收数据处理器"""
        # print('开始接收数据:')
        while self.is_running: 
            try: 
                data = self.client_socket.recv(4096)  
                if not data: 
                    print("连接已断开") 
                    self.reconnect()  
                    break 
 
                self.buffer  += data.decode('utf-8')  
 
                # 处理缓冲区中的数据 
                while self.buffer:  
                    try: 
                        # 尝试解析 JSON 对象 
                        obj, end_index = self._parse_json(self.buffer)  
                        if obj is not None: 
                            # 处理解析后的 JSON 对象 
                            self._process_message(json.dumps(obj))  
                            # 移除已处理的数据 
                            self.buffer  = self.buffer[end_index:].lstrip()  
                        else: 
                            break 
                    except json.JSONDecodeError: 
                        break 
 
            except Exception as e: 
                print(f"发生错误: {e}") 

    def _parse_json(self, data): 
        stack = [] 
        start_index = None 
        for i, char in enumerate(data): 
            if char == '{': 
                if not stack: 
                    start_index = i 
                stack.append(char)  
            elif char == '}': 
                if stack: 
                    stack.pop()  
                    if not stack: 
                        try: 
                            json_obj = json.loads(data[start_index:i  + 1]) 
                            return json_obj, i + 1 
                        except json.JSONDecodeError: 
                            pass 
        return None, 0 
 

def temp_fun(d):
    print(d)


if __name__ == "__main__":
    client = TCPClient(host='192.168.0.6', port=60, fun_exec=temp_fun)
    if client.connect(): 
        try:
            client.start() 
            # 保持主线程运行 
            while True:
                time.sleep(1) 
        except KeyboardInterrupt:
            client.stop()