微信小程序通过http通信控制庐山派

发布于:2025-02-19 ⋅ 阅读:(17) ⋅ 点赞:(0)

首先要形成同一个局域网才能够实现

这是微信小程序的代码

Page({

  data: {

    isConnected: false, // WiFi连接状态

    serverUrl: 'http://192.168.1.225:8081/command' // 服务器地址

  },

  onLoad() {

    this.authorizeLocation();

  },

  // 授权地理位置

  authorizeLocation() {

    wx.authorize({

      scope: 'scope.userLocation',

      success: () => {

        this.startWifi();

      },

      fail: () => {

        wx.showToast({

          title: '授权失败',

          icon: 'none'

        });

      }

    });

  },

  // 开启WiFi模块

  startWifi() {

    wx.startWifi({

      success: () => {

        wx.connectWifi({

          SSID: 'DS',

          password: 'desen8888.',

          maunal: true, // 跳转到系统设置页进行连接

          success: () => {

            this.setData({ isConnected: true });

            wx.showToast({ title: 'WiFi连接成功' });

          },

          fail: (err) => {

            console.error('WiFi连接失败', err);

            wx.showToast({ title: 'WiFi连接失败', icon: 'none' });

          }

        });

      },

      fail: (err) => {

        console.error('WiFi模块启动失败', err);

        wx.showToast({ title: 'WiFi模块启动失败', icon: 'none' });

      }

    });

  },

  // 发送HTTP请求

  sendHttpRequest(data) {

    const { serverUrl } = this.data;

    wx.request({

      url: serverUrl,

      method: 'POST',

      data: data,

      header: {

        'Content-Type': 'application/json'

      },

      success: (res) => {

        console.log('请求成功:', res.data);

        wx.showToast({ title: '指令发送成功' });

      },

      fail: (err) => {

        console.error('请求失败:', err);

        wx.showToast({ title: '指令发送失败', icon: 'none' });

      }

    });

  },

  // 控制逻辑

  onForward() {

    this.sendHttpRequest({ command: 'forward' });

  },

  onBackward() {

    this.sendHttpRequest({ command: 'backward' });

  },

  onTurnLeft() {

    this.sendHttpRequest({ command: 'turn_left' });

  },

  onTurnRight() {

    this.sendHttpRequest({ command: 'turn_right' });

  },

  onStop() {

    this.sendHttpRequest({ command: 'stop' });

  },

  onButton1Click() {

    this.sendHttpRequest({ command: 'action_1' });

  },

  onButton2Click() {

    this.sendHttpRequest({ command: 'action_2' });

  }

});

这是庐山派的代码

import time
import os
import sys
import socket
import network
import ujson
import aicube
from media.sensor import *
from media.display import *
from media.media import *
import nncase_runtime as nn
import ulab.numpy as np
import image
import gc
from machine import UART, FPIOA, Pin
from machine import Timer

# 创建FPIOA对象,用于初始化引脚功能配置
fpioa = FPIOA()

# 配置UART1引脚
fpioa.set_function(3, FPIOA.UART1_TXD)
fpioa.set_function(4, FPIOA.UART1_RXD)

# 配置UART2引脚
fpioa.set_function(5, FPIOA.UART2_TXD)
fpioa.set_function(6, FPIOA.UART2_RXD)

# 将62、20、63号引脚映射为GPIO,用于控制RGB灯的三个颜色通道
fpioa.set_function(62, FPIOA.GPIO62)
fpioa.set_function(20, FPIOA.GPIO20)
fpioa.set_function(63, FPIOA.GPIO63)

LED_R = Pin(62, Pin.OUT, pull=Pin.PULL_NONE, drive=7)  # 红色LED
LED_G = Pin(20, Pin.OUT, pull=Pin.PULL_NONE, drive=7)  # 绿色LED
LED_B = Pin(63, Pin.OUT, pull=Pin.PULL_NONE, drive=7)  # 蓝色LED

# 初始化时先关闭所有LED灯
LED_R.high()
LED_G.high()
LED_B.high()

# 初始化UART1,波特率115200,8位数据位,无校验,1位停止位
uart1_DriveMotor = UART(UART.UART1, baudrate=115200, bits=UART.EIGHTBITS, parity=UART.PARITY_NONE, stop=UART.STOPBITS_ONE)

# 初始化UART2,波特率115200,8位数据位,无校验,1位停止位
uart1_ToPC = UART(UART.UART2, baudrate=115200, bits=UART.EIGHTBITS, parity=UART.PARITY_NONE, stop=UART.STOPBITS_ONE)

# 指令映射表
INSTRUCTION_MAP = {
    "Straight lane": '{"T":1,"L":0.5,"R":0.5}\n',
    "Straight ahead arrow": '{"T":1,"L":0.5,"R":0.5}\n'
}

# 连接网络
def network_use_wlan(is_wlan=True):
    if is_wlan:
        sta = network.WLAN(0)
        sta.connect("DS", "desen8888.")
        print(sta.status())
        while sta.ifconfig()[0] == '0.0.0.0':
            os.exitpoint()
        print(sta.ifconfig())
        ip = sta.ifconfig()[0]
        return ip
    else:
        a = network.LAN()
        if not a.active():
            raise RuntimeError("LAN interface is not active.")
        a.ifconfig("dhcp")
        print(a.ifconfig())
        ip = a.ifconfig()[0]
        return ip

