要将 加速度计数据(Accelerometer) 和 陀螺仪数据(Gyroscope) 对齐时间戳,有几种常用方法,取决于你需要的精度、数据频率是否一致、是否异步采样等。下面是几个常用的方法
✅ 1. 简单最近邻插值法(常用)
📌 适合:两种数据频率接近,但时间戳不完全对齐
假设你有:
acc_data: [(t0, ax0, ay0, az0), (t1, ax1, ay1, az1), ...]
gyro_data: [(t0', wx0, wy0, wz0), (t1', wx1, wy1, wz1), ...]
你想得到统一时间戳的数据对:
(t, acc_interp, gyro_interp)
可选择以加速度计或陀螺仪为主时间戳,使用最近的对方数据或线性插值对齐。
✅ 方法一:线性插值(推荐)
C++伪代码(适合 IMU 融合、预积分等场景):
// 假设 accData 是 std::vector<pair<double, Vector3d>>,gyroData 也是类似结构
Vector3d interpolate(const std::vector<std::pair<double, Vector3d>>& data, double t_query) {
for (size_t i = 1; i < data.size(); ++i) {
if (t_query < data[i].first) {
double t0 = data[i - 1].first;
double t1 = data[i].first;
Vector3d v0 = data[i - 1].second;
Vector3d v1 = data[i].second;
double alpha = (t_query - t0) / (t1 - t0);
return (1 - alpha) * v0 + alpha * v1;
}
}
return data.back().second; // fallback
}
for (const auto& acc : accData) {
double t = acc.first;
Vector3d acc_val = acc.second;
Vector3d gyro_val = interpolate(gyroData, t);
// 保存 (t, acc_val, gyro_val)
}
✅ 方法二:最近邻(粗略一点)
适合快速预处理:
Vector3d find_nearest(const std::vector<std::pair<double, Vector3d>>& data, double t_query) {
double min_diff = 1e6;
Vector3d best_val;
for (const auto& d : data) {
double dt = std::abs(d.first - t_query);
if (dt < min_diff) {
min_diff = dt;
best_val = d.second;
}
}
return best_val;
}
✅ 2. Python 示例:用 numpy 插值
def align_acc_gyro(pc_path, gyro_name, acc_name, bag_data_path):
def splitLine(line):
# split a data to timestamp, [data_x, data_y, data_z]
if line[-1] == '\n':
line = line.replace(',\n', '')
else:
#line = line[:-1]
return 0.0, np.empty(0)
data_split = line.split(',')
print(data_split)
data_split = [data.split()[0] for data in data_split]
timestamp = data_split[0]
del data_split[0]
data_split = [float(data) for data in data_split]
timestamp = int(timestamp)
return timestamp, np.array(data_split)
def interpolation(acc_timestamp_last, acc_timestamp, gyro_timestamp,
acc_data_last, acc_data):
# 以gyro时间为基准,将acc进行线性插值
print("acc_timestamp_last =", acc_timestamp_last)
print("gyro_timestamp =", gyro_timestamp)
print("acc_timestamp =", acc_timestamp)
ratio = float(gyro_timestamp - acc_timestamp_last) / float(acc_timestamp - acc_timestamp_last)
print('ratio = ', ratio)
if not acc_timestamp_last <= gyro_timestamp <= acc_timestamp:
print('error')
result = ratio * (acc_data - acc_data_last) + acc_data_last
print(acc_data_last)
print(result)
print(acc_data)
return result
def insertData(gyro_timestamp, gyro_data, acc_data_interpolation, timestamp, omega, alpha):
timestamp.append(gyro_timestamp)
for i in range(3):
omega[i].append(gyro_data[i])
alpha[i].append(acc_data_interpolation[i])
if not os.path.exists(pc_path):
print('dirPath not exist')
exit(-1)
# open files
try:
acc_file = open(os.path.join(pc_path, acc_name))
gyro_file = open(os.path.join(pc_path, gyro_name))
except:
print("fail to open")
exit(-1)
# align acc to gyro, skip the acc_data which timestamp is small than the first gyro
gyro_line = gyro_file.readline()
acc_line = acc_file.readline()
gyro_timestamp, gyro_data = splitLine(gyro_line)
acc_timestamp, acc_data = splitLine(acc_line)
acc_timestamp_last, acc_data_last = acc_timestamp, acc_data
# 保证acc的第一条记录在第一条gyro记录之后
while acc_timestamp < gyro_timestamp:
acc_timestamp_last, acc_data_last = acc_timestamp, acc_data
acc_line = acc_file.readline()
acc_timestamp, acc_data = splitLine(acc_line)
if acc_timestamp_last == acc_timestamp:
acc_line = acc_file.readline()
acc_timestamp, acc_data = splitLine(acc_line)
# 丢弃gyro中的无效记录
while not acc_timestamp_last <= gyro_timestamp <= acc_timestamp:
gyro_line = gyro_file.readline()
gyro_timestamp, gyro_data = splitLine(gyro_line)
timestamp = []
omega = [[], [], []]
alpha = [[], [], []]
while True:
# if not acc_timestamp_last <= gyro_timestamp <= acc_timestamp:
# continue
acc_data_interpolation = interpolation(acc_timestamp_last, acc_timestamp,
gyro_timestamp, acc_data_last,
acc_data)
insertData(gyro_timestamp, gyro_data, acc_data_interpolation, timestamp,
omega, alpha)
gyro_line = gyro_file.readline()
if gyro_line == '':
break
gyro_timestamp, gyro_data = splitLine(gyro_line)
acc_timestamp_last, acc_data_last = acc_timestamp, acc_data
acc_line = acc_file.readline()
if acc_line == '':
break
acc_timestamp, acc_data = splitLine(acc_line)
while not acc_timestamp_last <= gyro_timestamp <= acc_timestamp:
if acc_timestamp_last > gyro_timestamp:
gyro_line = gyro_file.readline()
if gyro_line == '':
break
gyro_timestamp, gyro_data = splitLine(gyro_line)
elif acc_timestamp < gyro_timestamp:
acc_timestamp_last, acc_data_last = acc_timestamp, acc_data
acc_line = acc_file.readline()
if acc_line == '':
break
acc_timestamp, acc_data = splitLine(acc_line)
# 创建新文件,存储图片名与对应的数据
csvData = pd.DataFrame({
'timestamp': timestamp,
'omega_x': omega[0],
'omega_y': omega[1],
'omega_z': omega[2],
'alpha_x': alpha[0],
'alpha_y': alpha[1],
'alpha_z': alpha[2]
})
# print(newData)
save_path = os.path.join(bag_data_path)
file_name = 'imu0.csv'
# make sure dirPath/imu0 is exist
if not os.path.exists(os.path.join(pc_path, save_path)):
os.makedirs(os.path.join(pc_path, save_path))
# create imuData.csv
if not os.path.isfile(os.path.join(pc_path, save_path, file_name)):
f = open(os.path.join(pc_path, save_path, file_name), 'w')
f.close()
csvData.to_csv(os.path.join(pc_path, save_path, file_name), index=False)
✅ 3. 高级方法(可选)
使用滤波器(如 Kalman/Savitzky-Golay)平滑后再对齐;
若硬件提供同步时间戳,可以直接根据其同步字段进行对齐;
ROS bag 可以用 message_filters::ApproximateTime 对齐 IMU 消息;
多传感器系统(如 Kalibr)用 spline 模型对齐。