物联网iot、mqtt协议与华为云平台的综合实践(万字0基础保姆级教程)

发布于:2025-07-21 ⋅ 阅读:(20) ⋅ 点赞:(0)

        本学期的物联网技术与应用课程,其结课设计内容包含:mqtt、华为云、PyQT5和MySQL等结合使用,完成了从华为云配置产品信息以及转发规则,到mqtt命令转发,再到python编写逻辑代码实现相关功能,最后用PyQT5实现面向用户的可视化操作界面,博主将其中的详细流程进行了一个整理和分享,希望可以帮助到大家。

一、华为云平台配置

引言

接下来的内容是基于已经注册华为云账号

首先进入华为云官网,选择 产品 -> IoT物联网 -> 设备接入IoTDA

进入该界面,选择 免费试用 创建实例,其步骤较简单,大家可以参考其他资料

创建好实例后点击进入

认证后,华为云获取ak/sk的步骤文档:

获取AK/SK_AK/SK签名认证操作指导_API签名指南_API签名指南-华为云 (huaweicloud.com)https://support.huaweicloud.com/devg-apisign/api-sign-provide-aksk.html

测试ak/sk的python代码,命名为:ak_sk_data

# -*- coding: utf-8 -*-

def get_ak():
    ak = ' '
    return ak

def get_sk():
    sk = ' '
    return sk

def print_hello(name):
    print('{}欢迎您!'.format(name))

def print_hello2(name='智能温度系统'):
    print('{}欢迎您!'.format(name))

if __name__=='__main__':
    print(get_ak())
    
    print(get_sk())
    
    print_hello('智能温度系统')
    
    print_hello2()

1.1 创建产品

创建好实例后,点击左侧导航栏,点击 创建产品

创建产品的具体参数,选择MQTT协议,JSON格式

1.2 添加服务

创建产品后,点击 详情

进入产品详情界面,点击 自定义产品(此处定义产品后,界面改变,故产品不一致)

点击后为此界面,根据需求定义服务的各参数

根据产品功能设置其对应的 属性(MQTT传输的参数),此处为空气温度,数据类型设置为 可读可写 ,后续需要通过MQTT写入

根据以上步骤配置后,如图所示

1.3 添加设备

产品创建后,创建设备,点击左侧导航栏 设备 ->  所以设备 -> 注册设备

注册设备参数如下所示,设备会继承所属产品的属性,自定义密钥

点击确定后,显示设备创建成功

1.4 制定规则

接下来我们制定规则,以此实现MQTT数据转发

点击左侧导航栏 规则 -> 数据转发 -> 创建规则

首先设置转发数据基本信息,数据来源选择 设备属性 ,触发事件选择 设备属性上报

设置添加转发目标,转发目标选择 MQTT推送消息队列自定义推送Topic名称,一般是与产品设备相关

最好创建启动规则即可

二、mqtt的使用 

2.1 mqtt三元组

通过mqtt三元组与华为云平台设备连接,以下是获取网址:

Huaweicloud IoTDA Mqtt ClientId GeneratorSource code generated using layoutit.comhttps://iot-tool.obs-website.cn-north-4.myhuaweicloud.com/Deviced是设备id,DeviceSecret是创建设备时设置的密码

此外还需获取接入信息的 MQTTS 的接入地址,如下图所示:

左侧导航栏:总览 -> 接入信息 -> 设备接入 -> MQTT接入地址

2.2 mqtt配置

接下来我们以MQTT.fx为例进行配置

自定义Proflie Name名称,Profile Type选择 MQTT Broker

Broker Address粘贴 MQTTS接入地址,Broker Port选择 1883

后面三个依次填入 三元组生成的信息

在配置好后保存,回到首页,点击Connect进行连接

若连接成功,右侧会解锁亮绿灯,且华为云平台也会亮灯显示设备在线,若失败则红灯

2.3 命令发送

在通过MQTT.fx成功与华为云平台连接后,我们查看参数,准备进行命令下发

如下图所示,返回产品详情,服务为 Bulb,其属性参数为 brightness 和 color_temp

接下来,我们配置数据上报

$oc/devices/{设备id}/sys/properties/report
{
  "services": [
    {
      "service_id": "XXX",
      "properties": {
        "属性名称1": XXX,
        "属性名称2": XXX
      }
    }
  ]
}

