Python+QT远程控制助手-ver2

发布于:2025-06-16 ⋅ 阅读:(12) ⋅ 点赞:(0)

程序示例精选
Python+QT远程控制助手
如需安装运行环境或远程调试,见文章底部个人QQ名片,由专业技术人员远程协助!

前言

这篇博客针对《Python+QT远程控制助手》编写代码,代码整洁,规则,易读。 学习与应用推荐首选。


文章目录

一、所需工具软件
二、使用步骤
       1. 主要代码
       2. 运行结果
三、在线协助

一、所需工具软件

       1. Python
       2. Pycharm

二、使用步骤

代码如下(示例):
def hash_password(password):
    return hashlib.sha256((password + PASSWORD_SALT).encode('utf-8')).hexdigest()
def send_message(sock, message):
    data = json.dumps(message).encode('utf-8')
    sock.sendall(len(data).to_bytes(4, 'big') + data)
def recv_message(sock):
    numT2 = 0
    length_bytes = sock.recv(4)
    if not length_bytes:
        return None
    length = int.from_bytes(length_bytes, 'big')
    data = b''
    while len(data) < length:
        chunk = sock.recv(min(4096, length - len(data)))
        if not chunk:
            return None
        data += chunk
        numT2 += 1
        print("recv_message : ", numT2)
    return json.loads(data.decode('utf-8'))
class RemoteControlClient(QWidget):
    def __init__(self):
        super().__init__()
    def send_keepalive(self):
        while True:
            time.sleep(30)
            if self.sock:
                try:
                    send_message(self.sock, {
                        'type': 'keepalive',
                        'timestamp': int(time.time())
                    })
                    # print("主控端发送心跳包")
                except Exception as e:
                    print("主控端心跳包发送失败:", e)
                    break
    def connect_remote(self):
        self.target_id = self.id_input.text().strip()
        password = self.pwd_input.text().strip()
        if not self.target_id or not password:
            QMessageBox.warning(self, "错误", "请输入目标ID和密码")
            return
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.sock.connect((SERVER_HOST, SERVER_PORT))
            self.sock.settimeout(300)  # 这里加上
        except Exception as e:
            QMessageBox.warning(self, "错误", f"连接服务器失败: {e}")
            return
        # 注册(主控端用随机ID即可)
        #my_id = "CTRL" + str(int(time.time()))[-6:]
        my_id = "CTRL_MAIN"
        send_message(self.sock, {
            'type': 'register',
            'data': {'device_id': my_id, 'password_hash': hash_password(password)},
            'timestamp': int(time.time())
        })
        # 等待注册确认
        while True:
            msg = recv_message(self.sock)
            if msg and msg.get('type') == 'register_ack' and msg.get('status') == 'success':
                break
        #注册成功后启动心跳包
        threading.Thread(target=self.send_keepalive, daemon=True).start()
        # 发起连接请求
        send_message(self.sock, {
            'type': 'peer_message',
            'target_device_id': self.target_id,
            'data': json.dumps({
                'type': 'CONNECT_REQUEST',
                'source_device_id': my_id,
                'password': hash_password(password),
                'timestamp': int(time.time())
            }),
            'timestamp': int(time.time())
        })
        self.label.setText("等待对方接受...")
        threading.Thread(target=self.recv_loop, daemon=True).start()
    def recv_loop(self):
        print("recv_loop")
        while True:
            try:
                msg = recv_message(self.sock)
                if not msg:
                    print("连接断开,准备重连...")
                    self.auto_reconnect()
                    break
                if msg.get('type') == 'peer_message':
                    data = msg.get('data')
                    + data.get('reason', ''))
                        self.connected = False
                    elif dtype == 'SCREEN_DATA':
                        self.show_screen(data)
                    elif dtype == 'SCREEN_DIFF':
                        self.show_screen_diff(data)
                print("recv_loop clsoe")
            except socket.timeout:
                print("接收超时,准备重连...")
                self.auto_reconnect()
                break
            except Exception as e:
                print("recv error:", e)
                self.auto_reconnect()
                break
    def auto_reconnect(self):
        self.label.setText("断线,正在重连...")
        try:
            if self.sock:
                self.sock.close()
        except:
            pass
        time.sleep(3)  # 等待5秒再重连
        self.connect_remote()
    def request_screen(self):
        if self.connected and self.sock:
            send_message(self.sock, {
                'type': 'peer_message',
                'target_device_id': self.target_id,
                'data': json.dumps({'type': 'REQUEST_SCREEN'}),
                'timestamp': int(time.time())
            })
    def show_screen(self, data):
        try:
            img_data = base64.b64decode(data['data'])
            img_bytes = BytesIO(img_data)
            from PIL import Image
            pil_img = Image.open(img_bytes)
            w, h = pil_img.size
            self.last_screen_size = (w, h)
            pil_img = pil_img.convert("RGB")
            arr = np.array(pil_img)
            h, w, ch = arr.shape
            bytes_per_line = ch * w
            qimg = QImage(arr.data, w, h, bytes_per_line, QImage.Format_RGB888)
            pix = QPixmap.fromImage(qimg)
            self.label.setPixmap(pix.scaled(self.label.size(), Qt.KeepAspectRatio, Qt.SmoothTransformation))
            self.label.setScaledContents(True)
            # 差异重组用
            self.full_img = arr.copy()
        except Exception as e:
            print("show_screen error:", e)
    def show_screen_diff(self, data):
        try:
            w, h = data.get('width'), data.get('height')
            block_size = data.get('block_size', 256)
            if self.full_img is None or self.full_img.shape[0] != h or self.full_img.shape[1] != w:
                self.full_img = np.zeros((h, w, 3), dtype=np.uint8)
            updated = False
            for blk in data['blocks']:
                bx, by = blk['x'], blk['y']
                bw, bh = blk['w'], blk['h']
                img_data = base64.b64decode(blk['data'])
                from PIL import Image
                pil_img = Image.open(BytesIO(img_data)).convert("RGB")
                arr = np.array(pil_img)
                self.full_img[by:by + bh, bx:bx + bw, :] = arr
                updated = True
            if updated:
                self.last_screen_size = (w, h)
                arr = self.full_img
                h, w, ch = arr.shape
                bytes_per_line = ch * w
                qimg = QImage(arr.data, w, h, bytes_per_line, QImage.Format_RGB888)
                pix = QPixmap.fromImage(qimg)
                self.label.setPixmap(pix.scaled(self.label.size(), Qt.KeepAspectRatio, Qt.SmoothTransformation))
                self.showT+=1
                print("showT : ",self.showT)
        except Exception as e:
            print("show_screen_diff error:", e)


