esp32 挂载mpu6050实现加速度计

发布于:2025-07-25 ⋅ 阅读:(13) ⋅ 点赞:(0)

使用Nordic uart service传输数据给app。

mpu6050.py

import _thread
import time
import uasyncio as asyncio
from machine import Pin, I2C, PWM
import math

# 定义I2C接口 - 尝试使用不同的引脚组合
i2c = I2C(0, scl=Pin(0), sda=Pin(1), freq=400000)  # 常用的I2C引脚组合

# MPU6050的I2C地址
MPU6050_ADDR = 0x68
# MPU6050寄存器地址
PWR_MGMT_1 = 0x6B
ACCEL_XOUT_H = 0x3B
GYRO_XOUT_H = 0x43
WHO_AM_I = 0x75

# 添加I2C设备扫描函数
def scan_i2c_devices():
    print("scan I2C bus devices...")
    devices = i2c.scan()
    if len(devices) == 0:
        print("not find I2C device!")
        #print("请检查:")
        #print("1. MPU6050传感器是否正确连接")
        #print("2. I2C引脚配置是否正确")
        #print("3. 传感器电源是否正常")
        return False
    else:
        print(f"find {len(devices)} I2C device")
        for device in devices:
            print(f"device addr: 0x{device:02X}")
        if MPU6050_ADDR in devices:
            print(f"find one MPU6050, it's addr:0x{MPU6050_ADDR:02X}")
            return True
        else:
            print(f"can not find MPU6050, addr:0x{MPU6050_ADDR:02X}")
            return False

def mpu6050_init():
    try:
        # 先扫描I2C总线,确认设备存在
        if not scan_i2c_devices():
            return False

        # 唤醒MPU6050(默认处于睡眠模式)
        print("尝试唤醒MPU6050...")
        i2c.writeto_mem(MPU6050_ADDR, PWR_MGMT_1, b'\x00')
        time.sleep_ms(100)

        # 读取并验证设备ID
        device_id = i2c.readfrom_mem(MPU6050_ADDR, WHO_AM_I, 1)[0]
        if device_id == 0x68:
            print(f"MPU6050设备ID验证通过: 0x{device_id:02X}")
        else:
            print(f"设备ID不匹配!预期0x68,实际返回0x{device_id:02X}")
            print("这可能表示:")
            print("1. 传感器不是MPU6050")
            print("2. 传感器损坏")
            print("3. I2C通信受到干扰")
            return False

        # 配置采样率和滤波器
        i2c.writeto_mem(MPU6050_ADDR, 0x19, b'\x07')  # SMPLRT_DIV寄存器
        i2c.writeto_mem(MPU6050_ADDR, 0x1A, b'\x00')  # CONFIG寄存器
        i2c.writeto_mem(MPU6050_ADDR, 0x1B, b'\x00')  # GYRO_CONFIG寄存器
        i2c.writeto_mem(MPU6050_ADDR, 0x1C, b'\x00')  # ACCEL_CONFIG寄存器 (±2g量程)

        print("MPU6050初始化成功!")
        return True
    except OSError as e:
        print(f"初始化MPU6050时出错: {e}")
        print("这通常表示:")
        print("1. I2C通信失败")
        print("2. 传感器未正确连接")
        print("3. 传感器地址不正确")
        return False

def read_word(adr):
    high = i2c.readfrom_mem(MPU6050_ADDR, adr, 1)[0]
    low = i2c.readfrom_mem(MPU6050_ADDR, adr + 1, 1)[0]
    val = (high << 8) + low
    return val

def read_word_2c(adr):
    val = read_word(adr)
    if (val >= 0x8000):
        return -((65535 - val) + 1)
    else:
        return val

def read_accel_x():
    return read_word_2c(ACCEL_XOUT_H)

def read_accel_y():
    return read_word_2c(ACCEL_XOUT_H + 2)

def read_accel_z():
    return read_word_2c(ACCEL_XOUT_H + 4)

def read_gyro_x():
    return read_word_2c(GYRO_XOUT_H)

def read_gyro_y():
    return read_word_2c(GYRO_XOUT_H + 2)

def read_gyro_z():
    return read_word_2c(GYRO_XOUT_H + 4)

def calculate_angles():
    while True:
        ax = read_accel_x()
        ay = read_accel_y()
        az = read_accel_z()

        # 计算俯仰角(Pitch)
        pitch = math.atan2(ax, math.sqrt(ay * ay + az * az)) * 180 / math.pi
        # 计算翻滚角(Roll)
        roll = math.atan2(ay, math.sqrt(ax * ax + az * az)) * 180 / math.pi

        print(f"Pitch: {pitch:.2f}°, Roll: {roll:.2f}°")
        time.sleep(0.1)