如图填入后,点击Publish进行上报

左侧导航栏选择设备,接入详情刷新即可查看上报的数据信息

如上图所示,已经成功上报数据

三、MySQL的创建

在数据库内创建数据表并设置相关参数

-- 创建数据库
CREATE DATABASE IF NOT EXISTS XXX;

-- 使用数据库
USE XXX;

-- 创建 XXX 表(带自动更新时间)
CREATE TABLE IF NOT EXISTS air (
    XXX INT,
    XXX INT,
    student_id INT DEFAULT 123,
    student_name VARCHAR(50) DEFAULT '小明',
    -- 自动更新时间
    record_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);

四、PyQT5的配置与简单使用

4.1 创建虚拟环境

可以先创建一个虚拟环境,专门用于此项目库和包的下载安装

详情可以参考此博客,主页也包含内核切换和pycharm配置版本conda等常见问题anaconda虚拟环境的安装_vmware安装anaconda-CSDN博客文章浏览阅读428次。本文指导如何在Windows系统中通过AnacondaPrompt设置PyTorch的CPU或GPU版本环境,遇到问题时推荐观看土堆教程的详细视频链接。 https://blog.csdn.net/m0_73544543/article/details/137649088?spm=1001.2014.3001.5502

4.2 下载pyqt5相关包

下载pyqt5

pip install PyQt5-i https://pypi.tuna.tsinghua.edu.cn/simple

下载PyQt5 Tools

pip install PyQt5-tools-i https://pypi.tuna.tsinghua.edu.cn/simple

下载PyQtChart

pip install PyQtChart

下载PyMySQL

pip install PyMySQL

下载pyserial

pip install pyserial

4.3 Pycharm配置PyQT5相关工具

在pycharm中点击 File,选择 Settings 进入设置

选择工具 Tools,选择 External Tools

新建工具,点击 +号,选择program启动程序的路径

找到安装路径,例如

其他PyUIC以及PyRcc插件同理,注意后两框的填入

$FileName$-o $FileNameWithoutExtension$.py
$FileDir$

4.4 使用QT Designer

菜单栏点击 Tools -> External Tools -> Qt Designer

启动后创建项目即可,博客在此选择的是 Widget 进行项目创建,左侧是控件可直接进行拖拽,使用方法类似于Java GUI,对应着学习即可

4.5 使用PyUIC

在保存QT Designer的项目后,会生成一个ui文件,此时需要通过PyUIC将ui文件转化为py文件

右击ui文件,选择External Tools,点击PyUIC等待转换即可,会生成一个同名的py文件

4.6 使用PyRcc

在项目文件夹下创建一个image文件夹,将图片文件放入,并创建一个qrc文件

格式如下:

<RCC>
  <qresource prefix="xxx">
    <file>XXX.png</file>
  </qresource>
</RCC>

创建完整后通过PyRcc工具进行转化,步骤同PyUIC

将qrc文件转化为可识别的py文件,将图片转为十六进制格式,如图所示

然后在QT中使用,在资源浏览器中选择画笔

进入编辑资源中,点击红框按钮,在文件夹中选择相应的qrc文件

再按照下图对应顺序配置图片路径

在配置完成后进行使用,例如我们在此使用label控件,右击选择 改变多信息文本

选择图片资源,选择配置的图片即可

4.7 测试程序

我们通过以下代码进行测试:

 from PyQt5 import QtWidgets, QtGui
 import sys
 app = QtWidgets.QApplication(sys.argv)
 window = QtWidgets.QWidget();
 window.show()
 sys.exit(app.exec_())

测试成功

五、测试

5.1 影子设备调试

此外我们可以通过影子设备进行调试,我们在搜索框搜索:API Explorer,选择 设备接入IOTDA

进入后,在设备接入选择设备影子下的查询设备影子数据,选择只看必选项

instance_id为实例id,project_id为“我的凭证”中的项目id,device_id为设备id

在配置成功后再次进行数据上报可得,右侧为详细信息

5.2 代码测试python与华为云连接

在测试前,想要安装相应的库实现python连接华为云

安装核心库

pip install huaweicloudsdkcore

安装IoTDA服务库

pip install huaweicloudsdkiotda

华为云提供的官方使用指导指南:

