华为云设备接入

发布于:2025-02-27 ⋅ 阅读:(11) ⋅ 点赞:(0)

原生接入方式

支持设备通过MQTT、HTTP、LWM2M、CoAP、WebSocket、QUIC等通用通信协议对接物联网平台。


MQTT(S)协议接入

概述

MQTT消息由固定报头(Fixed header)、可变报头(Variable header)和有效载荷(Payload)三部分组成。

其中固定报头(Fixed header)和可变报头(Variable header)格式的填写请参考MQTT标准规范,有效载荷(Payload)的格式(须使用UTF-8编码格式)由应用定义,即由设备和物联网平台之间定义

常见MQTT消息类型主要有CONNECT、SUBSCRIBE、PUBLISH。

  • CONNECT:指客户端发起与服务端的连接请求。有效载荷(Payload)的主要参数,参考设备连接鉴权填写。
  • SUBSCRIBE:指客户端发起订阅的请求。有效载荷(Payload)中的主要参数“Topic name”,参考Topic定义中订阅者为设备的Topic。
  • PUBLISH:平台发布消息。
    • 可变报头(Variable header)中的主要参数“Topic name”,指当设备上报到物联网平台,发布者为设备时所对应的Topic。详细请参考Topic定义
    • 有效载荷(Payload)中的主要参数为完整的数据上报和命令下发的消息内容,目前是一个JSON对象。
业务流程
img
  1. 设备接入前,需创建产品(可通过控制台创建或者使用应用侧API创建产品)。
  2. 产品创建完毕后,需注册设备(可通过控制台注册单个设备或者使用应用侧API注册设备创建)。
  3. 设备注册完毕后,可以按照图中流程实现消息/属性上报、接收命令/属性/消息、OTA升级、自定义Topic等功能。关于平台预置Topic可参考Topic定义
具体接入步骤

您可以通过mqtt.fx进行原生协议接入调测,可以参考快速体验mqtt接入

上述内容只列出相关重要内容,详细内容查看:

MQTT/MQTTS协议接入:https://support.huaweicloud.com/usermanual-iothub/iot_01_0128.html


HTTPS协议接入

概述

HTTPS是基于HTTP协议,通过SSL加密的一种安全通信协议。物联网平台支持HTTPS协议通信。

业务流程
  1. 设备接入前,需创建产品(可通过控制台创建或者使用应用侧API创建产品)。

  2. 产品创建完毕后,需注册设备(可通过控制台注册单个设备或者使用应用侧API注册设备创建)。

  3. 设备注册完毕后,通过设备鉴权接口获取设备的access_token。
    点击放大

  4. 获取到access_token之后,可以使用消息/属性上报等功能。其中access_token放于消息头中,下面示例为上报属性:

    点击放大
    点击放大

具体接入步骤

HTTPS协议接入:

https://support.huaweicloud.com/usermanual-iothub/iot_01_00129.html


LwM2M/CoAP协议接入

概述

LwM2M(Lightweight M2M,轻量级M2M),由开发移动联盟(OMA)提出,是一种轻量级的、标准通用的物联网设备管理协议,可用于快速部署客户端/服务器模式的物联网业务。LwM2M为物联网设备的管理和应用建立了一套标准,它提供了轻便小巧的安全通信接口及高效的数据模型,以实现M2M设备管理和服务支持。物联网平台支持加密与非加密两种接入设备接入方式,其中加密业务数据交互端口为5684端口,采用DTLS+CoAP协议通道接入,非加密端口为5683,接入协议为CoAP。物联网平台从安全角度考虑,强烈建议采用安全接入方式。

具体步骤

流程与步骤和上述两种基本相同。

物联网平台的Endpoint请参见:地区和终端节点

使用“设备接入-> CoAP (5683)| CoAPS (5684)”对应的Endpoint,端口为5683(非加密接入方式)或者5684(加密接入方式)。

上述内容只列出相关重要内容,详细内容查看:

LwM2M协议接入:https://support.huaweicloud.com/usermanual-iothub/iot_01_0138.html


泛协议接入

概述

目前平台支持基于MQTT/HTTP/LwM2M等标准协议接入,为解决用户自定义协议设备快速接入IoT平台的诉求。华为云IoT提供泛协议适配机制,您可使用泛协议对接SDK,快速构建协议插件,进行设备或平台与IoT的双向数据通信。泛协议插件开发指导可参见泛协议插件开发

使用场景

  • 设备只支持某种类型协议,而平台目前不支持该协议。

  • 设备与其接入服务器之间已有通信网络,您希望在不修改设备和协议的情况下,将设备接入IoT平台。

  • 由于设备硬件资源或者网络限制,设备无法直接接入IoT平台。

架构框图

img

协议转换网关是一个网关,可以部署在云上或者本地。第三方协议设备作为协议转换网关的子设备接入平台。

