C#-mqtt通讯,服务端和客户端,以及esp32-mqtt

发布于:2025-08-30 ⋅ 阅读:(15) ⋅ 点赞:(0)

C#-mqtt服务端

    /// <summary>
    /// Process-wide wrapper around an MQTTnet <c>IMqttServer</c> broker.
    /// Wires the broker's lifecycle/client/message handlers to logging callbacks,
    /// starts the broker on <c>Server.LocalIP</c>:<c>Server.MqttPort</c> with
    /// username/password validation, and exposes a timer callback that publishes
    /// a fixed message to the <c>system/admin</c> topic.
    /// </summary>
    internal class MqttServer
    {
        // Guards clientId. NOTE(review): assumes the broker may invoke the
        // connect/disconnect handlers concurrently for different clients; confirm.
        private readonly object clientIdLock = new object();
        private List<string> clientId = new List<string>();
        private IMqttServer mqttServer = new MqttFactory().CreateMqttServer();

        // NOTE(review): credentials are hard-coded in source; consider loading
        // them from configuration instead.
        string pwd = "123456";
        string username  = "admin";

        // Lazy<T> creates and initializes the singleton exactly once, even when
        // Instance is first touched from several threads at the same time.
        private static readonly Lazy<MqttServer> _Instance = new Lazy<MqttServer>(CreateInstance);

        /// <summary>
        /// Gets the shared instance. The handlers are registered only on first
        /// access rather than on every read of this property.
        /// </summary>
        public static MqttServer Instance
        {
            get { return _Instance.Value; }
        }

        /// <summary>Builds the singleton and registers its broker handlers.</summary>
        private static MqttServer CreateInstance()
        {
            MqttServer server = new MqttServer();
            server.MqttServerInit();
            return server;
        }

        /// <summary>
        /// Registers the handler delegates on the underlying broker object.
        /// Any exception is logged and swallowed.
        /// </summary>
        void MqttServerInit() {
            try
            {
                mqttServer.StartedHandler = new MqttServerStartedHandlerDelegate(ServerStarted); // server started
                mqttServer.StoppedHandler = new MqttServerStoppedHandlerDelegate(ServerStop); // server stopped
                mqttServer.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(GetClientId); // client connected
                mqttServer.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(delClient); // client disconnected
                mqttServer.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(GetMessage); // message received from a client
                mqttServer.ClientSubscribedTopicHandler = new MqttServerClientSubscribedHandlerDelegate(ClientSubscribed); // client subscribed
                mqttServer.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(ClientUnsubscribed); // client unsubscribed
            }
            catch (Exception ex) {
                MqttLog.m.MqttOpen().Info("Mqtt启动失败:" + ex.Message);
            }

        }

        /// <summary>
        /// Logs the root exception of <paramref name="task"/> if it ends faulted.
        /// Used for fire-and-forget broker calls so that asynchronous failures are
        /// not silently dropped; the caller is never blocked.
        /// </summary>
        /// <param name="task">The task returned by the broker call.</param>
        /// <param name="messagePrefix">Text placed before the exception message in the log entry.</param>
        private static void LogIfFaulted(System.Threading.Tasks.Task task, string messagePrefix)
        {
            task.ContinueWith(
                t => { MqttLog.m.MqttOpen().Info(messagePrefix + t.Exception.GetBaseException().Message); },
                System.Threading.Tasks.TaskContinuationOptions.OnlyOnFaulted);
        }

        /// <summary>
        /// Builds the broker options (bound IP, port, connection validator) and
        /// starts the broker without blocking the caller. Failures raised while
        /// building the options are caught synchronously; failures raised by the
        /// asynchronous start are logged once the start task faults.
        /// </summary>
        // NOTE(review): [Obsolete] presumably exists to silence obsolete-API
        // warnings for the builder calls used inside this method; confirm.
        [Obsolete]
        public void MqttOpen()
        {
            try {
                // Create the server options builder.
                MqttServerOptionsBuilder mqttServerOptionsBuilder = new MqttServerOptionsBuilder();
                // Bind the listening IP address.
                mqttServerOptionsBuilder.WithDefaultEndpointBoundIPAddress(Server.LocalIP);
                // Bind the listening port.
                mqttServerOptionsBuilder.WithDefaultEndpointPort(Server.MqttPort);
                // Validate connecting clients by username and password.
                mqttServerOptionsBuilder.WithConnectionValidator(ConnectionValidator);

                IMqttServerOptions options = mqttServerOptionsBuilder.Build();

                // Start the broker. The returned task is observed so that a failed
                // start is logged instead of being lost.
                LogIfFaulted(mqttServer.StartAsync(options), "Mqtt启动失败2:");
            }
            catch (Exception ex) {
                MqttLog.m.MqttOpen().Info("Mqtt启动失败2:" + ex.Message);
            }
        }

        /// <summary>
        /// Timer callback: when the broker is started, publishes a fixed text
        /// message to the <c>system/admin</c> topic with QoS AtLeastOnce.
        /// </summary>
        /// <param name="state">Timer state object; not used.</param>
        public void TimerCallback(object state)
        {
            try
            {
                if (!mqttServer.IsStarted)
                {
                    return;
                }

                var mqttMessage = new MqttApplicationMessageBuilder()
                    .WithTopic("system/admin")
                    .WithPayload("这是系统主题,每5秒推送是一次")
                    .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
                    .Build();

                // Observe the publish task so an asynchronous failure is logged.
                LogIfFaulted(mqttServer.PublishAsync(mqttMessage), "推送失败");
            }
            catch (Exception ex)
            {
                MqttLog.m.MqttOpen().Info("推送失败"+ ex.Message);
            }
        }

        /// <summary>
        /// Requests the broker to stop without blocking the caller; a failure
        /// while stopping is logged.
        /// </summary>
        public void StopSetver()
        {
            try
            {
                LogIfFaulted(mqttServer.StopAsync(), "Mqtt停止失败:");
            }
            catch (Exception ex)
            {
                MqttLog.m.MqttOpen().Info("Mqtt停止失败:" + ex.Message);
            }
        }

        /// <summary>
        /// Accepts a connection only when both the username and the password
        /// match the configured values; otherwise refuses it with
        /// <c>ConnectionRefusedBadUsernameOrPassword</c>.
        /// </summary>
        /// <param name="context">Validation context supplied by the broker.</param>
        [Obsolete]
        void ConnectionValidator(MqttConnectionValidatorContext context)
        {
            if (context == null)
            {
                // Nothing to write the result to; previously this path threw a
                // NullReferenceException when setting ReturnCode.
                return;
            }

            bool credentialsMatch = context.Password == pwd && context.Username == username;
            context.ReturnCode = credentialsMatch
                ? MQTTnet.Protocol.MqttConnectReturnCode.ConnectionAccepted
                : MQTTnet.Protocol.MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
        }

        /// <summary>Logs that the broker has started.</summary>
        void ServerStarted(EventArgs e)
        {
            MqttLog.m.MqttOpen().Info("mqtt服务已开启,等待用户连接...");
            Debug.WriteLine("mqtt服务已开启,等待用户连接...");
        }

        /// <summary>Logs that the broker has stopped.</summary>
        void ServerStop(EventArgs e)
        {
            MqttLog.m.MqttOpen().Info("mqtt服务停止...");
            Debug.WriteLine("mqtt服务停止...");
        }

        /// <summary>Records the id of a newly connected client and logs it.</summary>
        void GetClientId(MqttServerClientConnectedEventArgs e)
        {
            lock (clientIdLock)
            {
                clientId.Add(e.ClientId);
            }
            MqttLog.m.MqttOpen().Info($"客户端:{e.ClientId}已连接");
            Debug.WriteLine($"客户端:{e.ClientId}已连接");
        }

        /// <summary>Removes the id of a disconnected client and logs it.</summary>
        void delClient(MqttServerClientDisconnectedEventArgs e)
        {
            lock (clientIdLock)
            {
                clientId.Remove(e.ClientId);
            }
            MqttLog.m.MqttOpen().Info($"客户端:{e.ClientId}断开连接");
            Debug.WriteLine($"客户端:{e.ClientId}断开连接");
        }

        /// <summary>
        /// Logs the sender id and the text of a received message. The payload is
        /// decoded as UTF-8 (the encoding MQTT uses for text, and the one used by
        /// the messages this class publishes); a missing payload is logged as an
        /// empty string.
        /// </summary>
        void GetMessage(MqttApplicationMessageReceivedEventArgs message)
        {
            byte[] payload = message.ApplicationMessage.Payload;
            string text = payload == null ? string.Empty : Encoding.UTF8.GetString(payload);

            // NOTE(review): the original literal contained "\\n\\n", which logs a
            // literal backslash-n; real line breaks are assumed to be the intent.
            MqttLog.m.MqttOpen().Info($"客户端:{message.ClientId}\n\n发送:{text}");
            Debug.WriteLine($"客户端:{message.ClientId}");
            Debug.WriteLine($"发送:{text}");
        }

        /// <summary>Logs a client's topic subscription.</summary>
        void ClientSubscribed(MqttServerClientSubscribedTopicEventArgs e)
        {
            MqttLog.m.MqttOpen().Info($"客户端:{e.ClientId}---订阅{e.TopicFilter.Topic})");
            Debug.WriteLine($"客户端:{e.ClientId}---订阅{e.TopicFilter.Topic})");
        }

        /// <summary>Logs a client's topic unsubscription.</summary>
        void ClientUnsubscribed(MqttServerClientUnsubscribedTopicEventArgs e)
        {
            MqttLog.m.MqttOpen().Info($"客户端:{e.ClientId}---取消订阅{e.TopicFilter})");
            Debug.WriteLine($"客户端:{e.ClientId}---取消订阅{e.TopicFilter})");
        }
    }