def pwm_thread():
    # 创建LED控制对象
    led = PWM(Pin(12), freq=1000)
    while True:
        # 渐亮
        for i in range(0, 1024):
            led.duty(i)
            time.sleep_ms(1)
        # 渐暗
        for i in range(1023, 0, -1):
            led.duty(i)
            time.sleep_ms(1)

# 新增:将加速度值打包为字节数组(大端模式)
def pack_accel_data(ax, ay, az):
    """将加速度值(short类型)打包为6字节大端格式字节数组"""
    data = bytearray(6)
    data[0] = (ax >> 8) & 0xFF  # X轴高字节
    data[1] = ax & 0xFF          # X轴低字节
    data[2] = (ay >> 8) & 0xFF  # Y轴高字节
    data[3] = ay & 0xFF          # Y轴低字节
    data[4] = (az >> 8) & 0xFF  # Z轴高字节
    data[5] = az & 0xFF          # Z轴低字节
    return data

def register_speed_ble_callback(callback):
    global _ble_callback
    _ble_callback = callback

async def acc_read():
    # 初始化MPU6050
    if not mpu6050_init():
        print("MPU6050 init fail, exit process")
        return
    else:
        while True:
            try:
                ax = read_accel_x()
                ay = read_accel_y()
                az = read_accel_z()

                # 计算俯仰角(Pitch)
                pitch = math.atan2(ax, math.sqrt(ay * ay + az * az)) * 180 / math.pi
                # 计算翻滚角(Roll)
                roll = math.atan2(ay, math.sqrt(ax * ax + az * az)) * 180 / math.pi

                print(f"Pitch: {pitch:.2f}, Roll: {roll:.2f}")

                # 打包并发送加速度数据
                if _ble_callback:
                    accel_data = pack_accel_data(ax, ay, az)
                    _ble_callback(accel_data)
                    #print(f"发送加速度: X={ax}, Y={ay}, Z={az}")
                else:
                    print("BLE callback not registered, data not sent")

                await asyncio.sleep(0.1)
            except Exception as e:
                print(f"读取传感器数据失败: {e}")
                await asyncio.sleep(1)

if __name__ == "__main__":
    print('mpu6050 main')
    asyncio.run(acc_read())

main.py

import uasyncio as asyncio
import machine
import ubinascii
import time
from ble_advertising import advertising_payload
import mpu6050
# import at24c02
# import CD4067BMT
from micropython import const
import bluetooth
import _thread

# 定义LED引脚
led = machine.PWM(machine.Pin(12))
led.freq(1000)

# 蓝牙相关常量
_IRQ_CENTRAL_CONNECT = const(1)
_IRQ_CENTRAL_DISCONNECT = const(2)
_IRQ_GATTS_WRITE = const(3)

# 服务和特征的UUID
_UART_UUID = bluetooth.UUID("6E400001-B5A3-F393-E0A9-E50E24DCCA9E")
_UART_TX = (
    bluetooth.UUID("6E400003-B5A3-F393-E0A9-E50E24DCCA9E"),
    bluetooth.FLAG_READ | bluetooth.FLAG_NOTIFY,
)
_UART_RX = (
    bluetooth.UUID("6E400002-B5A3-F393-E0A9-E50E24DCCA9E"),
    bluetooth.FLAG_WRITE,
)
_UART_SERVICE = (
    _UART_UUID,
    (_UART_TX, _UART_RX),
)

# 设备名称服务UUID和特征
_SPEED_AND_CADENCE_SERVICE_UUID = bluetooth.UUID("00001814-0000-1000-8000-00805f9b34fb")  # speedAndCadence Access Service
_SPEED_AND_CADENCE_CHAR_UUID = bluetooth.UUID("00002a67-0000-1000-8000-00805f9b34fb")   # speedAndCadence Characteristic

_SPEED_AND_CADENCE_CHAR = (
    _SPEED_AND_CADENCE_CHAR_UUID,
    bluetooth.FLAG_READ | bluetooth.FLAG_NOTIFY,  # 允许通知
)

_SPEED_AND_CADENCE_SERVICE = (
    _SPEED_AND_CADENCE_SERVICE_UUID,
    (_SPEED_AND_CADENCE_CHAR,),
)

# 全局标志
pwm_running = False
program_running = True