协议转换网关一般由三部分组成:

  1. **第三方协议接入。**完成第三方协议的解析,鉴权。
  2. **协议转换。**负责完成第三方协议数据和平台格式数据的互相转换。
    • 上行:把第三方协议数据转成平台格式数据,并调用设备SDK接口进行上报。
    • 下行:收到平台下行数据时,转换为第三方协议数据转发给第三方协议设备。
  3. **设备SDK。**即平台提供的设备接入SDK,提供了网关的通用功能实现,用户可以在此基础上实现自己的网关。

业务流程

  1. 实现泛协议设备接入一般流程

    img
  2. 基于华为云SDK实现泛协议具体流程

img
  1. 在物联网平台上注册网关,详细方法请参考设备注册
  2. 网关上电,连接到平台,连接所需的鉴权参数在注册网关时获取。
  3. 用户在平台上注册子设备时,平台下发添加子设备事件到网关。网关收到后,保存子设备信息到本地并持久化(SDK提供了默认的持久化实现,用户可以自定义扩展)。
  4. 第三方协议设备连接到网关。网关根据子设备信息对设备进行鉴权。
  5. 设备上报数据到网关。网关转换为平台格式数据后,调用SDK的上报子设备属性/消息 的接口上报给平台。
  6. 平台向设备下发命令。网关收到后,转换为第三方协议,转发给子设备。设备收到后对命令进行处理。

具体接入步骤

以下代码以java举例,具体的内容/代码 可根据文末网址到GitHub sdk相关页面查看

1. 搭建网桥,用于udp设备接入网桥、与云平台进行交互

使用协议网桥来实现UDP协议设备接入。网桥需要为每个UDP设备创建一个客户端(IotClient),使用设备的身份和平台进行通讯。网桥功能包括:

  • 初始化网桥SDK

        public void init() {
    
            // 网桥启动初始化
            BridgeBootstrap bridgeBootstrap = new BridgeBootstrap();
    
            // 从环境变量获取配置进行初始化
            bridgeBootstrap.initBridge();
    
            bridgeClient = bridgeBootstrap.getBridgeDevice().getClient();
    
            // 设置平台下行数据监听器
            DownLinkHandler downLinkHandler = new DownLinkHandler();
            bridgeClient.setBridgeCommandListener(downLinkHandler)   // 设置平台命令下发监听器
                .setBridgeDeviceMessageListener(downLinkHandler)    // 设置平台消息下发监听器
                .setBridgeDeviceDisConnListener(downLinkHandler);   // 设置平台通知网桥主动断开设备连接的监听器
        }
    
  • 设备登录上线

    private void login(Channel channel, BaseMessage message) {
        if (!(message instanceof DeviceLoginMessage)) {
        return;
        }
    
        String deviceId = message.getMsgHeader().getDeviceId();
        String secret = ((DeviceLoginMessage) message).getSecret();
        DeviceSession deviceSession = new DeviceSession();
    
        int resultCode = BridgeService.getBridgeClient().loginSync(deviceId, secret, 5000);
    
        // 登录成功保存会话信息
        if (resultCode == 0) {
        deviceSession.setDeviceId(deviceId);
        deviceSession.setChannel(channel);
        DeviceSessionManger.getInstance().createSession(deviceId, deviceSession);
        NettyUtils.setDeviceId(channel, deviceId);
        }
    
        // 构造登录响应的消息头
        MsgHeader msgHeader = new MsgHeader();
        msgHeader.setDeviceId(deviceId);
        msgHeader.setFlowNo(message.getMsgHeader().getFlowNo());
        msgHeader.setDirect(Constants.DIRECT_CLOUD_RSP);
        msgHeader.setMsgType(Constants.MSG_TYPE_DEVICE_LOGIN);
    
        // 调用网桥login接口,向平台发起登录请求
        DefaultActionListenerImpl defaultLoginActionListener = new DefaultActionListenerImpl("login");
        BridgeService.getBridgeClient()
        .loginAsync(deviceId, secret, message.getMsgHeader().getFlowNo(),
        defaultLoginActionListener);
    }
    
  • 设备数据上传

    private void reportProperties(Channel channel, BaseMessage message) {
        String deviceId = message.getMsgHeader().getDeviceId();
        DeviceSession deviceSession = DeviceSessionManger.getInstance().getSession(deviceId);
        if (deviceSession == null) {
            log.warn("device={} is not login", deviceId);
            sendResponse(channel, message, 1);
            return;
        }
    
        ServiceProperty serviceProperty = new ServiceProperty();
        serviceProperty.setServiceId("Location");
        serviceProperty.setProperties(
            JsonUtil.convertJsonStringToObject(JsonUtil.convertObject2String(message), Map.class));
    
        // 调用网桥reportProperties接口,上报设备属性数据
        BridgeService.getBridgeClient()
            .reportProperties(deviceId, Collections.singletonList(serviceProperty), new ActionListener() {
                @Override
                public void onSuccess(Object context) {
                    sendResponse(channel, message, 0);
                }
    
                @Override
                public void onFailure(Object context, Throwable var2) {
                    log.warn("device={} reportProperties failed: {}", deviceId, ExceptionUtil.getBriefStackTrace(var2));
                    sendResponse(channel, message, 1);
                }
            });
    }
    
  • 平台指令下发

        @Override
        public void onCommand(String deviceId, String requestId, BridgeCommand bridgeCommand) {
            log.info("onCommand deviceId={}, requestId={}, bridgeCommand={}", deviceId, requestId, bridgeCommand);
            DeviceSession session = DeviceSessionManger.getInstance().getSession(deviceId);
            if (session == null) {
                log.warn("device={} session is null", deviceId);
                return;
            }
    
            // 设置位置上报的周期
            if (Constants.MSG_TYPE_FREQUENCY_LOCATION_SET.equals(bridgeCommand.getCommand().getCommandName())) {
                processLocationSetCommand(session, requestId, bridgeCommand);
            }
        }
    
  • 设备离线

        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            String deviceId = NettyUtils.getDeviceId(ctx.channel());
            if (deviceId == null) {
                return;
            }
            DeviceSession deviceSession = DeviceSessionManger.getInstance().getSession(deviceId);
            if (deviceSession == null) {
                return;
            }
    
            // 调用网桥的logout接口,通知平台设备离线
            DefaultActionListenerImpl defaultLogoutActionListener = new DefaultActionListenerImpl("logout");
            BridgeService.getBridgeClient()
                .logoutAsync(deviceId, UUID.randomUUID().toString(), defaultLogoutActionListener);
            DeviceSessionManger.getInstance().deleteSession(deviceId);
    
            ctx.close();
        }
    
