java配置webSocket、前端使用uniapp连接

发布于:2025-05-20 ⋅ 阅读:(13) ⋅ 点赞:(0)

一、这个管理系统是基于若依框架,配置webSocKet的maven依赖

<!--websocket-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>

二、配置类配置webSocket的端点和相关的参数

1、WebSocketConfig - webSocket配置类

注意:ws://yourdomain:port/ws/order?token=yourTokenValue。
可以使用cpolar 工具把IP地址解析成可访问的域名。

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    @Autowired
    private WebSocketHandler webSocketHandler;

    /**
     * 注册websocket的端点
     * 客户端连接格式: ws://yourdomain:port/ws/order?token=yourTokenValue
     * token参数必须提供,系统会通过token从Redis获取对应的openId用于用户识别
     * @param registry WebSocketHandlerRegistry
     */
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(webSocketHandler, "/ws/order")
                .setAllowedOrigins("*"); // 允许跨域访问
    }
    
    /**
     * 配置WebSocket服务器的参数
     * 包括:连接超时时间、心跳超时时间、最大消息大小等
     * @return ServletServerContainerFactoryBean
     */
    @Bean
    public ServletServerContainerFactoryBean createWebSocketContainer() {
        ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
        // 设置异步发送超时时间为25秒
        container.setAsyncSendTimeout(25000L);
        // 设置最大会话空闲时间为60秒
        container.setMaxSessionIdleTimeout(60000L);
        // 设置最大文本消息缓冲区大小为8KB
        container.setMaxTextMessageBufferSize(8192);
        // 设置最大二进制消息缓冲区大小为8KB
        container.setMaxBinaryMessageBufferSize(8192);
        return container;
    }
}

2、WebSocketHandler - webSocket处理器
 

@Component
@Slf4j
public class WebSocketHandler extends TextWebSocketHandler {

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    // 用线程安全的集合来管理所有连接的 WebSocket 会话
    private static final Set<WebSocketSession> sessions = new CopyOnWriteArraySet<>();
    
    // 使用ConcurrentHashMap来存储openId到session的映射关系
    private static final Map<String, WebSocketSession> userSessions = new ConcurrentHashMap<>();
    
    // 使用ConcurrentHashMap来存储session到openId的映射关系(反向映射)
    private static final Map<WebSocketSession, String> sessionUsers = new ConcurrentHashMap<>();
    
    // 记录每个session最后一次活跃时间
    private static final Map<String, Long> sessionLastActiveTime = new ConcurrentHashMap<>();
    
    // 心跳检查的定时任务执行器
    private final ScheduledExecutorService heartbeatScheduler = Executors.newSingleThreadScheduledExecutor();
    
    // 心跳超时时间,单位毫秒
    private static final long HEARTBEAT_TIMEOUT = 50000L; // 50秒
    
    // 用于解析JSON的对象映射器
    private static final ObjectMapper objectMapper = new ObjectMapper();


    /**
     * 构造方法,启动心跳检测任务
     */
    public WebSocketHandler() {
        // 每15秒检查一次心跳
        heartbeatScheduler.scheduleAtFixedRate(this::checkHeartbeats, 15, 15, TimeUnit.SECONDS);
    }

    /**
     * 心跳检查方法,清理那些超时的连接
     */
    private void checkHeartbeats() {
        long currentTime = System.currentTimeMillis();
        
        for (Map.Entry<String, Long> entry : sessionLastActiveTime.entrySet()) {
            String openId = entry.getKey();
            long lastActive = entry.getValue();
            
            // 如果超过超时时间没有活动,则关闭会话
            if (currentTime - lastActive > HEARTBEAT_TIMEOUT) {
                WebSocketSession session = userSessions.get(openId);
                if (session != null && session.isOpen()) {
                    try {
                        log.warn("会话心跳超时,主动断开连接 - openId: {}, 上次活跃: {}ms前", 
                                openId, currentTime - lastActive);
                        session.close(CloseStatus.NORMAL);
                    } catch (IOException e) {
                        log.error("关闭超时WebSocket会话异常 - openId: {}, 错误: {}", openId, e.getMessage());
                    } finally {
                        // 确保从会话映射中移除
                        sessions.remove(session);
                        sessionUsers.remove(session);
                        userSessions.remove(openId);
                        sessionLastActiveTime.remove(openId);
                    }
                } else {
                    // 会话已关闭或不存在,直接清理
                    userSessions.remove(openId);
                    sessionLastActiveTime.remove(openId);
                }
            }
        }
    }