应用侧Python SDK使用指南_应用侧SDK_SDK参考_设备接入 IoTDA-华为云物联网平台提供Python语言的应用侧SDK供开发者使用。本文介绍应用侧 Python SDK的安装和配置,及使用Python SDK调用应用侧API的示例。访问Python官网,下载并按说明安装Python开发环境。华为云应用侧 Python SDK 支持 Python3 及以上版本。访问pip官网,下载并按说明安装pip工具。执行如下https://support.huaweicloud.com/sdkreference-iothub/iot_10_10003.html根据官网提供的模板:

各参数填入的具体步骤在上方 # 号中给出

# -*- coding: utf-8 -*-
import ak_sk_data
import json
from huaweicloudsdkcore.exceptions import exceptions
from huaweicloudsdkcore.region.region import Region
from huaweicloudsdkiotda.v5 import *
from huaweicloudsdkcore.auth.credentials import BasicCredentials
from huaweicloudsdkcore.auth.credentials import DerivedCredentials

if __name__ == "__main__":
    # 认证用的ak和sk直接写到代码中有很大的安全风险,建议在配置文件或者环境变量中密文存放,使用时解密,确保安全;
    # 本示例以ak和sk保存在环境变量中为例,运行本示例前请先在本地环境中设置环境变量HUAWEICLOUD_SDK_AK和HUAWEICLOUD_SDK_SK。
    ak = ak_sk_data.get_ak()
    sk = ak_sk_data.get_sk()
    #右上角用户,选择 我的凭证 - API凭证 - 项目ID(所属区域对应)
    project_id = " "
    # region_id:如果是上海一,请填写"cn-east-3";如果是北京四,请填写"cn-north-4";如果是华南广州,请填写"cn-south-1"
    region_id = "cn-north-4"
    # endpoint:请在控制台的"总览"界面的"平台接入地址"中查看"应用侧"的https接入地址
    endpoint = " "

    # 标准版/企业版:需自行创建Region对象
    REGION = Region(region_id, endpoint)

    # 创建认证
    # 创建BasicCredentials实例并初始化
    credentials = BasicCredentials(ak, sk, project_id)

    # 标准版/企业版需要使用衍生算法,基础版请删除配置"with_derived_predicate"
    credentials.with_derived_predicate(DerivedCredentials.get_default_derived_predicate())

    # 基础版:请选择IoTDAClient中的Region对象 如: .with_region(IoTDARegion.CN_NORTH_4)
    # 标准版/企业版:需要使用自行创建的Region对象
    # 配置是否忽略SSL证书校验, 默认不忽略: .with_http_config(HttpConfig(ignore_ssl_verification=True)) \
    client = IoTDAClient.new_builder() \
        .with_credentials(credentials) \
        .with_region(REGION) \
        .build()

    try:
        '''
        # 实例化请求对象
        request = ListDevicesRequest()
        # 调用查询设备列表接口
        response = client.list_devices(request)
        '''
        request = ShowDeviceShadowRequest()
        #设备id
        request.device_id = " "
        response = client.show_device_shadow(request)

        # 将响应对象转换为字典
        response_dict = json.loads(str(response))
        # 提取shadow数据
        shadow_data = response_dict["shadow"][0]
        # 填入产品属性参数
        # 例如博主是提取brightness,  color_temp
        brightness = shadow_data["reported"]["properties"]["brightness"]
        color_temp = shadow_data["reported"]["properties"]["color_temp"]

        # 打印提取的值
        print("完整的响应数据:", response_dict)
        print("亮度值(brightness):", brightness)
        print("色温值(color_temp):", color_temp)

    except exceptions.ClientRequestException as e:
        print(e.status_code)
        print(e.request_id)
        print(e.error_code)
        print(e.error_msg)

运行结果为:

5.3 代码测试mqtt与华为云连接

安装mqtt的依赖

pip install paho-mqtt==2.0.0

根据华为云文档构建 ClientConf、MqttClient、MqttDemo三个py文件

Python Demo使用说明_使用MQTT转发_数据转发至第三方应用_规则引擎_用户指南_设备接入 IoTDA-华为云本文以Python语言为例,介绍应用通过MQTTS协议接入平台,接收服务端订阅消息的示例。熟悉Python语言开发环境配置,熟悉Python语言基本语法。本示例使用了Python 3.8.8版本。本示例使用的Python语言的Mqtt依赖为paho-mqtt(本示例使用版本为2.0.0),可以通过以下命令下载依赖。ClientConf代码https://support.huaweicloud.com/usermanual-iothub/iot_01_00115.html