2. 设备发送udp消息给泛协议插件

要求首条消息是鉴权消息,携带设备标识nodeId

3. 定义解码类

上行数据的消息解码,将原始码流转换为具体对象

/**
 * 上行数据的UDP消息解码器,将原始码流转换为具体对象
 */
public class UdpMessageDecoder extends SimpleChannelInboundHandler<String> {
    private static final Logger log = LogManager.getLogger(UdpMessageDecoder.class);

    /**
     * 处理UDP消息,解码并将结果传递给下一个处理器
     *
     * @param ctx 上下文对象,用于获取通道信息和传递消息
     * @param msg 接收到的原始UDP消息字符串
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) {
        log.info("UdpMessageDecoder msg={}", msg);
        
        BaseMessage message = decodeMessage(msg);
        if (message == null) {
            log.warn("decode message failed");
            return;
        }
        ctx.fireChannelRead(message); // 将解码后的消息传递给下一个处理器
    }

    /**
     * 解码UDP消息
     *
     * @param message 接收到的原始UDP消息字符串
     * @return 解码后的消息对象,如果解码失败则返回null
     */
    private BaseMessage decodeMessage(String message) {
        MsgHeader header = decodeHeader(message); // 解码消息头部
        if (header == null) {
            return null;
        }
        BaseMessage baseMessage = decodeBody(header, message.substring(message.lastIndexOf(",") + 1)); // 解码消息体
        if (baseMessage == null) {
            return null;
        }
        baseMessage.setMsgHeader(header); // 将头部信息设置到消息对象中
        return baseMessage;
    }

    /**
     * 解码消息头部
     *
     * @param message 接收到的原始UDP消息字符串
     * @return 解码后的消息头部对象,如果解码失败则返回null
     */
    private MsgHeader decodeHeader(String message) {
        String[] splits = message.split(Constants.HEADER_PARS_DELIMITER); // 按分隔符拆分头部信息
        if (splits.length <= 4) { // 判断头部信息是否完整
            return null;
        }

        MsgHeader msgHeader = new MsgHeader();
        msgHeader.setDeviceId(splits[0]); // 设置设备ID
        msgHeader.setFlowNo(splits[1]); // 设置流水号
        msgHeader.setMsgType(splits[2]); // 设置消息类型
        msgHeader.setDirect(Integer.parseInt(splits[3])); // 设置方向(整数值)
        return msgHeader;
    }

    /**
     * 根据消息头部解码消息体
     *
     * @param header  解码后的消息头部对象
     * @param body    原始消息体字符串
     * @return 解码后的具体消息对象,如果解码失败则返回null
     */
    private BaseMessage decodeBody(MsgHeader header, String body) {
        switch (header.getMsgType()) { // 根据消息类型选择解码方式
            case Constants.MSG_TYPE_DEVICE_LOGIN:
                return decodeLoginMessage(body); // 解码设备登录消息

            case Constants.MSG_TYPE_REPORT_LOCATION_INFO:
                return decodeLocationMessage(body); // 解码位置上报消息

            case Constants.MSG_TYPE_FREQUENCY_LOCATION_SET:
                return decodeLocationSetMessage(body); // 解码设置定位频率消息

            default:
                log.warn("invalid msgType"); // 非法消息类型
                return null;
        }
    }

