简单的springboot使用sse功能

发布于:2024-12-06 ⋅ 阅读:(62) ⋅ 点赞:(0)

什么是sse?

1、SSE 是Server-Sent Events(服务器发送事件)

2、SSE是一种允许服务器主动向客户端推送实时更新的技术。

3、它基于HTTP协议,并使用了其长连接特性,在客户端与服务器之间建立一条持久化的连接。 通过这条连接,服务器可以实时地向客户端发送事件流,而客户端可以监听这些事件并作出相应的处理。

4、SSE是单向通信机制,即只能由服务器向客户端推送数据,客户端不能通过SSE向服务器发送数据。

5、SSE在现代浏览器和移动设备上得到了广泛的支持,是实现实时Web应用的一种有效方式。

使用流程(经测试,此方式不会丢失消息,靠谱能用!

1、引入springboot的web基本依赖,这里不细说

2、controller中

 /**
     * 订阅sse消息
     *
     * @return
     */
    @CrossOrigin
    @RequestMapping(path = "/subscribe/{userId}")
    public SseEmitter subscribe(@PathVariable String userId) {
        // 设置超时时间,0表示不过期。默认30秒,超过时间未完成会抛出异常:AsyncRequestTimeoutException
        return SSEServer.connect(userId);
    }

3、SSEServer类

package com.orison.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/**
 * @ClassName SSEServer
 * @Description TODO
 * @Author xiaoli
 * @Date 2022-10-26 18:00
 * @Version 1.0
 **/
@Slf4j
public class SSEServer {

    /**
     * 当前连接数
     */
    private static AtomicInteger count = new AtomicInteger(0);

    private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();

    public static SseEmitter connect(String userId){
        //设置超时时间,0表示不过期,默认是30秒,超过时间未完成会抛出异常
        SseEmitter sseEmitter = new SseEmitter(0L);
        //注册回调
        sseEmitter.onCompletion(completionCallBack(userId));
        sseEmitter.onError(errorCallBack(userId));
        sseEmitter.onTimeout(timeOutCallBack(userId));
        sseEmitterMap.put(userId,sseEmitter);
        //数量+1
        count.getAndIncrement();
        log.info("create new sse connect ,current user:{}",userId);
        return sseEmitter;
    }
    /**
     * 给指定用户发消息
     */
    public static void sendMessage(String userId, String message){
        if(sseEmitterMap.containsKey(userId)){
            try{
                sseEmitterMap.get(userId).send(message);
            }catch (IOException e){
                log.error("user id:{}, send message error:{}",userId,e.getMessage());
                log.error("Exception:",e);
            }
        }
    }

    /**
     * 想多人发送消息,组播
     */
    public static void groupSendMessage(String groupId, String message){
        if(sseEmitterMap!=null&&!sseEmitterMap.isEmpty()){
            sseEmitterMap.forEach((k,v) -> {
                try{
                    if(k.startsWith(groupId)){
                        v.send(message, MediaType.APPLICATION_JSON);
                    }
                }catch (IOException e){
                    log.error("user id:{}, send message error:{}",groupId,message);
                    removeUser(k);
                }
            });
        }
    }
    public static void batchSendMessage(String message) {
        sseEmitterMap.forEach((k,v)->{
            try{
                v.send(message,MediaType.APPLICATION_JSON);
            }catch (IOException e){
                log.error("user id:{}, send message error:{}",k,e.getMessage());
                removeUser(k);
            }
        });
    }
    /**
     * 群发消息
     */
    public static void batchSendMessage(String message, Set<String> userIds){
        userIds.forEach(userId->sendMessage(userId,message));
    }
    public static void removeUser(String userId){
        sseEmitterMap.remove(userId);
        //数量-1
        count.getAndDecrement();
        log.info("remove user id:{}",userId);
    }
    public static List<String> getIds(){
        return new ArrayList<>(sseEmitterMap.keySet());
    }
    public static int getUserCount(){
        return count.intValue();
    }
    private static Runnable completionCallBack(String userId) {
        return () -> {
            log.info("结束连接,{}",userId);
            removeUser(userId);
        };
    }
    private static Runnable timeOutCallBack(String userId){
        return ()->{
            log.info("连接超时,{}",userId);
            removeUser(userId);
        };
    }
    private static Consumer<Throwable> errorCallBack(String userId){
        return throwable -> {
            log.error("连接异常,{}",userId);
            removeUser(userId);
        };
    }
}

 

4、前端

<script>

        function createEventSource() {
            const eventSource = new EventSource('http://localhost:13330/device/cameraDevice/subscribe/'+getRandomString(5));

            eventSource.onmessage = function(event) {
                console.log("sse连接中");
                if (event.data){
                    console.log(event);
                   //这里就是请求streamEvents接口返回的值,此时就可以通过Ajax展示出来了
                }
            };


            eventSource.onerror = function(event) {
                console.error("sse连接失败,每5秒尝试重新连接");
                // 关闭当前 EventSource 实例
                eventSource.close();
                // 尝试在 5 秒后重新连接(可以根据需要调整重连间隔)
                setTimeout(createEventSource, 5000);
            };

            return eventSource;
        }

        // 初始化 EventSource 连接
        createEventSource();

function getRandomString(len) {
            const _charStr = 'abacdefghjklmnopqrstuvwxyzABCDEFGHJKLMNOPQRSTUVWXYZ0123456789';
            let min = 0, max = _charStr.length - 1, _str = '';
            //判断是否指定长度,否则默认长度为15
            len = len || 15;
            //循环生成字符串
            for (var i = 0, index; i < len; i++) {
                index = RandomIndex(min, max, i);
                _str += _charStr[index];
            }
            return _str;
        }

        /**
         * 随机生成索引
         * @param min 最小值
         * @param max 最大值
         * @param i 当前获取位置
         */
        function RandomIndex(min, max, i) {
            const _charStr = 'abacdefghjklmnopqrstuvwxyzABCDEFGHJKLMNOPQRSTUVWXYZ0123456789';
            let index = Math.floor(Math.random() * (max - min + 1) + min),
                numStart = _charStr.length - 10;
            //如果字符串第一位是数字,则递归重新获取
            if (i == 0 && index >= numStart) {
                index = RandomIndex(min, max, i);
            }
            //返回最终索引值
            return index;
        }
    </script>


网站公告

今日签到

点亮在社区的每一天
去签到

热门文章