数据处理 | 加速度数据和陀螺仪数据对齐时间戳

发布于:2025-04-10 ⋅ 阅读:(32) ⋅ 点赞:(0)

要将 加速度计数据(Accelerometer) 和 陀螺仪数据(Gyroscope) 对齐时间戳,有几种常用方法,取决于你需要的精度、数据频率是否一致、是否异步采样等。下面是几个常用的方法

✅ 1. 简单最近邻插值法(常用)
📌 适合:两种数据频率接近,但时间戳不完全对齐
假设你有:

acc_data:  [(t0, ax0, ay0, az0), (t1, ax1, ay1, az1), ...]
gyro_data: [(t0', wx0, wy0, wz0), (t1', wx1, wy1, wz1), ...]

你想得到统一时间戳的数据对:

(t, acc_interp, gyro_interp)

可选择以加速度计或陀螺仪为主时间戳,使用最近的对方数据或线性插值对齐。

✅ 方法一:线性插值(推荐)
C++伪代码(适合 IMU 融合、预积分等场景):

// 假设 accData 是 std::vector<pair<double, Vector3d>>,gyroData 也是类似结构
Vector3d interpolate(const std::vector<std::pair<double, Vector3d>>& data, double t_query) {
    for (size_t i = 1; i < data.size(); ++i) {
        if (t_query < data[i].first) {
            double t0 = data[i - 1].first;
            double t1 = data[i].first;
            Vector3d v0 = data[i - 1].second;
            Vector3d v1 = data[i].second;
            double alpha = (t_query - t0) / (t1 - t0);
            return (1 - alpha) * v0 + alpha * v1;
        }
    }
    return data.back().second;  // fallback
}


for (const auto& acc : accData) {
    double t = acc.first;
    Vector3d acc_val = acc.second;
    Vector3d gyro_val = interpolate(gyroData, t);
    // 保存 (t, acc_val, gyro_val)
}

✅ 方法二:最近邻(粗略一点)
适合快速预处理:

Vector3d find_nearest(const std::vector<std::pair<double, Vector3d>>& data, double t_query) {
    double min_diff = 1e6;
    Vector3d best_val;
    for (const auto& d : data) {
        double dt = std::abs(d.first - t_query);
        if (dt < min_diff) {
            min_diff = dt;
            best_val = d.second;
        }
    }
    return best_val;
}

✅ 2. Python 示例:用 numpy 插值