其中MQTTS8883接入凭证获取方式为:

MQTT客户端接入说明_使用MQTT转发_数据转发至第三方应用_规则引擎_用户指南_设备接入 IoTDA-华为云在调用创建规则触发条件、创建规则动作和修改规则触发条件配置并激活规则后,您需要参考本文将MQTT客户端接入物联网平台,成功接入后,在您的服务端运行MQTT客户端,即可接收订阅的消息。MQTT客户端接入物联网平台的连接地址和连接认证参数说明如下:MQTT接入域名每个账号会自动生成,请前往控制台-接入信息页面获取。接入信息-应用侧MQTT接入https://support.huaweicloud.com/usermanual-iothub/iot_01_00113.html其中MqttDemo的具体配置为:

from ClientConf import ClientConf
from MqttClient import MqttClient
import os
from typing import Optional

def main():
    client_conf = ClientConf()
    # host-id:总览 -> 接入信息 -> 应用接入 -> MQTTS(8883)
    client_conf.host = " "
    client_conf.port = 8883

    # 规则 -> 数据转发 -> 详情 -> 2设置转发目标 -> 配置:推送Topic
    client_conf.topic = " "

    # MQTTS8883接入凭证的key和code
    client_conf.access_key = " "
    client_conf.access_code = " "

    #instance_id:总览 -> 详情 -> 左上角ID
    client_conf.instance_id = " "
    mqtt_client = MqttClient(client_conf)
    if mqtt_client.connect() != 0:
        print("init failed")
        return

if __name__ == "__main__":
    main()

运行结果如下图所示,成功连接

连接成功后,我们再次提供MQTT.fx进行数据上报(命令在2.3 命令发送 中有模板)

发送后,成功接收,实时更新数据

5.4 代码测试mysql连接

在上述代码基础上完善

在 MqttClient.py 中修改 on_message 方法

    def _on_message(self, client, userdata, message: mqtt.MQTTMessage):
        msg = message.payload.decode()
        print("topic " + self.__topic + " Received message: " + msg)

        # 解析 JSON 数据
        try:
            data = json.loads(msg)
            properties = data["notify_data"]["body"]["services"][0]["properties"]

            # 提取需要的参数
            smoke_value = properties.get("smoke_value")

            # 如果有回调函数,则调用(检查是否存在)
            if hasattr(self, "on_message_callback") and self.on_message_callback:
                self.on_message_callback({
                    "smoke_value": smoke_value
                })
        except Exception as e:
            print("Failed to parse message:", e)

创建 mysqlset.py 建立数据库连接

import pymysql

def get_connection(database_name):
    try:
        connection = pymysql.connect(
            host='127.0.0.1',
            user=' ',
            password=' ',
            database=' ',  # 使用传入的数据库名
            charset='utf8mb4',
            cursorclass=pymysql.cursors.DictCursor
        )
        return connection
    except pymysql.MySQLError as e:
        print("数据库连接失败:", e)
        return None

在 MqttDemo.py 文件中添加 mysql 语句,实现mqtt发送数据插入数据库

from ClientConf import ClientConf
from MqttClient import MqttClient
import os
from typing import Optional
from mysqlset import get_connection

def handle_message(data):
    print("Received data:")
    print("smoke_value:", data["smoke_value"])

    # 插入数据库
    insert_smoke_data(data)

def insert_smoke_data(data):
    """插入数据到 smoke 表"""
    conn = None
    try:
        conn = get_connection("smoke")  # 替换为你的数据库名
        cursor = conn.cursor()

        # SQL 插入语句
        sql = """
        INSERT INTO smoke (smoke_value)
        VALUES (%s)
        """

        # 执行 SQL
        cursor.execute(sql, (
            data["smoke_value"]
        ))

        conn.commit()
        print("数据插入成功!")
    except Exception as e:
        print("数据库插入失败:", e)
    finally:
        if conn:
            conn.close()
