Running SileroVAD on pure onnxruntime: a high-performance, machine-learning-based VAD

Published: 2024-12-05

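What is VAD?

VAD (Voice Activity Detection) is a technique for identifying which regions of an audio stream contain speech and which do not. By separating speech from background noise, it makes downstream processing more efficient and more accurate.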

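Main functions of VAD

  1. Detect speech regions: identify the speech portions of an audio stream and decide whether a turn in the conversation has ended.
  2. Filter silence and noise: remove non-speech portions such as silence, background noise, and music.
  3. Reduce processing cost: avoid further processing (for example, speech recognition or transmission) of irrelevant audio, which improves system efficiency.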
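
To make the first two functions concrete, here is a minimal sketch, not tied to any particular VAD library, that groups per-frame speech probabilities into speech segments. The name `speech_segments`, the 0.5 threshold, and the 32 ms frame duration are illustrative assumptions.

```python
from typing import List, Optional, Tuple


def speech_segments(
    probs: List[float], threshold: float = 0.5, frame_sec: float = 0.032
) -> List[Tuple[float, float]]:
    """Group consecutive frames with prob >= threshold into (start, end) spans in seconds."""
    segments: List[Tuple[float, float]] = []
    start: Optional[int] = None  # index of the first frame of the current speech run
    for i, p in enumerate(probs):
        if p >= threshold and start is None:
            start = i  # a speech run begins
        elif p < threshold and start is not None:
            # the speech run ended just before frame i
            segments.append((round(start * frame_sec, 3), round(i * frame_sec, 3)))
            start = None
    if start is not None:
        # the audio ended while speech was still active
        segments.append((round(start * frame_sec, 3), round(len(probs) * frame_sec, 3)))
    return segments


if __name__ == "__main__":
    print(speech_segments([0.1, 0.8, 0.9, 0.2, 0.1, 0.7]))
```

Only the returned spans would need to be passed on to speech recognition or transmission; everything outside them can be dropped.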

SileroVAD

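Traditional VAD libraries such as WebRTC VAD, provided by Google, are algorithmic implementations. With advances in AI, machine-learning-based VAD libraries have emerged, and SileroVAD is the subject of this post.
[Figure: recall and precision of SileroVAD compared with WebRTC VAD]
As the figure shows, SileroVAD outperforms WebRTC VAD in both recall and precision.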

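Running on pure ONNX Runtime

The original official documentation only provides a PyTorch-based implementation. PyTorch is primarily a training framework with many dependencies, and it is often too heavy for production environments (on Linux it typically takes several GB of disk space). To solve this, I chose ONNX Runtime, the lightweight inference framework from Microsoft. Compared with PyTorch, ONNX Runtime offers faster inference and a much smaller footprint, only 28 MB.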

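Implementation

I implemented a tool named SileroVadDetector. It supports speech detection on a single audio chunk as well as streaming detection of when speech has stopped. It exposes configuration options such as the sensitivity and the silence time window.

In testing (MacBook M1, Python 3.12), a single thread used about 3% CPU. If demand is higher, concurrency can be increased by running multiple processes.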
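
As a rough, back-of-the-envelope reading of that figure (an estimate, not a benchmark): the model consumes 512-sample chunks at 16 kHz, so each chunk covers 32 ms of audio. If one real-time stream uses about 3% of a core, inference costs on the order of 1 ms per chunk, and a single core could in principle host a few dozen streams before multiple processes become necessary. The helper names below are hypothetical.

```python
SAMPLE_RATE_HZ = 16_000
CHUNK_SAMPLES = 512


def chunk_duration_sec(chunk_samples: int = CHUNK_SAMPLES, rate_hz: int = SAMPLE_RATE_HZ) -> float:
    """Seconds of audio covered by one chunk."""
    return chunk_samples / rate_hz


def inference_cost_sec(cpu_fraction: float) -> float:
    """Approximate CPU time spent per chunk for one real-time stream."""
    return chunk_duration_sec() * cpu_fraction


def estimated_streams_per_core(cpu_fraction: float) -> int:
    """Upper bound on real-time streams per core, ignoring all other overhead."""
    return int(1.0 / cpu_fraction)


if __name__ == "__main__":
    print(chunk_duration_sec())              # seconds of audio per chunk
    print(inference_cost_sec(0.03))          # seconds of CPU per chunk at 3% load
    print(estimated_streams_per_core(0.03))  # rough per-core capacity
```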

from typing import Final

import numpy as np
import onnxruntime


_RATE: Final = 16000  # Hz (16 kHz sample rate)
_MAX_WAV: Final = 32767

_ONNX_PATH: Final = "Silero_VAD_16k.onnx"
_CONTEXT_SIZE: Final = 64  # context samples carried over between chunks at 16 kHz
_CHUNK_SAMPLES: Final = 512
_CHUNK_BYTES: Final = _CHUNK_SAMPLES * 2  # 16-bit
_PER_CHUNK_TIME = _CHUNK_SAMPLES / _RATE


class SileroVadDetector:
    """Detects speech/silence using Silero VAD.

    https://github.com/snakers4/silero-vad
    """

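    _shared_session = None

    @property
    def session(self) -> onnxruntime.InferenceSession:
        """Lazily create one single-threaded session shared by all instances."""
        if SileroVadDetector._shared_session is None:
            opts = onnxruntime.SessionOptions()
            opts.inter_op_num_threads = 1
            opts.intra_op_num_threads = 1
            # Assign on the class, not the instance, so the session is actually shared.
            SileroVadDetector._shared_session = onnxruntime.InferenceSession(
                _ONNX_PATH, providers=["CPUExecutionProvider"], sess_options=opts
            )
        return SileroVadDetector._shared_session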

    def __init__(
        self,
        prob_threshold: float = 0.5,
        silence_threshold: float = 4.0,
    ) -> None:
        """Initialize the Silero VAD detector.

        Args:
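            prob_threshold (float, optional): Speech probability threshold; higher values require stronger evidence of speech.
            silence_threshold (float, optional): Silence window in seconds; speech is considered to have stopped once silence has lasted this long.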
        """
        self._context = np.zeros((1, _CONTEXT_SIZE), dtype=np.float32)
        self._state = np.zeros((2, 1, 128), dtype=np.float32)
        self._sr = np.array(_RATE, dtype=np.int64)
        self.prob_threshold = prob_threshold
        self.silence_threshold = silence_threshold
        self.buffer = bytearray()
        self.silence_start_time = None
        self.activity_flag = False
        self.chunk_count = 0
        self.last_end = 0

    def process_chunk(self, audio: bytes) -> float:
        """Return probability of speech [0-1] in a single audio chunk."""
        if len(audio) != _CHUNK_BYTES:
            # Window size is fixed at 512 samples in v5
            raise ValueError(
                f"Invalid chunk size: expected {_CHUNK_BYTES} bytes, got {len(audio)}"
            )

        audio_array = np.frombuffer(audio, dtype=np.int16).astype(np.float32) / _MAX_WAV

        # Add batch dimension and context
        audio_array = np.concatenate(
            (self._context, audio_array[np.newaxis, :]), axis=1
        )
        self._context = audio_array[:, -_CONTEXT_SIZE:]

        ort_inputs = {
            "input": audio_array[:, : _CHUNK_SAMPLES + _CONTEXT_SIZE],
            "state": self._state,
            "sr": self._sr,
        }
        ort_outs = self.session.run(None, ort_inputs)
        out, self._state = ort_outs

        # Convert the 0-d array to a plain float to match the return annotation.
        return float(out.squeeze())

    def need_switch_speaker(self, audio_chunk: bytes) -> bool:
        """
        Process an audio chunk and return True if it is time to switch speakers.

        This function buffers the audio chunks, processes them in chunks of
        `_CHUNK_BYTES` size, and checks if the probability of speech is above
        `prob_threshold`. If it is, it resets the silence start time and sets the
        activity flag to True. If the activity flag is set and the probability of
        speech is below `prob_threshold`, it checks if the current time minus the
        silence start time is greater than or equal to `silence_threshold`. If it
        is, it returns True. Otherwise, it returns False.

        Args:
            audio_chunk (bytes): The audio chunk to process

        Returns:
            bool: Whether it is time to switch speakers
        """
        self.buffer.extend(audio_chunk)

        while len(self.buffer) >= _CHUNK_BYTES:
            chunk = bytes(self.buffer[:_CHUNK_BYTES])
            # Consume the chunk before any early return so it is never processed twice.
            del self.buffer[:_CHUNK_BYTES]
            prob = self.process_chunk(chunk)
            current_time = self.chunk_count * _PER_CHUNK_TIME
            self.chunk_count += 1

            if prob >= self.prob_threshold:
                # Speech detected: cancel any pending silence timer.
                self.silence_start_time = None
                self.activity_flag = True
            elif self.activity_flag:
                if self.silence_start_time is None:
                    # First silent chunk after speech: start the silence timer.
                    self.silence_start_time = current_time
                elif current_time - self.silence_start_time >= self.silence_threshold:
                    return True

        return False
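
`process_chunk` accepts exactly 1024 bytes (512 16-bit samples), so a caller holding a longer recording has to slice it first. Below is a minimal sketch of such a helper, assuming mono 16-bit PCM at 16 kHz. `iter_pcm_chunks` is a hypothetical name and not part of the class above, and zero-padding the final chunk is just one possible choice.

```python
from typing import Iterator

CHUNK_BYTES = 512 * 2  # 512 samples of 16-bit audio, mirroring _CHUNK_BYTES above


def iter_pcm_chunks(pcm: bytes, chunk_bytes: int = CHUNK_BYTES) -> Iterator[bytes]:
    """Yield fixed-size chunks of raw PCM, zero-padding the last one if it is short."""
    for offset in range(0, len(pcm), chunk_bytes):
        chunk = pcm[offset : offset + chunk_bytes]
        if len(chunk) < chunk_bytes:
            # Pad with digital silence so the chunk has the size the model expects.
            chunk = chunk + b"\x00" * (chunk_bytes - len(chunk))
        yield chunk


if __name__ == "__main__":
    fake_pcm = b"\x01" * 2500  # stand-in for real audio bytes
    print([len(c) for c in iter_pcm_chunks(fake_pcm)])
```

With a detector instance, per-chunk probabilities could then be collected as `[detector.process_chunk(c) for c in iter_pcm_chunks(pcm)]`. `need_switch_speaker` does its own buffering, so it can be fed pieces of any size directly.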

