【java】websocket对接微软语音实时识别

发布于:2024-05-23 ⋅ 阅读:(32) ⋅ 点赞:(0)

1. pom依赖
<dependency>
    <groupId>com.microsoft.cognitiveservices.speech</groupId>
    <artifactId>client-sdk</artifactId>
    <version>1.37.0</version>
</dependency>
2. websocket接口
package com.learning;

import com.microsoft.cognitiveservices.speech.CancellationReason;
import com.microsoft.cognitiveservices.speech.ResultReason;
import com.microsoft.cognitiveservices.speech.SpeechConfig;
import com.microsoft.cognitiveservices.speech.SpeechRecognizer;
import com.microsoft.cognitiveservices.speech.audio.AudioConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Controller;

import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;

@Slf4j
@Controller
@ServerEndpoint("/microsoft")
public class MicrosoftAsrSocketServer {
    private VoiceAudioStream voiceAudioStream;
    Semaphore stopTranslationWithFileSemaphore = new Semaphore(0);

    @OnOpen
    public void onOpen(Session session) {
        voiceAudioStream = new VoiceAudioStream();
        String queryString = session.getQueryString();
        SpeechConfig speechConfig = SpeechConfig.fromSubscription("你的token", "eastus");
        speechConfig.setSpeechRecognitionLanguage("zh-CN");
        AudioConfig audioConfig = AudioConfig.fromStreamInput(this.voiceAudioStream);
        SpeechRecognizer speechRecognizer = new SpeechRecognizer(speechConfig, audioConfig);
        speechRecognizer.recognizing.addEventListener((s, e) -> {
            System.out.println("RECOGNIZING: Text=" + e.getResult().getText());
            this.returnMessage(session, e.getResult().getText());
        });

        speechRecognizer.recognized.addEventListener((s, e) -> {
            if (e.getResult().getReason() == ResultReason.RecognizedSpeech) {
                System.out.println("RECOGNIZED: Text=" + e.getResult().getText());
            }
            else if (e.getResult().getReason() == ResultReason.NoMatch) {
                System.out.println("NOMATCH: Speech could not be recognized.");
            }
        });

        speechRecognizer.canceled.addEventListener((s, e) -> {
            System.out.println("CANCELED: Reason=" + e.getReason());

            if (e.getReason() == CancellationReason.Error) {
                System.out.println("CANCELED: ErrorCode=" + e.getErrorCode());
                System.out.println("CANCELED: ErrorDetails=" + e.getErrorDetails());
                System.out.println("CANCELED: Did you set the speech resource key and region values?");
            }

            stopTranslationWithFileSemaphore.release();
        });

        speechRecognizer.sessionStopped.addEventListener((s, e) -> {
            System.out.println("Session stopped event.");
            stopTranslationWithFileSemaphore.release();
        });

        try {
            speechRecognizer.startContinuousRecognitionAsync().get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

    @OnMessage
    public void onMessage(byte[] bytes) {
        voiceAudioStream.write(bytes, 0, bytes.length);
    }

    @OnClose
    public void onClose(Session session) throws IOException {
        session.close();
        voiceAudioStream.close();
        stopTranslationWithFileSemaphore.release();
    }

    @OnError
    public void onError(Session session, Throwable error) throws IOException {
        error.printStackTrace();
        voiceAudioStream.close();
        stopTranslationWithFileSemaphore.release();
    }

    /**
     * 回写消息
     * @param session
     * @param message
     */
    private void returnMessage(Session session, String message){
        if (session.isOpen()) {
            log.info("<=======写出数据:{}",message);
            try {
                if(!"".equals(message) && message != null){
                    session.getBasicRemote().sendText(message);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
3. 自定义输入流
package com.learning;
 
import lombok.extern.slf4j.Slf4j;
 
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ConcurrentLinkedDeque;

@Slf4j
public class EchoStream extends InputStream {
 
 
    private ManualResetEvent dataReady = new ManualResetEvent();
    private ConcurrentLinkedDeque<byte[]> buffers = new ConcurrentLinkedDeque<>();
 
 
    public Boolean dataAvailable(){
        return !buffers.isEmpty();
    }
 
    public  void write(byte[] buffer, int offset, int count)
    {
       // log.info("开始write,EchoStream");
        buffers.addLast(buffer);
        if(buffers.size()>1){
            dataReady.set();
        }
    }
 
    @Override
    public int read() throws IOException {
        return 0;
    }
 
    public byte[] getLBuffer(){
        if(buffers.size() != 0){
            return buffers.pollFirst();
        }
        return new byte[0];
    }
 
    public  InputStream Read(byte[] buffer, int offset, int count)
    {
        //log.info("开始read,EchoStream");
        try {
            if(buffers.size() == 0){
                dataReady.waitForever();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        byte[] lBuffer = buffers.pollFirst();
        if (lBuffer == null || lBuffer.length == 0)
        {
            dataReady.reset();
        }
 
        if (!dataAvailable()) {
            dataReady.reset();
        }
        buffer = lBuffer.clone();
        return new ByteArrayInputStream(buffer);
    }
 
    @Override
    public void close() throws IOException {
        super.close();
    }
}
4. 自定义控制
package com.learning;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;  
import java.util.concurrent.locks.ReentrantLock;  
  
public class ManualResetEvent {  
    private final Lock lock = new ReentrantLock();  
    private final Condition condition = lock.newCondition();  
    private AtomicBoolean isSet = new AtomicBoolean(false);
  
    public void set() {  
        lock.lock();  
        try {  
            isSet.set(true);  
            // 唤醒所有等待的线程  
            condition.signalAll();  
        } finally {  
            lock.unlock();  
        }  
    }  
  
    public void reset() {  
        lock.lock();  
        try {  
            isSet.set(false);  
            // 注意:我们在这里不唤醒任何线程,因为它们是等待信号被设置  
        } finally {  
            lock.unlock();  
        }  
    }  
  
    public void waitForever() throws InterruptedException {  
        lock.lock();  
        try {  
            while (!isSet.get()) {  
                // 等待信号被设置  
                condition.await();  
            }  
        } finally {  
            lock.unlock();  
        }  
    }  
  
    // 也可以提供一个带有超时的等待方法  
    public boolean waitFor(long timeout, TimeUnit unit) throws InterruptedException {
        lock.lock();  
        try {  
            if (!isSet.get()) {  
                return condition.await(timeout, unit);  
            }  
            return true;  
        } finally {  
            lock.unlock();  
        }  
    }  
  
    // 注意:这里没有包含TimeUnit的导入,你需要添加它  
    // import java.util.concurrent.TimeUnit;  
  
    // 示例用法  
    public static void main(String[] args) throws InterruptedException {  
        ManualResetEvent event = new ManualResetEvent();  
  
        // 启动一个线程来模拟生产者设置事件  
        new Thread(() -> {  
            try {  
                Thread.sleep(1000); // 模拟耗时操作  
                event.set(); // 设置事件,允许等待的线程继续执行  
                Thread.sleep(1000); // 再次模拟耗时操作  
                event.reset(); // 重置事件,使等待的线程再次等待  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            }  
        }).start();  
  
        // 主线程等待事件被设置
        // 等待事件被设置
        event.waitForever();

        // 在这里可以添加更多的逻辑来处理事件被重置的情况  
        // 例如,通过循环等待事件再次被设置  
        // 注意:在实际应用中,你可能需要处理更多的线程和更复杂的逻辑
    }

    public void close() {
        try {
            isSet.set(false);
            // 唤醒所有等待的线程
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }
}
5. 自定义语音流
package com.xiaoi.xics.avatar.api.interfaces;
 
import com.microsoft.cognitiveservices.speech.audio.PullAudioInputStreamCallback;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.io.InputStream;

/**
 * 自定义语音流
 */
@Slf4j
public class VoiceAudioStream extends PullAudioInputStreamCallback {
 
    private EchoStream dataStream = new EchoStream();
    private ManualResetEvent waitForEmptyDataStream = null;
    private InputStream stream;

    /**
     * 服务从PullAudioInputStream中读取数据, 读到0个字节并不会关闭流
     */
    @Override
    public int read(byte[] dataBuffer){
        long  ret = 0;
        // 用户主动close时可以关闭流
        if (waitForEmptyDataStream != null && !dataStream.dataAvailable()){
            waitForEmptyDataStream.set();
            return 0;
        }
        try {
            if(this.stream != null){
                ret = this.stream.read(dataBuffer,0, dataBuffer.length);
                if((int)ret < 1){
                    this.stream = dataStream.Read(dataBuffer, 0, dataBuffer.length);
                    ret = this.stream.read(dataBuffer,0, dataBuffer.length);
                }
            }else{
                this.stream = dataStream.Read(dataBuffer, 0, dataBuffer.length);
                ret = this.stream.read(dataBuffer,0, dataBuffer.length);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return (int)Math.max(0, ret);
    }

    /**
     * Client向PullAudioInputStream写入数据
     */
    public void write(byte[] buffer, int offset, int count){
        dataStream.write(buffer, offset, count);
    }
 
    @Override
    public void close(){
        if (dataStream.dataAvailable()){
            // 通过ManualResetEvent强制流的使用者必须调用close来手动关闭流
            waitForEmptyDataStream = new ManualResetEvent();
            try {
                waitForEmptyDataStream.waitForever();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        if(waitForEmptyDataStream != null){
            this.waitForEmptyDataStream.close();
        }
        try {
            this.dataStream.close();
            this.stream.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
6. 说明
  • 1.学习用途,切勿放入工作当中,警惕线程控制类。
  • 2.springboot集成websocket流程省略,前端连上websocket后打开麦克风传入语音输入流。
  • 3.亲测有效。自测参考地址:ws://127.0.0.1:8080/microsoft
  • 4.参考C#的实现过程的,点我跳转。