def main():
    # 同上
    client_conf = ClientConf()
    client_conf.host = " "
    client_conf.port = 8883
    client_conf.topic = " "
    client_conf.access_key = " "
    client_conf.access_code = " "
    client_conf.instance_id = " "

    mqtt_client = MqttClient(client_conf)
    mqtt_client.on_message_callback = handle_message  # 设置回调函数

    if mqtt_client.connect() != 0:
        print("init failed")
        return


if __name__ == "__main__":
    main()

运行结果为

六、相关功能实现

自此所有平台的连接已经实现,接下来给大家分享一下博主在此基础上构建的结课设计项目

6.1 概述

6.1.1 项目内容

        根据对物联网架构的理解,依据物联网开发流程,基于华为云平台,实现 设备侧、平台侧及应用侧的开发。根据既定的物联网方案设计,自行选用适合 的开发技术,搭建实验环境,设计并实现一个物联网应用系统。

        提升要求:能够根据所选定应用场景,调研实际需求,设计符合需要的逻 辑合理的功能业务,能够利用人工智能技术智能化处理数据或者大模型的智能 功能,具有一定创新性。

6.1.2 项目要求

        本次项目根据所选用场景进行设备选型,采用 Mqtt.fx 工具模拟终端设备 向华为云平台实现数据上报并进行数据订阅(设备侧采用真实硬件更佳)。采用 华为云物联网平台应用侧的接口获取设备侧上报到平台的设备影子数据,利用 mqtt 协议实现数据转发,并能够进行设备管理等功能。

        可选用Python、Web、Android等技术实现图形化交互界面设计。

6.1.3 项目方案(实现功能)

        基于Python/web/Android 的环境安全监控管理应用系统 本系统需实现两个及以上的设备(烟感模块及自定义模块)的管理:参考 功能如下:

        1 实现华为云平台烟感模块及自定义模块的产品创建和设备注册(含自己 的学号)。 2 通过Mqtt.fx模拟烟感设备及自定义设备进行数据上报。 3 设计基于 pyqt5 的应用界面,展示云平台设备影子数据获取,实现设备 管理,展示设备列表,利用数据转发功能实现数据实时获取等。 4 烟感及自定义模块异常数据处理:应用界面可以展示设备情况,如正常 与否,亦可通过应用界面报警。 5 通过应用界面手动下发报警命令,设备侧能够接收到报警命令。

        6 烟感及自定义设备异常数据存储及查看。(选做) 7 利用人工智能技术使应用具有智能化处理功能。(选做) 8.自定义。

6.2 项目介绍

        博主在基于上述项目内容和要求,实现除订阅下发报警命令外所有功能

        本系统采用模块化设计,主要包含四大功能模块:环境监测模块、数据分析模块、异常预警模块和视觉识别模块。

        环境监测模块负责实时采集空气温湿度、光照强度以及土壤温湿度、PH值、EC值等多维度数据,通过传感器网络实现全天候监控;数据分析模块提供数据可视化功能,支持折线图、柱状图、饼图等多种图表展示,并具备历史数据查询和条件筛选能力,帮助用户全面掌握农业生产环境变化趋势。

        异常预警模块基于预设阈值和智能算法,自动检测环境参数异常,当温度、湿度等关键指标超出安全范围时,系统会触发声光报警机制,并通过可视化界面突出显示异常数据。视觉识别模块采用YOLOv8目标检测算法,可实时识别监控画面中的特定物体(如农药瓶、玻璃容器等),当检测到目标且置信度超过60%时,系统会立即启动蜂鸣器报警,有效预防潜在风险。各模块通过统一的数据总线进行通信,确保系统各部分协同工作,共同构成完整的智慧农业管理解决方案。

6.3 项目展示

项目首页:

数据监测界面:

异常数据监测界面:

在线识别界面:

关于我们:

6.4 部分功能实现代码

异常数据阈值判定