# 呼吸灯效果函数
def breathing_led():
    global pwm_running
    while pwm_running:
        # 亮度从0到1023逐渐增加
        print('亮度从 0 到 1023 逐渐增加')
        for i in range(0, 1024):
            if not pwm_running:
                break
            led.duty(i)
            time.sleep_ms(1)
        # 亮度从1023到0逐渐减小
        print('亮度从 1023 到 0 逐渐减小')
        for i in range(1023, -1, -1):
            if not pwm_running:
                break
            led.duty(i)
            time.sleep_ms(1)


# 蓝牙类
class BLEUART:
    def __init__(self, ble, name="mpy-uart"):
        self._ble = ble
        self._ble.active(True)
        self._ble.irq(self._irq)

        # 处理 MAC 地址
        mac_tuple = ble.config("mac")  # 获取 (addr_type, addr_bytes)
        print(mac_tuple)
        mac_bytes = mac_tuple[1]       # 提取字节部分(6字节)
        mac_str = ubinascii.hexlify(mac_bytes).decode().upper()  # 转换为十六进制字符串
        short_mac = mac_str[-6:]       # 取后6位(如 "FFDC22")
        self.current_device_name = f"F-{short_mac}"  # 生成唯一名称:"left-FFDC22"
        #self.current_device_name = name
        # ble.gap_set_name(f"right-{short_mac}")  # 示例名称:"right-63E1D5"

        # 休眠/唤醒
        self._last_connection_time = time.ticks_ms()  # 记录最后连接时间
        self._wakeup_pin = machine.Pin(6, mode=machine.Pin.IN, pull=machine.Pin.PULL_UP)  # 配置唤醒引脚
        self._wakeup_pin.irq(trigger=machine.Pin.IRQ_FALLING, handler=self._wakeup_handler)  # 下降沿触发唤醒
        self._is_sleeping = False

        # 注册UART服务和加速度服务
        ((self._handle_tx, self._handle_rx), (self._handle_accel,)) = self._ble.gatts_register_services(
            (_UART_SERVICE, _SPEED_AND_CADENCE_SERVICE)
        )


        self._is_connected = False
        self._connections = set()
        self._write_callback = None
        self._payload = advertising_payload(name=self.current_device_name, services=[_UART_UUID,])
        print(f"Advertising data length: {len(self._payload)} bytes")  # 应 ≤ 31字节
        self._start_advertising()

    def _wakeup_handler(self, pin):
        """唤醒处理函数"""
        print("Wakeup key(gpio0) triggered")
        if self._is_sleeping:
            print("Wakeup detected from GPIO5")
            self._is_sleeping = False

            # 停止呼吸灯线程(需全局标志控制)
            global led
            global pwm_running
            pwm_running = False

            # 重新初始化LED
            led.deinit()  # 先释放旧资源
            led = machine.PWM(machine.Pin(12))
            led.freq(1000)

            # 恢复广播
            self._start_advertising()
            self._last_connection_time = time.ticks_ms()  # 重置连接时间戳

    def _enter_sleep(self):
        """进入休眠模式"""
        print("Entering sleep mode...")
        self._stop_advertising()  # 停止广播
        self._is_sleeping = True

        # 关闭非必要外设(示例:关闭LED)
        led.deinit()

        # 进入深度睡眠,等待唤醒
        machine.deepsleep()

    def _check_sleep_condition(self):
        """检查是否满足休眠条件:10分钟无连接"""
        if self._is_connected:
            return  # 连接中不休眠

        current_time = time.ticks_ms()
        elapsed_time = time.ticks_diff(current_time, self._last_connection_time)

        if elapsed_time >= 600000:  # 10分钟(600000毫秒)
            self._enter_sleep()

    def _irq(self, event, data):
        if event == _IRQ_CENTRAL_CONNECT:
            conn_handle, _, _ = data
            print("New connection", conn_handle)
            self._connections.add(conn_handle)
            self._is_connected = True
            self._stop_advertising()
            self._last_connection_time = time.ticks_ms()  # 更新最后连接时间
        elif event == _IRQ_CENTRAL_DISCONNECT:
            conn_handle, _, _ = data
            print("Disconnected", conn_handle)
            self._connections.remove(conn_handle)
            self._is_connected = False
            self._start_advertising()
            self._last_connection_time = time.ticks_ms()  # 断开时刷新时间戳
        elif event == _IRQ_GATTS_WRITE:
            conn_handle, value_handle = data
            if value_handle == self._handle_rx:
                data = self._ble.gatts_read(self._handle_rx)
                if self._write_callback:
                    self._write_callback(data)

    def send(self, data):
        # 创建连接句柄的副本,避免在迭代过程中修改原始集合
        connections = list(self._connections)

        for conn_handle in connections:
            try:
                self._ble.gatts_notify(conn_handle, self._handle_tx, data)
            except Exception as e:
                print(f"Notify failed for conn_handle {conn_handle}: {e}")
                # 从连接集合中移除无效的句柄
                if conn_handle in self._connections:
                    self._connections.remove(conn_handle)

                # 如果没有有效连接了,更新连接状态
                if not self._connections:
                    self._is_connected = False
                    self._start_advertising()

    # 新增:发送加速度数据的方法
    def send_acceleration(self, data):
        """通过加速度特征值发送数据"""
        connections = list(self._connections)
        for conn_handle in connections:
            try:
                self._ble.gatts_notify(conn_handle, self._handle_accel, data)
            except Exception as e:
                print(f"加速度数据发送失败: {e}")
                if conn_handle in self._connections:
                    self._connections.remove(conn_handle)
                if not self._connections:
                    self._is_connected = False
                    self._start_advertising()

    def is_connected(self):
        return len(self._connections) > 0

    def _start_advertising(self, interval_us=500000):
        """开始广播"""
        if not self._is_sleeping:
            print(f"Starting advertising with name: {self.current_device_name}")
            self._payload = advertising_payload(name=self.current_device_name, services=[_UART_UUID,])
            self._ble.gap_advertise(interval_us, adv_data=self._payload)
        else:
            print("Still sleeping, skip advertising")

    def _stop_advertising(self):
        """停止广播"""
        print("Stopping advertising")
        self._ble.gap_advertise(None)

    def on_write(self, callback):
        self._write_callback = callback

    def _change_device_name(self, new_name):
        # 限制名称长度
        if len(new_name) > 20:
            new_name = new_name[:20]
        self.current_device_name = new_name

        # 如果没有连接,重新开始广播使用新名称
        if not self.is_connected():
            self._start_advertising()

        print(f"Device name changed to: {new_name}")