    /**
     * 新客户端连接时,加入到 sessions 集合中
     * @param session 会话
     */
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        sessions.add(session);
        // 从URL中获取token参数,格式应为 /ws/order?token=xxx
        String token = extractToken(session);
        if (token != null) {
            // 从Redis中获取对应的openId
            String openId = getOpenIdFromToken(token);
            if (openId != null) {
                userSessions.put(openId, session);
                sessionUsers.put(session, openId);
                sessionLastActiveTime.put(openId, System.currentTimeMillis()); // 记录初始活跃时间
                log.info("WebSocket连接已建立 - token: {}, openId: {}, 当前连接数: {}", 
                         token, openId, sessions.size());
            } else {
                log.warn("找不到token对应的openId,token可能已过期 - token: {}", token);
                // 可以选择关闭这个无效的连接
                session.close(CloseStatus.NOT_ACCEPTABLE);
            }
        } else {
            log.warn("WebSocket连接未提供token参数,无法识别用户");
            // 可以选择关闭这个无效的连接
            session.close(CloseStatus.NOT_ACCEPTABLE);
        }
    }

    /**
     * 客户端断开连接时,从 sessions 集合中移除
     * @param session 会话
     * @param status
     */
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
        sessions.remove(session);
        // 从用户会话映射中也移除
        String openId = sessionUsers.remove(session);
        if (openId != null) {
            userSessions.remove(openId);
            sessionLastActiveTime.remove(openId);
            log.info("WebSocket连接已关闭 - openId: {}, 状态: {}", openId, status);
        }
    }
    
    /**
     * 处理收到的文本消息
     * 对于心跳消息进行特殊处理
     */
    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        String openId = sessionUsers.get(session);
        String payload = message.getPayload();
        
        try {
            // 尝试解析为JSON
            JsonNode jsonNode = objectMapper.readTree(payload);
            
            // 检查是否是心跳消息
            if (jsonNode.has("type") && "ping".equals(jsonNode.get("type").asText())) {
                // 更新最后活跃时间
                if (openId != null) {
                    sessionLastActiveTime.put(openId, System.currentTimeMillis());
                }
                
                // 发送pong响应
                session.sendMessage(new TextMessage("{\"type\":\"pong\",\"time\":" + System.currentTimeMillis() + "}"));
                return;
            }
        } catch (Exception e) {
            // 不是JSON格式的消息,忽略错误继续处理
        }
        
        // 更新最后活跃时间
        if (openId != null) {
            sessionLastActiveTime.put(openId, System.currentTimeMillis());
        }
        
        log.debug("收到消息 - openId: {}, 内容: {}", openId, payload);
        
        // 在这里可以添加其他消息处理逻辑
    }
    
    /**
     * 从WebSocketSession中提取token
     * @param session WebSocket会话
     * @return token,如果不存在则返回null
     */
    private String extractToken(WebSocketSession session) {
        String query = session.getUri().getQuery();
        if (query != null && query.contains("token=")) {
            String[] params = query.split("&");
            for (String param : params) {
                String[] keyValue = param.split("=");
                if (keyValue.length == 2 && "token".equals(keyValue[0])) {
                    log.info("WebSocket连接已获取token - token: {}", keyValue[1]);
                    return keyValue[1];
                }
            }
        }
        return null;
    }
    
    /**
     * 从token获取对应的openId
     * @param token 用户token
     * @return openId,如果token无效则返回null
     */
    private String getOpenIdFromToken(String token) {
        if (token == null || token.isEmpty()) {
            return null;
        }
        try {
            // 从Redis中获取token对应的openId
            return stringRedisTemplate.opsForValue().get(WECHAT_KEY + token);
        } catch (Exception e) {
            log.error("从Redis获取token信息异常 - token: {}, 错误: {}", token, e.getMessage());
            return null;
        }
    }

    /**
     * 发送支付成功的通知给所有连接的客户端
     * @param message 消息体
     */
    public void sendPaymentSuccessNotification(String message) {
        for (WebSocketSession session : sessions) {
            try {
                // 通过 WebSocket 向每个客户端发送消息
                session.sendMessage(new TextMessage(message));
            } catch (IOException e) {
                log.error("发送支付成功通知失败", e);
            }
        }
    }

    /**
     * 向指定用户发送消息
     * @param openId 用户的openId
     * @param message 消息内容
     * @return 是否发送成功
     */
    public boolean sendMessageToUser(String openId, String message) {
        WebSocketSession session = userSessions.get(openId);
        if (session != null && session.isOpen()) {
            try {
                session.sendMessage(new TextMessage(message));
                log.info("消息已发送给用户 - openId: {}", openId);
                return true;
            } catch (IOException e) {
                log.error("发送消息给用户失败 - openId: {}", openId, e);
                return false;
            }
        } else {
            log.info("用户未通过WebSocket连接 - openId: {}", openId);
            return false;
        }
    }
    
    /**
     * 向所有用户发送心跳检测消息
     */
    public void sendHeartbeat() {
        String heartbeatMsg = "{\"type\":\"heartbeat\",\"time\":" + System.currentTimeMillis() + "}";
        for (WebSocketSession session : sessions) {
            if (session.isOpen()) {
                try {
                    session.sendMessage(new TextMessage(heartbeatMsg));
                } catch (IOException e) {
                    log.error("发送心跳消息失败", e);
                }
            }
        }
    }
}