def filter_abnormal_data(table_type):
    """
    通用异常数据过滤函数
    :param table_type: 'air' 或 'soil'
    :return: 插入的记录数
    """
    # 定义各表的异常阈值
    thresholds = {
        'air': {'temp': 70, 'humi': 70, 'lux': 70},
        'soil': {'temp': 70, 'humi': 70, 'ph': 70, 'ec': 70, 'co2': 70}
    }

    try:
        connection = mysqlset.get_connection('agro')
        if not connection:
            raise Exception("数据库连接失败")

        with connection.cursor() as cursor:
            # 1. 检查目标表是否存在
            target_table = f'error_{table_type}'
            cursor.execute(f"SHOW TABLES LIKE '{target_table}';")
            if not cursor.fetchone():
                cursor.execute(f"SHOW CREATE TABLE {table_type};")
                create_sql = cursor.fetchone()['Create Table'].replace(
                    f'CREATE TABLE `{table_type}`',
                    f'CREATE TABLE `{target_table}`'
                )
                cursor.execute(create_sql)
            # 2. 构建动态SQL条件
            conditions = " OR ".join([
                f"{field} > {threshold}"
                for field, threshold in thresholds[table_type].items()
            ])
            # 3. 执行数据过滤插入
            sql = f"""
            INSERT INTO {target_table}
            SELECT * FROM {table_type} 
            WHERE ({conditions})
            AND id NOT IN (SELECT id FROM {target_table})
            """
            cursor.execute(sql)
            connection.commit()
            return cursor.rowcount

    except Exception as e:
        print(f"过滤{table_type}数据失败:", e)
        if connection:
            connection.rollback()
        return 0
    finally:
        if connection:
            connection.close()

部分可视化图表

  def create_pie_chart_view(self, parent=None):
        """创建饼状图,显示各参数占比"""
        soil_data = self.get_all_soil_data()
        if not soil_data:
            return QtWidgets.QLabel("无数据可用",parent)

        latest_data = soil_data[0]

        series = QPieSeries()
        # 使用原始值
        series.append(f"温度\n{latest_data['temp']}°C", float(latest_data['temp']))
        series.append(f"湿度\n{latest_data['humi']}%", float(latest_data['humi']))
        series.append(f"PH值\n{latest_data['ph']}", float(latest_data['ph']))
        series.append(f"EC值\n{latest_data['ec']}", float(latest_data['ec']))
        series.append(f"CO2\n{latest_data['co2']}ppm", float(latest_data['co2']))

        # 设置切片颜色
        colors = [
            QtGui.QColor(255, 0, 0),  # 红-温度
            QtGui.QColor(0, 0, 255),  # 蓝-湿度
            QtGui.QColor(0, 180, 0),  # 绿-PH值
            QtGui.QColor(180, 0, 180),  # 紫-EC值
            QtGui.QColor(255, 165, 0)  # 橙-CO2
        ]

        for i, slice in enumerate(series.slices()):
            slice.setColor(colors[i])
            slice.setLabelVisible(True)
            slice.setLabel(f"{slice.label()} ({slice.percentage():.1f}%)")
            slice.setLabelArmLengthFactor(0.2)

        chart = QChart()
        chart.addSeries(series)
        chart.setTitle("土壤参数占比")
        chart.legend().setVisible(True)
        chart.legend().setAlignment(Qt.AlignBottom)

        chart_view = QChartView(chart,parent)
        chart_view.setRenderHint(QtGui.QPainter.Antialiasing)
        return chart_view

接收设备参数

    def _on_message(self, client, userdata, message: mqtt.MQTTMessage):
        msg = message.payload.decode()
        print("topic " + self.__topic + " Received message: " + msg)

        # 解析 JSON 数据
        try:
            data = json.loads(msg)
            properties = data["notify_data"]["body"]["services"][0]["properties"]

            # 提取需要的参数
            air_data = {
                "temp": properties.get("air_temp"),
                "humi": properties.get("air_humi"),
                "lux": properties.get("lux")
            }

            soil_data = {
                "temp": properties.get("soil_temp"),
                "humi": properties.get("soil_humi"),
                "ph": properties.get("ph"),
                "ec": properties.get("ec"),
                "co2": properties.get("co2")
            }

            # 如果有回调函数,则调用
            if hasattr(self, "on_message_callback") and self.on_message_callback:
                self.on_message_callback({
                    "air_data": air_data,
                    "soil_data": soil_data
                })
        except Exception as e:
            print("Failed to parse message:", e)
            traceback.print_exc()