# 定义数据发送函数
def send_acceleration_data_via_ble(data):
    if uart.is_connected():
        uart.send_acceleration(data)
    else:
        print("BLE not connected, data not sent")

# 新增全局共享标志(控制扫描线程启停)
flag_scan = True   # 默认开启扫描
scan_lock = _thread.allocate_lock()  # 添加锁保证线程安全


# 蓝牙控制回调函数
def ble_callback(data):
    global pwm_running, program_running, flag_scan
    command = data.decode().strip().lower()
    print(f"Received command: {command}")
    if command == 'on':
        with scan_lock:
            flag_scan = True
        if not pwm_running:
            pwm_running = True
            #_thread.start_new_thread(breathing_led, ())
    elif command == 'off':
        with scan_lock:
            flag_scan = False
        if pwm_running:
            pwm_running = False
            led.duty(0)
    elif command == 'stop':
        program_running = False


# 初始化蓝牙
ble = bluetooth.BLE()
uart = BLEUART(ble)
uart.on_write(ble_callback)

# 主循环
#while program_running:
#    # if not uart.is_connected() and not uart._is_sleeping:
#    #    uart._check_sleep_condition()  # 主循环中定期检查休眠条件
#    time.sleep(1)

async def peripheral_task():
    """蓝牙外设任务,处理广告和连接"""
    while True:
        print("Peripheral task running...")
        await asyncio.sleep(1)

async def central_task():
    """蓝牙中心任务,处理扫描和连接"""
    while True:
        print("Central task running...")
        await asyncio.sleep(1)

async def main():
    """主函数,负责协调所有异步任务"""
    try:
        # 创建任务
        peripheral = asyncio.create_task(peripheral_task())
        central = asyncio.create_task(central_task())
        acc_task = asyncio.create_task(mpu6050.acc_read())

        # 等待任务完成(这里不会完成,因为是无限循环)
        await asyncio.gather(peripheral, central, acc_task)

    except KeyboardInterrupt:
        print("Interrupted by user")
    finally:
        # 清理工作
        print("Cleaning up...")

# 注册回调函数到matrixScan模块
mpu6050.register_speed_ble_callback(send_acceleration_data_via_ble)
if __name__ == "__main__":
    print('main')
    # 启动主事件循环
    asyncio.run(main())

网站公告

今日签到

点亮在社区的每一天
去签到