使用Nordic uart service传输数据给app。
mpu6050.py
import _thread
import time
import uasyncio as asyncio
from machine import Pin, I2C, PWM
import math
# 定义I2C接口 - 尝试使用不同的引脚组合
i2c = I2C(0, scl=Pin(0), sda=Pin(1), freq=400000) # 常用的I2C引脚组合
# MPU6050的I2C地址
MPU6050_ADDR = 0x68
# MPU6050寄存器地址
PWR_MGMT_1 = 0x6B
ACCEL_XOUT_H = 0x3B
GYRO_XOUT_H = 0x43
WHO_AM_I = 0x75
# 添加I2C设备扫描函数
def scan_i2c_devices():
print("scan I2C bus devices...")
devices = i2c.scan()
if len(devices) == 0:
print("not find I2C device!")
#print("请检查:")
#print("1. MPU6050传感器是否正确连接")
#print("2. I2C引脚配置是否正确")
#print("3. 传感器电源是否正常")
return False
else:
print(f"find {len(devices)} I2C device")
for device in devices:
print(f"device addr: 0x{device:02X}")
if MPU6050_ADDR in devices:
print(f"find one MPU6050, it's addr:0x{MPU6050_ADDR:02X}")
return True
else:
print(f"can not find MPU6050, addr:0x{MPU6050_ADDR:02X}")
return False
def mpu6050_init():
try:
# 先扫描I2C总线,确认设备存在
if not scan_i2c_devices():
return False
# 唤醒MPU6050(默认处于睡眠模式)
print("尝试唤醒MPU6050...")
i2c.writeto_mem(MPU6050_ADDR, PWR_MGMT_1, b'\x00')
time.sleep_ms(100)
# 读取并验证设备ID
device_id = i2c.readfrom_mem(MPU6050_ADDR, WHO_AM_I, 1)[0]
if device_id == 0x68:
print(f"MPU6050设备ID验证通过: 0x{device_id:02X}")
else:
print(f"设备ID不匹配!预期0x68,实际返回0x{device_id:02X}")
print("这可能表示:")
print("1. 传感器不是MPU6050")
print("2. 传感器损坏")
print("3. I2C通信受到干扰")
return False
# 配置采样率和滤波器
i2c.writeto_mem(MPU6050_ADDR, 0x19, b'\x07') # SMPLRT_DIV寄存器
i2c.writeto_mem(MPU6050_ADDR, 0x1A, b'\x00') # CONFIG寄存器
i2c.writeto_mem(MPU6050_ADDR, 0x1B, b'\x00') # GYRO_CONFIG寄存器
i2c.writeto_mem(MPU6050_ADDR, 0x1C, b'\x00') # ACCEL_CONFIG寄存器 (±2g量程)
print("MPU6050初始化成功!")
return True
except OSError as e:
print(f"初始化MPU6050时出错: {e}")
print("这通常表示:")
print("1. I2C通信失败")
print("2. 传感器未正确连接")
print("3. 传感器地址不正确")
return False
def read_word(adr):
high = i2c.readfrom_mem(MPU6050_ADDR, adr, 1)[0]
low = i2c.readfrom_mem(MPU6050_ADDR, adr + 1, 1)[0]
val = (high << 8) + low
return val
def read_word_2c(adr):
val = read_word(adr)
if (val >= 0x8000):
return -((65535 - val) + 1)
else:
return val
def read_accel_x():
return read_word_2c(ACCEL_XOUT_H)
def read_accel_y():
return read_word_2c(ACCEL_XOUT_H + 2)
def read_accel_z():
return read_word_2c(ACCEL_XOUT_H + 4)
def read_gyro_x():
return read_word_2c(GYRO_XOUT_H)
def read_gyro_y():
return read_word_2c(GYRO_XOUT_H + 2)
def read_gyro_z():
return read_word_2c(GYRO_XOUT_H + 4)
def calculate_angles():
while True:
ax = read_accel_x()
ay = read_accel_y()
az = read_accel_z()
# 计算俯仰角(Pitch)
pitch = math.atan2(ax, math.sqrt(ay * ay + az * az)) * 180 / math.pi
# 计算翻滚角(Roll)
roll = math.atan2(ay, math.sqrt(ax * ax + az * az)) * 180 / math.pi
print(f"Pitch: {pitch:.2f}°, Roll: {roll:.2f}°")
time.sleep(0.1)
def pwm_thread():
# 创建LED控制对象
led = PWM(Pin(12), freq=1000)
while True:
# 渐亮
for i in range(0, 1024):
led.duty(i)
time.sleep_ms(1)
# 渐暗
for i in range(1023, 0, -1):
led.duty(i)
time.sleep_ms(1)
# 新增:将加速度值打包为字节数组(大端模式)
def pack_accel_data(ax, ay, az):
"""将加速度值(short类型)打包为6字节大端格式字节数组"""
data = bytearray(6)
data[0] = (ax >> 8) & 0xFF # X轴高字节
data[1] = ax & 0xFF # X轴低字节
data[2] = (ay >> 8) & 0xFF # Y轴高字节
data[3] = ay & 0xFF # Y轴低字节
data[4] = (az >> 8) & 0xFF # Z轴高字节
data[5] = az & 0xFF # Z轴低字节
return data
def register_speed_ble_callback(callback):
global _ble_callback
_ble_callback = callback
async def acc_read():
# 初始化MPU6050
if not mpu6050_init():
print("MPU6050 init fail, exit process")
return
else:
while True:
try:
ax = read_accel_x()
ay = read_accel_y()
az = read_accel_z()
# 计算俯仰角(Pitch)
pitch = math.atan2(ax, math.sqrt(ay * ay + az * az)) * 180 / math.pi
# 计算翻滚角(Roll)
roll = math.atan2(ay, math.sqrt(ax * ax + az * az)) * 180 / math.pi
print(f"Pitch: {pitch:.2f}, Roll: {roll:.2f}")
# 打包并发送加速度数据
if _ble_callback:
accel_data = pack_accel_data(ax, ay, az)
_ble_callback(accel_data)
#print(f"发送加速度: X={ax}, Y={ay}, Z={az}")
else:
print("BLE callback not registered, data not sent")
await asyncio.sleep(0.1)
except Exception as e:
print(f"读取传感器数据失败: {e}")
await asyncio.sleep(1)
if __name__ == "__main__":
print('mpu6050 main')
asyncio.run(acc_read())
main.py
import uasyncio as asyncio
import machine
import ubinascii
import time
from ble_advertising import advertising_payload
import mpu6050
# import at24c02
# import CD4067BMT
from micropython import const
import bluetooth
import _thread
# 定义LED引脚
led = machine.PWM(machine.Pin(12))
led.freq(1000)
# 蓝牙相关常量
_IRQ_CENTRAL_CONNECT = const(1)
_IRQ_CENTRAL_DISCONNECT = const(2)
_IRQ_GATTS_WRITE = const(3)
# 服务和特征的UUID
_UART_UUID = bluetooth.UUID("6E400001-B5A3-F393-E0A9-E50E24DCCA9E")
_UART_TX = (
bluetooth.UUID("6E400003-B5A3-F393-E0A9-E50E24DCCA9E"),
bluetooth.FLAG_READ | bluetooth.FLAG_NOTIFY,
)
_UART_RX = (
bluetooth.UUID("6E400002-B5A3-F393-E0A9-E50E24DCCA9E"),
bluetooth.FLAG_WRITE,
)
_UART_SERVICE = (
_UART_UUID,
(_UART_TX, _UART_RX),
)
# 设备名称服务UUID和特征
_SPEED_AND_CADENCE_SERVICE_UUID = bluetooth.UUID("00001814-0000-1000-8000-00805f9b34fb") # speedAndCadence Access Service
_SPEED_AND_CADENCE_CHAR_UUID = bluetooth.UUID("00002a67-0000-1000-8000-00805f9b34fb") # speedAndCadence Characteristic
_SPEED_AND_CADENCE_CHAR = (
_SPEED_AND_CADENCE_CHAR_UUID,
bluetooth.FLAG_READ | bluetooth.FLAG_NOTIFY, # 允许通知
)
_SPEED_AND_CADENCE_SERVICE = (
_SPEED_AND_CADENCE_SERVICE_UUID,
(_SPEED_AND_CADENCE_CHAR,),
)
# 全局标志
pwm_running = False
program_running = True
# 呼吸灯效果函数
def breathing_led():
global pwm_running
while pwm_running:
# 亮度从0到1023逐渐增加
print('亮度从 0 到 1023 逐渐增加')
for i in range(0, 1024):
if not pwm_running:
break
led.duty(i)
time.sleep_ms(1)
# 亮度从1023到0逐渐减小
print('亮度从 1023 到 0 逐渐减小')
for i in range(1023, -1, -1):
if not pwm_running:
break
led.duty(i)
time.sleep_ms(1)
# 蓝牙类
class BLEUART:
def __init__(self, ble, name="mpy-uart"):
self._ble = ble
self._ble.active(True)
self._ble.irq(self._irq)
# 处理 MAC 地址
mac_tuple = ble.config("mac") # 获取 (addr_type, addr_bytes)
print(mac_tuple)
mac_bytes = mac_tuple[1] # 提取字节部分(6字节)
mac_str = ubinascii.hexlify(mac_bytes).decode().upper() # 转换为十六进制字符串
short_mac = mac_str[-6:] # 取后6位(如 "FFDC22")
self.current_device_name = f"F-{short_mac}" # 生成唯一名称:"left-FFDC22"
#self.current_device_name = name
# ble.gap_set_name(f"right-{short_mac}") # 示例名称:"right-63E1D5"
# 休眠/唤醒
self._last_connection_time = time.ticks_ms() # 记录最后连接时间
self._wakeup_pin = machine.Pin(6, mode=machine.Pin.IN, pull=machine.Pin.PULL_UP) # 配置唤醒引脚
self._wakeup_pin.irq(trigger=machine.Pin.IRQ_FALLING, handler=self._wakeup_handler) # 下降沿触发唤醒
self._is_sleeping = False
# 注册UART服务和加速度服务
((self._handle_tx, self._handle_rx), (self._handle_accel,)) = self._ble.gatts_register_services(
(_UART_SERVICE, _SPEED_AND_CADENCE_SERVICE)
)
self._is_connected = False
self._connections = set()
self._write_callback = None
self._payload = advertising_payload(name=self.current_device_name, services=[_UART_UUID,])
print(f"Advertising data length: {len(self._payload)} bytes") # 应 ≤ 31字节
self._start_advertising()
def _wakeup_handler(self, pin):
"""唤醒处理函数"""
print("Wakeup key(gpio0) triggered")
if self._is_sleeping:
print("Wakeup detected from GPIO5")
self._is_sleeping = False
# 停止呼吸灯线程(需全局标志控制)
global led
global pwm_running
pwm_running = False
# 重新初始化LED
led.deinit() # 先释放旧资源
led = machine.PWM(machine.Pin(12))
led.freq(1000)
# 恢复广播
self._start_advertising()
self._last_connection_time = time.ticks_ms() # 重置连接时间戳
def _enter_sleep(self):
"""进入休眠模式"""
print("Entering sleep mode...")
self._stop_advertising() # 停止广播
self._is_sleeping = True
# 关闭非必要外设(示例:关闭LED)
led.deinit()
# 进入深度睡眠,等待唤醒
machine.deepsleep()
def _check_sleep_condition(self):
"""检查是否满足休眠条件:10分钟无连接"""
if self._is_connected:
return # 连接中不休眠
current_time = time.ticks_ms()
elapsed_time = time.ticks_diff(current_time, self._last_connection_time)
if elapsed_time >= 600000: # 10分钟(600000毫秒)
self._enter_sleep()
def _irq(self, event, data):
if event == _IRQ_CENTRAL_CONNECT:
conn_handle, _, _ = data
print("New connection", conn_handle)
self._connections.add(conn_handle)
self._is_connected = True
self._stop_advertising()
self._last_connection_time = time.ticks_ms() # 更新最后连接时间
elif event == _IRQ_CENTRAL_DISCONNECT:
conn_handle, _, _ = data
print("Disconnected", conn_handle)
self._connections.remove(conn_handle)
self._is_connected = False
self._start_advertising()
self._last_connection_time = time.ticks_ms() # 断开时刷新时间戳
elif event == _IRQ_GATTS_WRITE:
conn_handle, value_handle = data
if value_handle == self._handle_rx:
data = self._ble.gatts_read(self._handle_rx)
if self._write_callback:
self._write_callback(data)
def send(self, data):
# 创建连接句柄的副本,避免在迭代过程中修改原始集合
connections = list(self._connections)
for conn_handle in connections:
try:
self._ble.gatts_notify(conn_handle, self._handle_tx, data)
except Exception as e:
print(f"Notify failed for conn_handle {conn_handle}: {e}")
# 从连接集合中移除无效的句柄
if conn_handle in self._connections:
self._connections.remove(conn_handle)
# 如果没有有效连接了,更新连接状态
if not self._connections:
self._is_connected = False
self._start_advertising()
# 新增:发送加速度数据的方法
def send_acceleration(self, data):
"""通过加速度特征值发送数据"""
connections = list(self._connections)
for conn_handle in connections:
try:
self._ble.gatts_notify(conn_handle, self._handle_accel, data)
except Exception as e:
print(f"加速度数据发送失败: {e}")
if conn_handle in self._connections:
self._connections.remove(conn_handle)
if not self._connections:
self._is_connected = False
self._start_advertising()
def is_connected(self):
return len(self._connections) > 0
def _start_advertising(self, interval_us=500000):
"""开始广播"""
if not self._is_sleeping:
print(f"Starting advertising with name: {self.current_device_name}")
self._payload = advertising_payload(name=self.current_device_name, services=[_UART_UUID,])
self._ble.gap_advertise(interval_us, adv_data=self._payload)
else:
print("Still sleeping, skip advertising")
def _stop_advertising(self):
"""停止广播"""
print("Stopping advertising")
self._ble.gap_advertise(None)
def on_write(self, callback):
self._write_callback = callback
def _change_device_name(self, new_name):
# 限制名称长度
if len(new_name) > 20:
new_name = new_name[:20]
self.current_device_name = new_name
# 如果没有连接,重新开始广播使用新名称
if not self.is_connected():
self._start_advertising()
print(f"Device name changed to: {new_name}")
# 定义数据发送函数
def send_acceleration_data_via_ble(data):
if uart.is_connected():
uart.send_acceleration(data)
else:
print("BLE not connected, data not sent")
# 新增全局共享标志(控制扫描线程启停)
flag_scan = True # 默认开启扫描
scan_lock = _thread.allocate_lock() # 添加锁保证线程安全
# 蓝牙控制回调函数
def ble_callback(data):
global pwm_running, program_running, flag_scan
command = data.decode().strip().lower()
print(f"Received command: {command}")
if command == 'on':
with scan_lock:
flag_scan = True
if not pwm_running:
pwm_running = True
#_thread.start_new_thread(breathing_led, ())
elif command == 'off':
with scan_lock:
flag_scan = False
if pwm_running:
pwm_running = False
led.duty(0)
elif command == 'stop':
program_running = False
# 初始化蓝牙
ble = bluetooth.BLE()
uart = BLEUART(ble)
uart.on_write(ble_callback)
# 主循环
#while program_running:
# # if not uart.is_connected() and not uart._is_sleeping:
# # uart._check_sleep_condition() # 主循环中定期检查休眠条件
# time.sleep(1)
async def peripheral_task():
"""蓝牙外设任务,处理广告和连接"""
while True:
print("Peripheral task running...")
await asyncio.sleep(1)
async def central_task():
"""蓝牙中心任务,处理扫描和连接"""
while True:
print("Central task running...")
await asyncio.sleep(1)
async def main():
"""主函数,负责协调所有异步任务"""
try:
# 创建任务
peripheral = asyncio.create_task(peripheral_task())
central = asyncio.create_task(central_task())
acc_task = asyncio.create_task(mpu6050.acc_read())
# 等待任务完成(这里不会完成,因为是无限循环)
await asyncio.gather(peripheral, central, acc_task)
except KeyboardInterrupt:
print("Interrupted by user")
finally:
# 清理工作
print("Cleaning up...")
# 注册回调函数到matrixScan模块
mpu6050.register_speed_ble_callback(send_acceleration_data_via_ble)
if __name__ == "__main__":
print('main')
# 启动主事件循环
asyncio.run(main())