241205_使用本地vosk模型实现流式语音识别
上一篇我们使用了vosk模型实现了本地的语音识别,但是存在一个严重的问题,因为我们采用的是整段音频录制结束之后再进行转文字并进行关键字检索,就会导致识别不实时,在环境噪音较为复杂(或者正在播放音乐时),我们说完了话他还在持续录音状态,识别太慢了,并且有时候他把音乐声音也录进去,导致识别错误。要解决这样的问题,只能采用流式语音识别。
流式语音识别就是,你这边一直说话,电脑这边边录入边识别,就不是整段音频录入再识别了,大大提高了识别效率。
和上一篇的代码相比,我们这里主要修改了weekup函数以及添加了stream_decode函数,并且把相关的逻辑处理方法单独拿出来写了一个函数process_command。采用的还是本地vosk模型。
前面的载入模型,初始化参数,打开音频流都是不变的
model = Model("vosk-model-cn-0.15")
SetLogLevel(-1)
FORMAT = pyaudio.paInt16 # 音频流的格式
RATE = 16000 # 采样率,单位Hz
CHUNK = 4000 # 单位帧
audio = pyaudio.PyAudio()
stream = audio.open(format=FORMAT,
channels=1,
rate=RATE,
input=True,
frames_per_buffer=CHUNK)
主要实现方法在这里:
在weekup方法中
创建一个队列 data_queue 来存储音频数据块,并启动一个线程 t 来运行 stream_decode 函数,将模型、队列和 mw 对象作为参数传递给该函数。
data_queue = queue.Queue()
t = threading.Thread(target=stream_decode, args=(model, data_queue, mw))
t.daemon = True
t.start()
不断从音频流中读取数据块,并将其放入队列 data_queue 中。
print("开始录音...")
while True:
data = stream.read(CHUNK, exception_on_overflow=False)
data_queue.put(data)
到这里实现了把我们的语音分块装入队列中,我们还要实现去读取出这个数据块进行语音识别。
编写解码方法stream_decode
初始化一个 Kaldi 识别器,并设置为输出单词级别的识别结果。
rec = KaldiRecognizer(model, 16000)
rec.SetWords(True)
不断从队列 data_queue 中获取音频数据块,并将其传递给识别器进行处理。如果识别器接受到完整的语音数据并生成识别结果,则解析 JSON 格式的识别结果,并调用 process_command 函数处理识别出的文本。
while True:
data = data_queue.get()
if rec.AcceptWaveform(data):
result = json.loads(rec.Result())
if 'text' in result and result['text']:
process_command(result['text'], mw)
因为我们转换出来的结果是单词级别的,所以可能会产生识别的每一个单独的词之间都有空格,我们在process_command方法中需要去除掉这些空格,重新把他拼接成一个句子,实现我们的关键词检索。
完整代码:
import json
import time
import pyaudio
import numpy as np
from vosk import Model, KaldiRecognizer, SetLogLevel
from baidu_tts import baidu_tts_test
from pypinyin import lazy_pinyin
import queue
import threading
dir_path = {
'浏览器': "C:\\Program Files (x86)\\Microsoft\\Edge\\Application\\msedge.exe",
'记事本': "C:\\Windows\\System32\\notepad.exe",
'计算器': "C:\\Windows\\System32\\calc.exe"
}
def stream_decode(model, data_queue,mw):
rec = KaldiRecognizer(model, 16000)
rec.SetWords(True)
while True:
data = data_queue.get()
if rec.AcceptWaveform(data):
result = json.loads(rec.Result())
if 'text' in result and result['text']:
process_command(result['text'],mw)
def process_command(text,mw):
# print(text)
text=text.replace(" ","")
print(text)
if "小爱同学" in text:
answer = "我在"
answermethod(answer)
if "聊天" in text:
answer = "好的,请开始"
answermethod(answer)
from stt import run as stt_run
result = stt_run()
if "打开" in text:
app_name = text.replace("打开", '').strip()
if app_name in dir_path:
app_path = dir_path[app_name]
import subprocess
subprocess.Popen(app_path)
answer = "好的,已经为您打开了"
answermethod(answer)
if "歌" in text:
from testyuncloud import yuncloudMW
mw.playMusic()
# print("播放歌曲")
if "暂停" in text:
mw.playMusic()
if "播放" in text:
mw.playMusic()
if "上一首" in text:
mw.prevMusic()
if "下一首" in text:
mw.nextMusic()
if "音量" in text:
volumn = text.split('百分之')[1]
volumn = hanzi_to_number_pinyin(volumn)
print(volumn)
mw.volumeSet(volumn)
def answermethod(answer):
baidu_tts_test(answer)
time.sleep(1)
def weekup(mw):
model = Model("vosk-model-cn-0.15")
SetLogLevel(-1)
FORMAT = pyaudio.paInt16 # 音频流的格式
RATE = 16000 # 采样率,单位Hz
CHUNK = 4000 # 单位帧
audio = pyaudio.PyAudio()
stream = audio.open(format=FORMAT,
channels=1,
rate=RATE,
input=True,
frames_per_buffer=CHUNK)
data_queue = queue.Queue()
t = threading.Thread(target=stream_decode, args=(model, data_queue,mw))
t.daemon = True
t.start()
print("开始录音...")
while True:
data = stream.read(CHUNK, exception_on_overflow=False)
data_queue.put(data)
def hanzi_to_number_pinyin(hanzi_str):
hnd = {
'ling': 0, 'yi': 1, 'er': 2, 'san': 3, 'si': 4, 'wu': 5, 'liu': 6, 'qi': 7, 'ba': 8, 'jiu': 9,
'shi': 10, 'bai': 100, 'qian': 1000, 'wan': 10000, 'yi': 100000000
}
pinyin_list = lazy_pinyin(hanzi_str)
result = 0
temp = 0
for pinyin in pinyin_list:
if pinyin in hnd:
digit = hnd[pinyin]
if digit >= 10:
if temp == 0:
temp = 1
result += temp * digit
temp = 0
else:
temp = temp * 10 + digit
result += temp
return result