# 初始化摄像头
def init_camera():
    global sensor
    try:
        print("camera_test")
        # 根据默认配置构建 Sensor 对象
        sensor = Sensor()
        # 复位 sensor
        sensor.reset()

        # 设置通道 0 分辨率为 1920x1080
        sensor.set_framesize(Sensor.FHD)
        # 设置通道 0 格式为 YUV420SP
        sensor.set_pixformat(Sensor.YUV420SP)
        # 绑定通道 0 到显示 VIDEO1 层
        bind_info = sensor.bind_info()
        Display.bind_layer(**bind_info, layer=Display.LAYER_VIDEO1)

        # 设置通道 1 分辨率和格式
        sensor.set_framesize(width=640, height=480, chn=CAM_CHN_ID_1)
        sensor.set_pixformat(Sensor.RGB888, chn=CAM_CHN_ID_1)

        # 设置通道 2 分辨率和格式
        sensor.set_framesize(width=640, height=480, chn=CAM_CHN_ID_2)
        sensor.set_pixformat(Sensor.RGB565, chn=CAM_CHN_ID_2)

        # 初始化 HDMI 和 IDE 输出显示,若屏幕无法点亮,请参考 API 文档中的 K230_CanMV_Display 模块 API 手册进行配置
        Display.init(Display.LT9611, to_ide=True, osd_num=2)
        # 初始化媒体管理器
        MediaManager.init()
        # 启动 sensor
        sensor.run()
        print("Camera initialized successfully.")
        return True
    except Exception as e:
        print(f"Camera initialization error: {e}")
        return False

# 启动 Web 服务器
def start_web_server():
    ip = network_use_wlan(True)

    s = socket.socket()
    ai = socket.getaddrinfo("0.0.0.0", 8081)
    print("Bind address info:", ai)
    addr = ai[0][-1]
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    s.bind(addr)
    s.listen(5)
    print("Listening, connect your browser to http://%s:8081/" % (ip))

    while True:
        res = s.accept()
        client_sock = res[0]
        client_addr = res[1]
        print("Client address:", client_addr)
        print("Client socket:", client_sock)
        client_sock.setblocking(False)
        client_stream = client_sock
        # 非阻塞读取请求数据
        request = b''
        while True:
            try:
                chunk = client_stream.recv(1024)
                if chunk:  # 有数据
                    request += chunk
                    if b'\r\n\r\n' in request:  # 检测到请求头结束
                        break
                else:  # 无数据,继续视频流
                    break
            except OSError:  # 无数据可读
                break

        # 如果有完整的请求头
        if request and b'\r\n\r\n' in request:
            headers = request.split(b'\r\n\r\n')[0].decode()
            print('Received request headers:', headers)

            # 处理POST请求
            if 'POST /command' in headers:
                try:
                    # 获取Content-Length
                    content_length = 0
                    for line in headers.split('\r\n'):
                        if line.lower().startswith('content-length:'):
                            content_length = int(line.split(': ')[1])
                            break

                    # 读取剩余请求体
                    body = request.split(b'\r\n\r\n')[1]
                    while len(body) < content_length:
                        chunk = client_stream.recv(content_length - len(body))
                        if not chunk:
                            break
                        body += chunk

                    # 解析JSON
                    data = ujson.loads(body.decode())
                    command = data.get('command')
                    if command:
                        print('Executing command:', command)
                        send_instruction(command)
                        response = ('HTTP/1.1 200 OK\r\n'
                                   'Content-Type: application/json\r\n'
                                   '\r\n'
                                   '{"status": "success"}')
                    else:
                        response = ('HTTP/1.1 400 Bad Request\r\n'
                                   'Content-Type: application/json\r\n'
                                   '\r\n'
                                   '{"error": "Missing command"}')
                except Exception as e:
                    print('Error processing command:', e)
                    response = ('HTTP/1.1 400 Bad Request\r\n'
                               'Content-Type: application/json\r\n'
                               '\r\n'
                               '{"error": "Invalid request"}')

                # 发送响应
                client_stream.write(response.encode())
                client_stream.close()
                continue

# 发送指令时亮灯,间隔一秒关灯
def send_instruction(instruction):
    if instruction in INSTRUCTION_MAP:
        cmd = INSTRUCTION_MAP[instruction]
        uart1_DriveMotor.write(bytes(cmd, 'utf-8'))  # 发送到UART1
        uart1_ToPC.write(bytes(cmd, 'utf-8'))  # 同时发送到UART2
        print(f"已发送指令: {cmd}")
        LED_G.low()  # 点亮绿灯
        time.sleep(1)
        LED_G.high()  # 关闭绿灯

# 主函数
def main():
    if not init_camera():
        print("Camera initialization failed. Exiting...")
        return
    start_web_server()

if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt as e:
        print("用户停止: ", e)
    except BaseException as e:
        print(f"异常: {e}")
    finally:
        # 停止 sensor
        if isinstance(sensor, Sensor):
            sensor.stop()
        # 销毁显示
        Display.deinit()
        os.exitpoint(os.EXITPOINT_ENABLE_SLEEP)
        time.sleep_ms(100)
        # 释放媒体缓冲区
        MediaManager.deinit()

庐山派打印窗口这样是成功的


网站公告

今日签到

点亮在社区的每一天
去签到