From Audio to Tokens: A Complete Hands-On Guide to Building a Genshin Impact Character Voice Recognition Model

Published: 2025-09-11

This article walks you from zero to a working character voice recognition system based on audio tokenization, reproducing a Genshin Impact character voice classification task end to end: data processing, model training, and inference.

Figure: an audio waveform being converted into a numeric token sequence via a sliding window

1. Why Tokenize Audio?

Traditional audio processing usually relies on features such as MFCCs and spectrograms, but these approaches:

  • discard raw waveform detail
  • are hard to integrate seamlessly with modern Transformer architectures
  • carry high feature-engineering complexity

The idea here: convert the raw audio waveform directly into a discrete token sequence, so audio can be processed by a language model just like text. This approach:

  • preserves the full waveform information
  • is compatible with pretrained language-model architectures
  • simplifies the feature-engineering pipeline

2. Core Data Processing: the wav_to_token Function in Detail

def wav_to_token(path):
    # 1. Normalize the audio to zero mean and unit variance
    sr, audio = wavfile.read(path)
    left = (audio - np.mean(audio)) / np.std(audio)
    
    # 2. Build the reference signal (the key design choice)
    right = np.linspace(0.1, sr * np.pi, left.size)
    
    # 3. Sliding-window feature extraction
    sim_list = []
    slope_list = []
    for i in range(0, len(left) - 400, 400):
        x, y = left[i:i + 1200], right[i:i + 1200]
        # keep the two windows the same length
        if len(x) > len(y): x = x[:len(y)]
        
        # linear regression fits the local waveform trend
        slope, intercept = linear_regression(x, y)
        
        # fit quality measured as cosine similarity
        sim_list.append(cosine_similarity(slope * x + intercept, y))
        slope_list.append(slope)
    
    # 4. Token quantization (convert the lists to arrays first)
    sim_list = np.array(sim_list)
    slope_list = np.array(slope_list)
    sim_list = min_max_normalize(sim_list) * 2**6    # ~64 quantization levels
    slope_list = min_max_normalize(slope_list) * 2**7  # ~128 quantization levels
    sim_list = sim_list.astype(np.int16)
    slope_list = slope_list.astype(np.int16)
    
    # 5. Produce the final tokens
    res = sim_list * slope_list  # fuse the two features
    return res[res != 0]  # drop zero-valued tokens
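A minimal usage sketch to inspect what the function returns; the file path here is hypothetical:

# Minimal usage sketch; the path is hypothetical.
tokens = wav_to_token("wav/sample_0001.wav")
print(tokens[:20])                 # a prefix of the token sequence
print(tokens.min(), tokens.max())  # values fall roughly in 1..8192
print(len(tokens))                 # length depends on the clip duration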
Key design analysis:
  1. Dual-feature fusion

    • sim_list: measures how well the raw waveform matches the linear trend (quantized to integers in roughly 0–64)
    • slope_list: characterizes the local slope of the waveform (quantized to integers in roughly 0–128)
    • multiplying the two yields up to 64 × 128 = 8192 distinct token values (the vocabulary reserves IDs for 0–8197)
  2. Reference-signal design

    right = np.linspace(0.1, sr*np.pi, left.size)
    

    This generates a linearly increasing reference signal, so the regression slope reflects the waveform's rate of change rather than its absolute values.

  3. Sliding-window parameters (see the bookkeeping sketch after this list)

    • window size: 1200 samples, stride 400
    • sample-rate effect: at 44.1 kHz one window spans ≈27 ms, a common frame length in speech processing
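A quick bookkeeping sketch for these parameters, assuming the 44.1 kHz sample rate mentioned above:

# Window/stride arithmetic under the stated parameters.
sr = 44100
window, stride = 1200, 400
print(window / sr * 1000)                  # ≈27.2 ms per window
n = 3 * sr                                 # e.g. a 3-second clip
num_windows = len(range(0, n - 400, 400))  # same loop bounds as wav_to_token
print(num_windows)                         # consecutive windows overlap by 800 samples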
Visualizing the token-generation process
# add inside wav_to_token, after the feature lists are filled
plt.figure(figsize=(12, 8))
plt.subplot(2, 1, 1)
plt.plot(left[:2400], 'b', label='raw waveform')
plt.plot(right[:2400], 'r--', label='reference signal')
plt.legend()

plt.subplot(2, 1, 2)
plt.plot(sim_list, 'g', label='similarity feature')
plt.plot(slope_list, 'm', label='slope feature')
plt.legend()
plt.savefig('token_generation.png', dpi=150)

3. Building the Vocabulary: Merging Audio Tokens and Text Markers

# special markers
voc = ["<|pad|>", "<|im_start|>", "<|im_end|>", "<|wav|>"]

# add character names (taken from directory names)
voc += [i.split("\\")[-1] for i in dirs]  # e.g. "胡桃" (Hu Tao), "钟离" (Zhongli)

# add the audio-token space (0–8197)
voc += [str(i) for i in range(8198)]

# build the ID mappings
voc_x2id = {v: i for i, v in enumerate(voc)}
voc_id2x = {i: v for i, v in enumerate(voc)}
Vocabulary examples:

Token type        Example   ID   Purpose
Special marker    <|wav|>   3    structural marker (reserved; unused in the samples below)
Character name    胡桃      12   classification target
Audio token       1245      20   waveform feature

(Audio-token IDs start after the special markers and the character names, so the actual ID of a given audio token depends on how many character directories exist; the IDs above are illustrative.)
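A quick sanity-check sketch on the mapping; IDs simply follow insertion order, so the four special markers take 0–3:

# IDs follow the insertion order of `voc`.
assert voc_x2id["<|pad|>"] == 0 and voc_x2id["<|wav|>"] == 3
tid = voc_x2id["1245"]          # ID of the audio token "1245"
assert voc_id2x[tid] == "1245"  # the mapping round-trips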

4. Dataset Construction: Serializing Audio Samples

# sample structure: [start marker] + [audio token sequence] + [role ID] + [end marker]
data_set = []
for path in paths:
    tokens = wav_to_token(path)
    token_idx = [voc_x2id[str(i)] for i in tokens]
    role_id = voc_x2id[path.split("\\")[-2]]  # character name taken from the path
    
    # full sequence
    sample = [voc_x2id["<|im_start|>"]] + token_idx + [role_id] + [voc_x2id["<|im_end|>"]]
    data_set.append(sample)
Serialization example (with the vocabulary above, <|im_start|> = 1 and <|im_end|> = 2):
[1, 1245, 78, 309, ..., 8192, 12, 2]
 ↑   ↑                        ↑   ↑
 |   |                        |   |
start audio features   role (胡桃) end
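A small sketch, assuming the data_set built above, showing how the label reads back off a serialized sample:

# The role ID sits at the second-to-last position, just before <|im_end|>.
sample = data_set[0]
print(voc_id2x[sample[-2]])                       # the character name, e.g. "胡桃"
print(voc_id2x[sample[0]], voc_id2x[sample[-1]])  # "<|im_start|>", "<|im_end|>"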

Future directions

  • extend to speech recognition tasks (add character-level tokens)
  • combine with contrastive learning to improve the feature representation
  • deploy to mobile devices for real-time character recognition

Technical takeaway: once we treat audio as "readable text", speech processing becomes natural language processing; this is an important step toward unified multimodal models.


Appendix: Full Implementation of the Key Functions

import numpy as np
import pandas as pd
from tqdm import tqdm

from shua2 import wav_to_token  # local module; its implementation is the second listing below
from glob import glob
from model3 import SamOut  # the author's local model definition
# from system_model import SamOut
import torch
from torch import nn, optim
import time

torch.manual_seed(42)
np.random.seed(42)
dirs = glob("C:/Users/dfy918/Downloads/yuanshen_zip/yuanshen_zip/*")

paths = np.hstack([glob(dir + "/wav/*.wav") for dir in dirs])
# note: use a list (not a set) so token IDs stay deterministic across runs;
# set iteration order varies with string-hash randomization
voc = [str(i) for i in range(8198)]

voc = ["<|pad|>", "<|im_start|>", "<|im_end|>", "<|wav|>"] + [i.split("\\")[-1] for i in dirs] + voc
voc_x2id = {v: i for i, v in enumerate(voc)}
voc_id2x = {i: v for i, v in enumerate(voc)}
voc_size = len(voc)
# data_set = pd.read_pickle("wav.pkl")  # optional: reuse cached tokens instead of recomputing
data_set = []
for path in tqdm(paths):
    tokens = wav_to_token(path)
    token_idx = []
    for i in tokens.astype("str").tolist():
        token_idx.append(voc_x2id[i])
    # [<|im_start|>] + audio tokens + [role ID] + [<|im_end|>]
    data_set.append([1] + token_idx + [voc_x2id[path.split("\\")[-2].split("/")[0]]] + [2])
pd.to_pickle(data_set, "wav.pkl")
np.random.shuffle(data_set)

train_data_set = data_set[:int(len(data_set) * 0.8)]
val_data_set = data_set[int(len(data_set) * 0.8):]
num_layers = 2
hidden_size = 2 ** 6 * num_layers
num_heads = num_layers
learning_rate = 0.001
batch_size = 32
num_epochs = 1000

model = SamOut(voc_size=voc_size, hidden_size=hidden_size, num_heads=num_heads, num_layers=num_layers)
params = 0
for i in model.parameters():
    if i.shape != torch.Size([]):
        params += i.numel()
print(f"Total parameters: {params}")

criterion = nn.CrossEntropyLoss(ignore_index=0)  # ignore <|pad|> positions in the loss
optimizer = optim.Adam(model.parameters(), lr=learning_rate)



start_time = time.time()
for epoch in range(num_epochs):
    # training phase
    np.random.shuffle(train_data_set)
    model.train()
    bar = tqdm(range(0, len(train_data_set), batch_size))
    for i in bar:
        batch_data = train_data_set[i:i + batch_size]
        # pad the batch so all sequences share the same length
        max_len = max(len(seq) for seq in batch_data)
        padded_batch = []
        for seq in batch_data:
            padded_seq = seq + [0] * (max_len - len(seq))  # pad with 0 (<|pad|>)
            padded_batch.append(padded_seq)

        data = torch.tensor(padded_batch, dtype=torch.long)

        input_tensor = data[:, :-1]   # next-token setup: inputs are the sequence...
        target_tensor = data[:, 1:]   # ...shifted one step, targets the step after

        output, _ = model(input_tensor)
        output = output.reshape(-1, voc_size)
        target_tensor = target_tensor.reshape(-1)

        loss = criterion(output, target_tensor)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        bar.set_description(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {loss.item():.4f}')

    # validation phase
    model.eval()
    val_loss = 0
    correct = []     # per-batch token-level accuracy
    correct_cls = 0  # correctly predicted role tokens
    total = 0

    with torch.no_grad():
        for i in range(0, len(val_data_set), batch_size):
            batch_data = val_data_set[i:i + batch_size]
            max_len = max(len(seq) for seq in batch_data)
            padded_batch = []
            for seq in batch_data:
                padded_seq = seq + [0] * (max_len - len(seq))  # pad with 0 (<|pad|>)
                padded_batch.append(padded_seq)

            data = torch.tensor(padded_batch, dtype=torch.long)

            input_tensor = data[:, :-1]
            target_tensor = data[:, 1:]

            output, _ = model(input_tensor)
            # token-level accuracy over all positions (note: this includes
            # padded positions, so it overestimates real accuracy)
            acc = np.mean((torch.argmax(output, -1) == target_tensor).numpy())
            correct.append(acc)

            # classification accuracy: look only at the role token, which is
            # the second-to-last element of each unpadded sequence
            for j, seq in enumerate(batch_data):
                cls_pos = len(seq) - 3  # index of the role token in the shifted target
                pred = torch.argmax(output[j, cls_pos]).item()
                correct_cls += int(pred == seq[-2])
                total += 1

            output = output.reshape(-1, voc_size)
            target_tensor_flat = target_tensor.reshape(-1)

            # accumulate validation loss
            val_loss += criterion(output, target_tensor_flat).item()

    num_val_batches = (len(val_data_set) + batch_size - 1) // batch_size
    avg_val_loss = val_loss / max(num_val_batches, 1)
    acc = np.mean(correct)
    role_acc = correct_cls / max(total, 1)

    print(
        f'Epoch [{epoch + 1}/{num_epochs}], Val Loss: {avg_val_loss:.4f}, '
        f'Token Acc: {acc:.4f}, Role Acc: {role_acc:.4f}, Time: {time.time() - start_time:.2f}s')

print("Training complete.")
The second listing below corresponds to the shua2 module imported by the training script; it implements the tokenization utilities.

import numpy as np
import matplotlib.pyplot as plt
from scipy.io import wavfile

# allow Chinese characters (e.g. character names) in matplotlib labels
plt.rcParams['font.sans-serif'] = ['Microsoft YaHei']
plt.rcParams['axes.unicode_minus'] = False


def cosine_similarity(a, b):
    """Cosine similarity between two vectors."""
    return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))


def linear_regression(x, y):
    """Least-squares linear regression; returns (slope, intercept)."""
    n = len(x)
    sum_x, sum_y = np.sum(x), np.sum(y)
    sum_xy = np.sum(x * y)
    sum_x2 = np.sum(x ** 2)
    slope = (n * sum_xy - sum_x * sum_y) / (n * sum_x2 - sum_x ** 2)
    intercept = (sum_y - slope * sum_x) / n
    return slope, intercept


def min_max_normalize(data):
    """Scale data to [0, 1]; the epsilon guards against a zero range."""
    min_val = np.min(data)
    max_val = np.max(data)
    return (data - min_val) / (max_val - min_val + 10 ** -8)




def wav_to_token(path):
    # read the audio and normalize to zero mean, unit variance
    sr, audio = wavfile.read(path)
    left = (audio - np.mean(audio)) / np.std(audio)

    # linearly increasing reference signal
    right = np.linspace(0.1, sr * np.pi, left.size)

    # sliding-window analysis
    sim_list = []
    slope_list = []
    for i in range(0, len(left) - 400, 400):
        x, y = left[i:i + 1200], right[i:i + 1200]
        if len(x) > len(y): x = x[:len(y)]
        slope, intercept = linear_regression(x, y)
        sim_list.append(cosine_similarity(slope * x + intercept, y))
        slope_list.append(slope)

    # convert the lists to arrays
    sim_list = np.array(sim_list)
    slope_list = np.array(slope_list)
    # tokenization: quantize both features, then fuse by multiplication
    sim_list = min_max_normalize(sim_list) * 2 ** 6
    slope_list = min_max_normalize(slope_list) * 2 ** 7
    sim_list = sim_list.astype(np.int16)
    slope_list = slope_list.astype(np.int16)
    res = sim_list * slope_list

    return res[res != 0]

# Note: the min/max used for normalization should really be global (computed
# over the whole dataset) rather than per clip; ideally fixed to the limits
# of the numeric precision.
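A sketch of what the note suggests: pass dataset-wide bounds in explicitly instead of recomputing them per clip. The bounds would have to be measured over the whole corpus first, and the function name here is our own:

# Hypothetical globally-fixed normalization, per the note above.
def min_max_normalize_global(data, min_val, max_val):
    """Normalize with corpus-wide bounds so tokens are comparable across clips."""
    return (np.asarray(data) - min_val) / (max_val - min_val + 1e-8)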