运行结果

在这里插入图片描述

三、在线协助:

如需安装运行环境或远程调试,见文章底部个人 QQ 名片,由专业技术人员远程协助!

1)远程安装运行环境,代码调试
2)Visual Studio, Qt, C++, Python编程语言入门指导
3)界面美化
4)软件制作
5)云服务器申请
6)网站制作

当前文章连接:https://blog.csdn.net/alicema1111/article/details/132666851
个人博客主页https://blog.csdn.net/alicema1111?type=blog
博主所有文章点这里:https://blog.csdn.net/alicema1111?type=blog

博主推荐:
Python人脸识别考勤打卡系统:
https://blog.csdn.net/alicema1111/article/details/133434445
Python果树水果识别https://blog.csdn.net/alicema1111/article/details/130862842
Python+Yolov8+Deepsort入口人流量统计:https://blog.csdn.net/alicema1111/article/details/130454430
Python+Qt人脸识别门禁管理系统:https://blog.csdn.net/alicema1111/article/details/130353433
Python+Qt指纹录入识别考勤系统:https://blog.csdn.net/alicema1111/article/details/129338432
Python Yolov5火焰烟雾识别源码分享:https://blog.csdn.net/alicema1111/article/details/128420453
Python+Yolov8路面桥梁墙体裂缝识别:https://blog.csdn.net/alicema1111/article/details/133434445
Python+Yolov5道路障碍物识别:https://blog.csdn.net/alicema1111/article/details/129589741
Python+Yolov5跌倒检测 摔倒检测 人物目标行为 人体特征识别:https://blog.csdn.net/alicema1111/article/details/129272048


网站公告

今日签到

点亮在社区的每一天
去签到