首先要形成同一个局域网才能够实现
这是微信小程序的代码
Page({
data: {
isConnected: false, // WiFi连接状态
serverUrl: 'http://192.168.1.225:8081/command' // 服务器地址
},
onLoad() {
this.authorizeLocation();
},
// 授权地理位置
authorizeLocation() {
wx.authorize({
scope: 'scope.userLocation',
success: () => {
this.startWifi();
},
fail: () => {
wx.showToast({
title: '授权失败',
icon: 'none'
});
}
});
},
// 开启WiFi模块
startWifi() {
wx.startWifi({
success: () => {
wx.connectWifi({
SSID: 'DS',
password: 'desen8888.',
maunal: true, // 跳转到系统设置页进行连接
success: () => {
this.setData({ isConnected: true });
wx.showToast({ title: 'WiFi连接成功' });
},
fail: (err) => {
console.error('WiFi连接失败', err);
wx.showToast({ title: 'WiFi连接失败', icon: 'none' });
}
});
},
fail: (err) => {
console.error('WiFi模块启动失败', err);
wx.showToast({ title: 'WiFi模块启动失败', icon: 'none' });
}
});
},
// 发送HTTP请求
sendHttpRequest(data) {
const { serverUrl } = this.data;
wx.request({
url: serverUrl,
method: 'POST',
data: data,
header: {
'Content-Type': 'application/json'
},
success: (res) => {
console.log('请求成功:', res.data);
wx.showToast({ title: '指令发送成功' });
},
fail: (err) => {
console.error('请求失败:', err);
wx.showToast({ title: '指令发送失败', icon: 'none' });
}
});
},
// 控制逻辑
onForward() {
this.sendHttpRequest({ command: 'forward' });
},
onBackward() {
this.sendHttpRequest({ command: 'backward' });
},
onTurnLeft() {
this.sendHttpRequest({ command: 'turn_left' });
},
onTurnRight() {
this.sendHttpRequest({ command: 'turn_right' });
},
onStop() {
this.sendHttpRequest({ command: 'stop' });
},
onButton1Click() {
this.sendHttpRequest({ command: 'action_1' });
},
onButton2Click() {
this.sendHttpRequest({ command: 'action_2' });
}
});
这是庐山派的代码
import time
import os
import sys
import socket
import network
import ujson
import aicube
from media.sensor import *
from media.display import *
from media.media import *
import nncase_runtime as nn
import ulab.numpy as np
import image
import gc
from machine import UART, FPIOA, Pin
from machine import Timer
# 创建FPIOA对象,用于初始化引脚功能配置
fpioa = FPIOA()
# 配置UART1引脚
fpioa.set_function(3, FPIOA.UART1_TXD)
fpioa.set_function(4, FPIOA.UART1_RXD)
# 配置UART2引脚
fpioa.set_function(5, FPIOA.UART2_TXD)
fpioa.set_function(6, FPIOA.UART2_RXD)
# 将62、20、63号引脚映射为GPIO,用于控制RGB灯的三个颜色通道
fpioa.set_function(62, FPIOA.GPIO62)
fpioa.set_function(20, FPIOA.GPIO20)
fpioa.set_function(63, FPIOA.GPIO63)
LED_R = Pin(62, Pin.OUT, pull=Pin.PULL_NONE, drive=7) # 红色LED
LED_G = Pin(20, Pin.OUT, pull=Pin.PULL_NONE, drive=7) # 绿色LED
LED_B = Pin(63, Pin.OUT, pull=Pin.PULL_NONE, drive=7) # 蓝色LED
# 初始化时先关闭所有LED灯
LED_R.high()
LED_G.high()
LED_B.high()
# 初始化UART1,波特率115200,8位数据位,无校验,1位停止位
uart1_DriveMotor = UART(UART.UART1, baudrate=115200, bits=UART.EIGHTBITS, parity=UART.PARITY_NONE, stop=UART.STOPBITS_ONE)
# 初始化UART2,波特率115200,8位数据位,无校验,1位停止位
uart1_ToPC = UART(UART.UART2, baudrate=115200, bits=UART.EIGHTBITS, parity=UART.PARITY_NONE, stop=UART.STOPBITS_ONE)
# 指令映射表
INSTRUCTION_MAP = {
"Straight lane": '{"T":1,"L":0.5,"R":0.5}\n',
"Straight ahead arrow": '{"T":1,"L":0.5,"R":0.5}\n'
}
# 连接网络
def network_use_wlan(is_wlan=True):
if is_wlan:
sta = network.WLAN(0)
sta.connect("DS", "desen8888.")
print(sta.status())
while sta.ifconfig()[0] == '0.0.0.0':
os.exitpoint()
print(sta.ifconfig())
ip = sta.ifconfig()[0]
return ip
else:
a = network.LAN()
if not a.active():
raise RuntimeError("LAN interface is not active.")
a.ifconfig("dhcp")
print(a.ifconfig())
ip = a.ifconfig()[0]
return ip
# 初始化摄像头
def init_camera():
global sensor
try:
print("camera_test")
# 根据默认配置构建 Sensor 对象
sensor = Sensor()
# 复位 sensor
sensor.reset()
# 设置通道 0 分辨率为 1920x1080
sensor.set_framesize(Sensor.FHD)
# 设置通道 0 格式为 YUV420SP
sensor.set_pixformat(Sensor.YUV420SP)
# 绑定通道 0 到显示 VIDEO1 层
bind_info = sensor.bind_info()
Display.bind_layer(**bind_info, layer=Display.LAYER_VIDEO1)
# 设置通道 1 分辨率和格式
sensor.set_framesize(width=640, height=480, chn=CAM_CHN_ID_1)
sensor.set_pixformat(Sensor.RGB888, chn=CAM_CHN_ID_1)
# 设置通道 2 分辨率和格式
sensor.set_framesize(width=640, height=480, chn=CAM_CHN_ID_2)
sensor.set_pixformat(Sensor.RGB565, chn=CAM_CHN_ID_2)
# 初始化 HDMI 和 IDE 输出显示,若屏幕无法点亮,请参考 API 文档中的 K230_CanMV_Display 模块 API 手册进行配置
Display.init(Display.LT9611, to_ide=True, osd_num=2)
# 初始化媒体管理器
MediaManager.init()
# 启动 sensor
sensor.run()
print("Camera initialized successfully.")
return True
except Exception as e:
print(f"Camera initialization error: {e}")
return False
# 启动 Web 服务器
def start_web_server():
ip = network_use_wlan(True)
s = socket.socket()
ai = socket.getaddrinfo("0.0.0.0", 8081)
print("Bind address info:", ai)
addr = ai[0][-1]
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(addr)
s.listen(5)
print("Listening, connect your browser to http://%s:8081/" % (ip))
while True:
res = s.accept()
client_sock = res[0]
client_addr = res[1]
print("Client address:", client_addr)
print("Client socket:", client_sock)
client_sock.setblocking(False)
client_stream = client_sock
# 非阻塞读取请求数据
request = b''
while True:
try:
chunk = client_stream.recv(1024)
if chunk: # 有数据
request += chunk
if b'\r\n\r\n' in request: # 检测到请求头结束
break
else: # 无数据,继续视频流
break
except OSError: # 无数据可读
break
# 如果有完整的请求头
if request and b'\r\n\r\n' in request:
headers = request.split(b'\r\n\r\n')[0].decode()
print('Received request headers:', headers)
# 处理POST请求
if 'POST /command' in headers:
try:
# 获取Content-Length
content_length = 0
for line in headers.split('\r\n'):
if line.lower().startswith('content-length:'):
content_length = int(line.split(': ')[1])
break
# 读取剩余请求体
body = request.split(b'\r\n\r\n')[1]
while len(body) < content_length:
chunk = client_stream.recv(content_length - len(body))
if not chunk:
break
body += chunk
# 解析JSON
data = ujson.loads(body.decode())
command = data.get('command')
if command:
print('Executing command:', command)
send_instruction(command)
response = ('HTTP/1.1 200 OK\r\n'
'Content-Type: application/json\r\n'
'\r\n'
'{"status": "success"}')
else:
response = ('HTTP/1.1 400 Bad Request\r\n'
'Content-Type: application/json\r\n'
'\r\n'
'{"error": "Missing command"}')
except Exception as e:
print('Error processing command:', e)
response = ('HTTP/1.1 400 Bad Request\r\n'
'Content-Type: application/json\r\n'
'\r\n'
'{"error": "Invalid request"}')
# 发送响应
client_stream.write(response.encode())
client_stream.close()
continue
# 发送指令时亮灯,间隔一秒关灯
def send_instruction(instruction):
if instruction in INSTRUCTION_MAP:
cmd = INSTRUCTION_MAP[instruction]
uart1_DriveMotor.write(bytes(cmd, 'utf-8')) # 发送到UART1
uart1_ToPC.write(bytes(cmd, 'utf-8')) # 同时发送到UART2
print(f"已发送指令: {cmd}")
LED_G.low() # 点亮绿灯
time.sleep(1)
LED_G.high() # 关闭绿灯
# 主函数
def main():
if not init_camera():
print("Camera initialization failed. Exiting...")
return
start_web_server()
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt as e:
print("用户停止: ", e)
except BaseException as e:
print(f"异常: {e}")
finally:
# 停止 sensor
if isinstance(sensor, Sensor):
sensor.stop()
# 销毁显示
Display.deinit()
os.exitpoint(os.EXITPOINT_ENABLE_SLEEP)
time.sleep_ms(100)
# 释放媒体缓冲区
MediaManager.deinit()
庐山派打印窗口这样是成功的