注意:这里发送消息给指定用户需要前端传递token,获取存储在redis中的openId(微信小程序用户标识)

3、发送消息我定义了一个定时器发送消息和心跳测试

3.1、根据自己业务封装的消息体
 

@ApiModel(value = "MessageVo",discriminator = "websocket的消息体")
public class MessageVo {

    @ApiModelProperty(value = "消息标题",dataType = "string")
    private String title;

    @ApiModelProperty(value = "消息内容",dataType = "string")
    private String content;

    @ApiModelProperty(value = "车牌号码",dataType = "string")
    private String plateNumber;

    @ApiModelProperty(value = "订单编号",dataType = "string")
    private String orderNumber;

    @ApiModelProperty(value = "创建时间",dataType = "date")
    private Date createTime;

}
/**
     * 定时发送提醒消息给待过磅状态的用户
     * 每1分钟执行一次,提醒用户进行过磅操作
     */
    public void sendWeighingReminder() {
        log.info("开始执行待过磅用户提醒任务");
        
        try {
            // 查询所有待过磅的订单
            WeighingRecords pendingQuery = new WeighingRecords();
            pendingQuery.setStatus(0L); // 待过磅
            List<WeighingRecords> pendingWeighingOrders = weighingRecordsMapper.selectWeighingRecordsList(pendingQuery);
            
            // 如果没有待过磅订单,直接返回
            if (pendingWeighingOrders == null || pendingWeighingOrders.isEmpty()) {
                log.info("没有查询到待过磅订单,跳过发送提醒");
                return;
            }
            
            log.info("查询到 {} 条待过磅订单,开始发送提醒", pendingWeighingOrders.size());
            int successCount = 0;
            
            // 遍历所有待过磅订单,发送提醒消息
            for (WeighingRecords order : pendingWeighingOrders) {
                // 检查是否有有效的openId
                String openId = order.getOpenId();
                if (openId == null || openId.trim().isEmpty()) {
                    log.warn("订单 {} 缺少有效的openId,无法发送提醒", order.getOrderNumber());
                    continue;
                }
                
                // 创建消息体
                MessageVo messageVo = new MessageVo();
                messageVo.setTitle("过磅提醒");
                messageVo.setContent("您有一条待过磅的订单,请及时前往过磅点进行过磅操作。");
                messageVo.setOrderNumber(order.getOrderNumber());
                messageVo.setPlateNumber(order.getPlateNumber()); // 设置车牌号
                messageVo.setCreateTime(DateUtils.getNowDate());
                
                try {
                    // 转换为JSON字符串
                    String messageJson = objectMapper.writeValueAsString(messageVo);
                    
                    // 直接使用openId发送消息(WebSocketHandler内部会通过openId查找对应的会话)
                    boolean sent = webSocketHandler.sendMessageToUser(openId, messageJson);
                    
                    if (sent) {
                        successCount++;
                        log.info("成功向用户 {} 发送过磅提醒消息,订单号: {}", openId, order.getOrderNumber());
                    } else {
                        log.info("用户 {} 未连接WebSocket,无法发送过磅提醒消息,订单号: {}", openId, order.getOrderNumber());
                    }
                } catch (JsonProcessingException e) {
                    log.error("消息序列化异常,订单号: {}, 错误: {}", order.getOrderNumber(), e.getMessage());
                } catch (Exception e) {
                    log.error("发送消息异常,订单号: {}, 错误: {}", order.getOrderNumber(), e.getMessage());
                }
            }
            
            log.info("过磅提醒任务完成,共尝试: {} 条,成功: {} 条", pendingWeighingOrders.size(), successCount);
        } catch (Exception e) {
            log.error("过磅提醒任务异常: {}", e.getMessage(), e);
        }
    }
    
    /**
     * 定期发送心跳消息,保持WebSocket连接活跃
     * 每25秒执行一次,低于WebSocketConfig中设置的60秒超时时间
     */
    public void sendHeartbeat() {
        log.debug("开始执行WebSocket心跳任务");
        try {
            webSocketHandler.sendHeartbeat();
            log.debug("WebSocket心跳消息发送完成");
        } catch (Exception e) {
            log.error("WebSocket心跳任务异常: {}", e.getMessage(), e);
        }
    }