构建一个过滤器,检查接收的消息是否符合表中格式,实现air和soil相关参数的区分

    def insert_soil_data(data):
        """插入土壤数据到 soil 表"""
        conn = None
        try:
            # 检查是否有有效的土壤数据
            if data["temp"] is None and data["humi"] is None and data["ph"] is None and data["ec"] is None and data[
                "co2"] is None:
                print("土壤数据全为空,跳过插入")
                return False

            conn = get_connection("agro")
            cursor = conn.cursor()

            sql = """
            INSERT INTO soil (temp, humi, ph, ec, co2, record_time)
            VALUES (%s, %s, %s, %s, %s, %s)
            """
            cursor.execute(sql, (
                data["temp"],
                data["humi"],
                data["ph"],
                data["ec"],
                data["co2"],
                datetime.now()
            ))
            conn.commit()
            print("土壤数据插入成功!")
            return True
        except Exception as e:
            if conn:
                conn.rollback()
            print(f"土壤数据插入失败: {e}\n数据内容: {data}")
            return False
        finally:
            if conn:
                conn.close()

搜索功能

 def search_air_data(self, field, keyword):
        if field not in self.field_map:
            return []

        try:
            with self.connection.cursor() as cursor:
                sql = f"""
                SELECT id, temp, humi, lux, student_id, student_name, record_time 
                FROM air 
                WHERE {self.field_map[field]} LIKE %s
                """
                cursor.execute(sql, (f"%{keyword}%",))
                return cursor.fetchall()
        except Exception as e:
            print("搜索数据失败:", e)
            return []

异常数据触发蜂鸣器报警

    def play_beep_sound():
        """播放蜂鸣声(兼容性改进)"""
        try:
            current_time = time.time()
            # 只有当有新异常数据且超过间隔时间时才播放
            if (current_time - ErrorDataService.last_alert_time < ErrorDataService.ALERT_INTERVAL
                    or not ErrorDataService.has_new_errors):
                return

            ErrorDataService.last_alert_time = current_time
            ErrorDataService.has_new_errors = False  # 重置标志位
            system = platform.system()
            if system == "Windows":
                try:
                    import winsound
                    winsound.Beep(1000, 3000)  # 尝试硬件蜂鸣器
                except:
                    # 改用系统提示音
                    import winsound
                    winsound.PlaySound("SystemExclamation", winsound.SND_ALIAS)
            elif system == "Darwin":
                os.system('afplay /System/Library/Sounds/Ping.aiff')
            else:
                os.system('play -nq -t alsa synth 3 sine 1000')
        except Exception as e:
            print("播放蜂鸣声失败:", e)

目标检测与实时报警

    def _update_frame(self):
        """内部方法:处理每一帧"""
        if self.cap is None or not self.cap.isOpened():
            return

        ret, frame = self.cap.read()
        if not ret:
            self.result_signal.emit("无法从摄像头获取帧")
            return

        # YOLO 检测
        try:
            results = self.model(frame, conf=0.6)
            annotated_frame = results[0].plot()  # 带检测框的图像

            # 处理检测结果
            should_alarm = False  # 标记是否需要报警
            for r in results:
                for box in r.boxes:
                    class_name = self.model.names[int(box.cls)]
                    conf = box.conf.item()
                    result_text = f"检测到: {class_name}, 置信度: {conf:.2f}"
                    self.result_signal.emit(result_text)

                    # 检查是否需要报警
                    if (class_name in ['bottle', 'wine glass']) and conf > 0.6:
                        should_alarm = True

            # 触发或停止蜂鸣器
            if should_alarm and not self.alarm_triggered:
                winsound.Beep(1000, 500)  # 频率1000Hz,持续500ms
                self.alarm_triggered = True
                self.result_signal.emit("检测到目标对象,触发报警!")
            elif not should_alarm and self.alarm_triggered:
                self.alarm_triggered = False

格式转化

 # 转换图像格式并发送信号
            rgb_image = cv2.cvtColor(annotated_frame, cv2.COLOR_BGR2RGB)
            h, w, ch = rgb_image.shape
            bytes_per_line = ch * w
            qt_image = QImage(rgb_image.data, w, h, bytes_per_line, QImage.Format_RGB888)
            pixmap = QPixmap.fromImage(qt_image)
            self.update_signal.emit(pixmap)
        except Exception as e:
            self.result_signal.emit(f"处理帧时出错: {str(e)}")

        部分代码给大家提供一种思路,详细的教程以及步骤提供给大家,希望本篇博客能够帮助到大家,如果对大家有所帮助,希望可以给博主来个三连


网站公告

今日签到

点亮在社区的每一天
去签到