ESP32客户端

#设备客户端代码
import network
import time
from umqtt.simple import MQTTClient
import ubinascii
import machine
# Wi-Fi credentials for the access point the board joins.
ssid = '24LOU'
password = '12356789'
# Connect to Wi-Fi in station mode.
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
if not wlan.isconnected():
    print('connecting to network...')
    wlan.connect(ssid, password)
    # Poll with a short sleep instead of spinning in a tight loop while the
    # connection is being established.
    while not wlan.isconnected():
        time.sleep(0.1)
print('网络配置:', wlan.ifconfig())
# ===== Configuration =====
MQTT_BROKER_IP = "192.168.3.79"  # Replace with the IP address of the PC running the broker
MQTT_PORT = 1883
MQTT_CLIENT_ID = ubinascii.hexlify(machine.unique_id())  # Client id derived from the board's unique id
PUBLISH_TOPIC = b"esp32/data"  # Topic this device publishes to
SUBSCRIBE_TOPIC = b"esp32/command"  # Topic this device subscribes to
# ===== 2. MQTT消息回调函数 =====
def mqtt_callback(topic, msg):
    """Handle an incoming MQTT message.

    Prints the decoded topic and payload. When the message arrives on
    SUBSCRIBE_TOPIC with payload b"ON", GPIO2 is configured as an output
    and driven to 0.

    Args:
        topic: Topic of the received message, as bytes.
        msg: Payload of the received message, as bytes.
    """
    print(f"收到消息 [主题: {topic.decode()}]: {msg.decode()}")
    is_on_command = topic == SUBSCRIBE_TOPIC and msg == b"ON"
    if not is_on_command:
        return
    # GPIO2 is usually the ESP32 on-board LED (hardware dependent).
    # NOTE(review): the original comment says "ON" lights the LED, yet the pin
    # is driven low; whether that turns the LED on depends on the board's
    # wiring - confirm.
    machine.Pin(2, machine.Pin.OUT).value(0)