def align_acc_gyro(pc_path, gyro_name, acc_name, bag_data_path):
    def splitLine(line):
        # split a data to timestamp, [data_x, data_y, data_z]
        if line[-1] == '\n':
            line = line.replace(',\n', '')
        else:
             #line = line[:-1]
             return 0.0, np.empty(0)

        data_split = line.split(',')
        print(data_split)
        data_split = [data.split()[0] for data in data_split]
        timestamp = data_split[0]
        del data_split[0]
        data_split = [float(data) for data in data_split]
        timestamp = int(timestamp)
        return timestamp, np.array(data_split)

    def interpolation(acc_timestamp_last, acc_timestamp, gyro_timestamp,
                    acc_data_last, acc_data):
        # 以gyro时间为基准,将acc进行线性插值
        print("acc_timestamp_last =", acc_timestamp_last)
        print("gyro_timestamp =", gyro_timestamp)
        print("acc_timestamp =", acc_timestamp)
        ratio = float(gyro_timestamp - acc_timestamp_last) / float(acc_timestamp - acc_timestamp_last)
        print('ratio = ', ratio)

        if not acc_timestamp_last <= gyro_timestamp <= acc_timestamp:
            print('error')
        result = ratio * (acc_data - acc_data_last) + acc_data_last
        print(acc_data_last)
        print(result)
        print(acc_data)
        return result

    def insertData(gyro_timestamp, gyro_data, acc_data_interpolation, timestamp, omega, alpha):
        timestamp.append(gyro_timestamp)
        for i in range(3):
            omega[i].append(gyro_data[i])
            alpha[i].append(acc_data_interpolation[i])

    if not os.path.exists(pc_path):
        print('dirPath not exist')
        exit(-1)
    # open files
    try:
        acc_file = open(os.path.join(pc_path, acc_name))
        gyro_file = open(os.path.join(pc_path, gyro_name))
    except:
        print("fail to open")
        exit(-1)

    # align acc to gyro, skip the acc_data which timestamp is small than the first gyro
    gyro_line = gyro_file.readline()
    acc_line = acc_file.readline()
    gyro_timestamp, gyro_data = splitLine(gyro_line)
    acc_timestamp, acc_data = splitLine(acc_line)
    acc_timestamp_last, acc_data_last = acc_timestamp, acc_data
    # 保证acc的第一条记录在第一条gyro记录之后
    while acc_timestamp < gyro_timestamp:
        acc_timestamp_last, acc_data_last = acc_timestamp, acc_data
        acc_line = acc_file.readline()
        acc_timestamp, acc_data = splitLine(acc_line)
    if acc_timestamp_last == acc_timestamp:
        acc_line = acc_file.readline()
        acc_timestamp, acc_data = splitLine(acc_line)
    # 丢弃gyro中的无效记录
    while not acc_timestamp_last <= gyro_timestamp <= acc_timestamp:
        gyro_line = gyro_file.readline()
        gyro_timestamp, gyro_data = splitLine(gyro_line)

    timestamp = []
    omega = [[], [], []]
    alpha = [[], [], []]
    while True:
        # if not acc_timestamp_last <= gyro_timestamp <= acc_timestamp:
        #     continue


        acc_data_interpolation = interpolation(acc_timestamp_last, acc_timestamp,
                                               gyro_timestamp, acc_data_last,
                                               acc_data)
        insertData(gyro_timestamp, gyro_data, acc_data_interpolation, timestamp,
                   omega, alpha)

        gyro_line = gyro_file.readline()
        if gyro_line == '':
            break
        gyro_timestamp, gyro_data = splitLine(gyro_line)

        acc_timestamp_last, acc_data_last = acc_timestamp, acc_data
        acc_line = acc_file.readline()
        if acc_line == '':
            break
        acc_timestamp, acc_data = splitLine(acc_line)

        while not acc_timestamp_last <= gyro_timestamp <= acc_timestamp:
            if acc_timestamp_last > gyro_timestamp:
                gyro_line = gyro_file.readline()
                if gyro_line == '':
                    break
                gyro_timestamp, gyro_data = splitLine(gyro_line)
            elif acc_timestamp < gyro_timestamp:
                acc_timestamp_last, acc_data_last = acc_timestamp, acc_data
                acc_line = acc_file.readline()
                if acc_line == '':
                    break
                acc_timestamp, acc_data = splitLine(acc_line)


    # 创建新文件,存储图片名与对应的数据
    csvData = pd.DataFrame({
                              'timestamp': timestamp,
                              'omega_x': omega[0],
                              'omega_y': omega[1],
                              'omega_z': omega[2],
                              'alpha_x': alpha[0],
                              'alpha_y': alpha[1],
                              'alpha_z': alpha[2]
    })
    # print(newData)
    save_path = os.path.join(bag_data_path)
    file_name = 'imu0.csv'

    # make sure dirPath/imu0 is exist
    if not os.path.exists(os.path.join(pc_path, save_path)):
        os.makedirs(os.path.join(pc_path, save_path))

    # create imuData.csv
    if not os.path.isfile(os.path.join(pc_path, save_path, file_name)):
        f = open(os.path.join(pc_path, save_path, file_name), 'w')
        f.close()
    csvData.to_csv(os.path.join(pc_path, save_path, file_name), index=False)

✅ 3. 高级方法(可选)
使用滤波器(如 Kalman/Savitzky-Golay)平滑后再对齐;

若硬件提供同步时间戳,可以直接根据其同步字段进行对齐;

ROS bag 可以用 message_filters::ApproximateTime 对齐 IMU 消息;

多传感器系统(如 Kalibr)用 spline 模型对齐。


网站公告

今日签到

点亮在社区的每一天
去签到