4、由于这个管理系统是基于若依所以需要配置鉴权,否则会被拦截
这个是部分配置代码

@Bean
    protected SecurityFilterChain filterChain(HttpSecurity httpSecurity) throws Exception
    {
        return httpSecurity
            // CSRF禁用,因为不使用session
            .csrf(csrf -> csrf.disable())
            // 禁用HTTP响应标头
            .headers((headersCustomizer) -> {
                headersCustomizer.cacheControl(cache -> cache.disable()).frameOptions(options -> options.sameOrigin());
            })
            // 认证失败处理类
            .exceptionHandling(exception -> exception.authenticationEntryPoint(unauthorizedHandler))
            // 基于token,所以不需要session
            .sessionManagement(session -> session.sessionCreationPolicy(SessionCreationPolicy.STATELESS))
            // 注解标记允许匿名访问的url
            .authorizeHttpRequests((requests) -> {
                permitAllUrl.getUrls().forEach(url -> requests.antMatchers(url).permitAll());
                // 对于登录login 注册register 验证码captchaImage 允许匿名访问
                requests.antMatchers("/login", "/register", "/captchaImage",
                                    "/weiXin/login","/weiXin/returnNotify","/ws/**").permitAll()
..........

}

注意:端点配置的是“/ws/order",所以在这了配置为”/ws/**“

三、小程序端的部分代码配置

注意:需要在路径上面传递token,为了后端获取openId向指定用户发送消息

这个是小程序的webSocket的地址示例:“wss://5aa7e45c.r11.cpolar.top/ws/order?token=${this.token}”


网站公告

今日签到

点亮在社区的每一天
去签到