    /**
     * 解码设备登录消息
     *
     * @param body 原始消息体字符串
     * @return 解码后的设备登录消息对象
     */
    private BaseMessage decodeLoginMessage(String body) {
        DeviceLoginMessage loginMessage = new DeviceLoginMessage();
        loginMessage.setSecret(body); // 设置登录密钥
        return loginMessage;
    }

    /**
     * 解码位置上报消息
     *
     * @param body 原始消息体字符串
     * @return 解码后的位置上报消息对象
     */
    private BaseMessage decodeLocationMessage(String body) {
        String[] splits = body.split(Constants.BODY_PARS_DELIMITER); // 按分隔符拆分消息体
        DeviceLocationMessage deviceLocationMessage = new DeviceLocationMessage();
        deviceLocationMessage.setLongitude(splits[0]); // 设置经度
        deviceLocationMessage.setLatitude(splits[1]); // 设置纬度
        return deviceLocationMessage;
    }

    /**
     * 解码设置定位频率消息
     *
     * @param body 原始消息体字符串
     * @return 解码后的设置定位频率响应消息对象
     */
    private BaseMessage decodeLocationSetMessage(String body) {
        CommonResponse commonResponse = new CommonResponse();
        commonResponse.setResultCode(Integer.parseInt(body)); // 设置结果码
        return commonResponse;
    }
}
4.监听udp端口并对上行数据解码,通过搭建的网桥转发消息给平台
/**
 * 一个传输字符串数据的UDP server,接收客户端发送的UDP数据包,首条消息是鉴权消息,携带设备标识nodeId。
 * server将收到的消息通过bridge转发给平台。
 */
public class UdpServer {

    private static final Logger log = LogManager.getLogger(UdpServer.class);

    private final int port;

    UdpServer(int port) {
        this.port = port;
    }

    void run() throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioDatagramChannel.class) // 使用NioDatagramChannel处理UDP
                    .option(ChannelOption.SO_BROADCAST, true)
                    .handler(new ChannelInitializerImpl());

            log.info("UDP server start......");

            Channel channel = b.bind(port).sync().channel();
            channel.closeFuture().await();
        } finally {
            group.shutdownGracefully();
            log.info("UDP server close");
        }
    }

    private static class ChannelInitializerImpl extends io.netty.channel.ChannelInitializer<NioDatagramChannel> {

        @Override
        protected void initChannel(NioDatagramChannel ch) throws Exception {
            ch.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
            ch.pipeline().addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
            ch.pipeline().addLast("handler", new UdpHandler());

            log.info("initChannel: {}", ch.localAddress());
        }
    }

    public static class UdpHandler extends SimpleChannelInboundHandler<DatagramPacket> {

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
            String msg = packet.content().toString(CharsetUtil.UTF_8);
            InetSocketAddress sender = packet.sender();
            log.info("channelRead0: {}, msg: {}", sender, msg);

            // 如果是首条消息,创建session
            Session session = Bridge.getInstance().getSessionByChannel(sender.toString());
            if (session == null) {
                Bridge.getInstance().createSession(msg, ctx.channel());
            } else {
                session.getDeviceClient().reportDeviceMessage(new DeviceMessage(msg), null);
            }
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            log.error("caught exception: {}", cause.getMessage());
            ctx.close();
            Bridge.getInstance().removeSession(ctx.channel().id().asLongText());
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8888; // 监听的UDP端口
        new UdpServer(port).run();
    }
}

附录网址

华为云设备接入:https://support.huaweicloud.com/usermanual-iothub/iot_01_0128.html

  • 原生接入方式

    • MQTT/MQTTS协议接入:https://support.huaweicloud.com/usermanual-iothub/iot_01_0128.html
    • HTTPS协议接入:https://support.huaweicloud.com/usermanual-iothub/iot_01_00129.html
    • LwM2M协议接入:https://support.huaweicloud.com/usermanual-iothub/iot_01_0138.html
  • 泛协议接入方式

    • 通过协议转换网关实现泛协议设备接入:https://support.huaweicloud.com/bestpractice-iothub/iot_bp_0009.html
      • sdk-java:https://github.com/huaweicloud/huaweicloud-iot-device-sdk-java?tab=readme-ov-file#3.14
      • sdk-c:https://github.com/huaweicloud/huaweicloud-iot-device-sdk-c/blob/master/README_CN.md