# ===== 3. 连接MQTT Broker并发布消息 =====
def connect_mqtt(user="admin", mqtt_password="123456"):
    """Connect to the MQTT broker and subscribe to SUBSCRIBE_TOPIC.

    The broker shown alongside this client refuses any connection whose
    username/password do not match its configured pair, so credentials are
    sent with the CONNECT packet. The defaults mirror that broker's values.

    Args:
        user: Username presented to the broker.
        mqtt_password: Password presented to the broker.

    Returns:
        The connected MQTTClient, or None when connecting or subscribing fails
        (the error is printed).
    """
    client = MQTTClient(
        client_id=MQTT_CLIENT_ID,
        server=MQTT_BROKER_IP,
        port=MQTT_PORT,
        user=user,
        password=mqtt_password,
    )
    client.set_callback(mqtt_callback)
    try:
        client.connect()
        print(f"已连接MQTT Broker: {MQTT_BROKER_IP}:{MQTT_PORT}")
        client.subscribe(SUBSCRIBE_TOPIC)
        print(f"已订阅主题: {SUBSCRIBE_TOPIC.decode()}")
        return client
    except Exception as e:
        print("MQTT连接失败:", e)
        return None

# ===== 4. 主循环 =====
def main():
    """Publish a counter message every ~5 seconds and service incoming commands.

    Between publishes the client polls for subscribed messages every 0.1 s, so
    a command is handled shortly after it arrives instead of once per publish
    cycle. The connection is closed when the loop exits for any reason.
    """
    mqtt_client = connect_mqtt()
    if mqtt_client is None:
        return
    publish_count = 0
    poll_interval_s = 0.1
    polls_per_publish = 50  # 50 polls * 0.1 s keeps the 5-second publish cadence
    try:
        while True:
            # Publish one data message per cycle.
            publish_count += 1
            print('------------------5---------------------')
            message = f"Hello Broker! 计数: {publish_count}"
            mqtt_client.publish(PUBLISH_TOPIC, message.encode())
            print(f"已发布: {PUBLISH_TOPIC.decode()} -> {message}")
            # check_msg() is non-blocking and handles at most one pending
            # message per call, so poll repeatedly while waiting for the next
            # publish; the short sleep keeps CPU usage low.
            for _ in range(polls_per_publish):
                mqtt_client.check_msg()
                time.sleep(poll_interval_s)
    finally:
        try:
            mqtt_client.disconnect()
        except OSError as e:
            # The socket may already be dead; do not let the cleanup error hide
            # the exception that ended the loop.
            print("MQTT断开时出错:", e)
        print("MQTT连接已断开")

if __name__ == "__main__":
    main()

mqtt客户端调试通讯工具