Spring Boot WebSocket实时在线人数统计

发布于:2025-08-13 ⋅ 阅读:(12) ⋅ 点赞:(0)

一、技术架构设计

1.1 系统组件

  • 前端:浏览器客户端(支持SockJS)
  • WebSocket服务:Spring Boot + STOMP协议
  • 状态存储:内存存储或Redis集群
  • 消息代理:SimpleBroker或RabbitMQ/ActiveMQ

1.2 通信流程

浏览器客户端 Spring Boot服务 消息代理 WebSocket连接(/ws) 连接确认 订阅/topic/onlineUsers 发送/app/hello消息 广播在线人数 推送人数更新 浏览器客户端 Spring Boot服务 消息代理

二、完整实现方案

2.1 依赖配置

Maven依赖

<dependencies>
    <!-- Web支持 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!-- WebSocket支持 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>

    <!-- Redis集成(可选) -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
</dependencies>

WebSocket配置类

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        // 客户端订阅前缀
        registry.enableSimpleBroker("/topic");
        // 服务端接收前缀
        registry.setApplicationDestinationPrefixes("/app");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/ws")
                .setAllowedOrigins("*")
                .withSockJS();
    }
}

2.2 核心业务实现

在线用户服务

@Service
public class OnlineUserService {
    private final Set<String> onlineUsers = ConcurrentHashMap.newKeySet();

    // 分布式场景使用Redis
    // @Autowired
    // private RedisTemplate<String, String> redisTemplate;

    public void userConnected(String sessionId, String username) {
        onlineUsers.add(username);
        // redisTemplate.opsForSet().add("online:users", username);
        broadcastCount();
    }

    public void userDisconnected(String username) {
        onlineUsers.remove(username);
        // redisTemplate.opsForSet().remove("online:users", username);
        broadcastCount();
    }

    public int getOnlineCount() {
        return onlineUsers.size();
        // return redisTemplate.opsForSet().size("online:users").intValue();
    }

    private void broadcastCount() {
        messagingTemplate.convertAndSend("/topic/onlineUsers",
            new OnlineUserCount(getOnlineCount()));
    }

    @Autowired
    private SimpMessagingTemplate messagingTemplate;
}

WebSocket控制器

@Controller
@RequiredArgsConstructor
public class WebSocketController {
    private final OnlineUserService onlineUserService;

    @MessageMapping("/hello")
    public void handleGreeting(HelloMessage message,
                             SimpMessageHeaderAccessor headerAccessor) {
        String sessionId = headerAccessor.getSessionId();
        onlineUserService.userConnected(sessionId, message.getUsername());
    }

    @EventListener
    public void handleDisconnect(SessionDisconnectEvent event) {
        String username = event.getUser().getName();
        onlineUserService.userDisconnected(username);
    }
}

2.3 前端实现

HTML页面

<!DOCTYPE html>
<html>
<head>
    <title>在线人数统计</title>
    <script src="<https://cdn.jsdelivr.net/sockjs/1.1.4/sockjs.min.js>"></script>
    <script src="<https://cdnjs.cloudflare.com/ajax/libs/stomp.js/2.3.3/stomp.min.js>"></script>
</head>
<body>
    <div>当前在线人数: <span id="count">0</span></div>
    <script>
        const socket = new SockJS('/ws');
        const stompClient = Stomp.over(socket);

        stompClient.connect({}, function(frame) {
            // 订阅人数更新
            stompClient.subscribe('/topic/onlineUsers', function(response) {
                const data = JSON.parse(response.body);
                document.getElementById('count').textContent = data.count;
            });

            // 发送登录消息
            stompClient.send("/app/hello", {},
                JSON.stringify({'username': 'user_123'}));
        });
    </script>
</body>
</html>

三、高级功能扩展

3.1 分布式解决方案

Redis集成配置

@Configuration
public class RedisConfig {
    @Bean
    public RedisTemplate<String, Object> redisTemplate(
            RedisConnectionFactory connectionFactory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(connectionFactory);
        template.setKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        return template;
    }
}

集群感知服务

@Service
public class ClusterOnlineUserService {
    private static final String ONLINE_USERS_KEY = "online:users";

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    public void userConnected(String userId) {
        redisTemplate.opsForSet().add(ONLINE_USERS_KEY, userId);
        redisTemplate.expire(ONLINE_USERS_KEY, 1, TimeUnit.HOURS);
    }

    public void userDisconnected(String userId) {
        redisTemplate.opsForSet().remove(ONLINE_USERS_KEY, userId);
    }

    public long getOnlineCount() {
        Long size = redisTemplate.opsForSet().size(ONLINE_USERS_KEY);
        return size != null ? size : 0;
    }
}

3.2 心跳检测机制

服务端配置

@Configuration
public class WebSocketHeartbeatConfig {
    @Bean
    public ServletServerContainerFactoryBean createWebSocketContainer() {
        ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
        container.setMaxSessionIdleTimeout(30000L); // 30秒超时
        return container;
    }
}

前端心跳

// 每20秒发送心跳
setInterval(() => {
    stompClient.send("/app/heartbeat", {}, "ping");
}, 20000);

四、生产环境优化

4.1 性能调优参数

application.yml配置

spring:
  websocket:
    max-text-message-buffer-size: 8192
    max-binary-message-buffer-size: 8192

server:
  compression:
    enabled: true
    mime-types: text/html,text/css,application/javascript,application/json

4.2 监控指标暴露

Actuator集成

@Bean
public MeterRegistryCustomizer<MeterRegistry> metricsCustomizer() {
    return registry -> {
        registry.gauge("websocket.sessions", onlineUserService,
            OnlineUserService::getOnlineCount);
    };
}

4.3 安全防护

CSRF防护配置

@Configuration
public class WebSecurityConfig extends WebSecurityConfigurerAdapter {
    @Override
    protected void configure(HttpSecurity http) throws Exception {
        http
            .csrf().disable() // 禁用CSRF以便WebSocket连接
            .authorizeRequests()
                .antMatchers("/ws/**").permitAll()
                .anyRequest().authenticated();
    }
}

限流保护

@Bean
public WebSocketRateLimiter webSocketRateLimiter() {
    return new WebSocketRateLimiter(100, 1, TimeUnit.MINUTES);
}

@ControllerAdvice
public class WebSocketExceptionHandler {
    @MessageExceptionHandler
    @SendToUser("/queue/errors")
    public String handleException(Exception ex) {
        return "错误: " + ex.getMessage();
    }
}

五、测试验证方案

5.1 JMeter压力测试

测试计划配置

  1. 创建WebSocket连接采样器
  2. 添加100个并发用户
  3. 配置消息发送间隔
  4. 添加聚合报告监听器

5.2 自动化测试

Spring Boot测试类

@SpringBootTest
@AutoConfigureMockMvc
class WebSocketTests {

    @Autowired
    private WebSocketHandler handler;

    @Test
    void testOnlineCount() throws Exception {
        Mockito.when(onlineUserService.getOnlineCount()).thenReturn(5);

        WebSocketSession session = mock(WebSocketSession.class);
        handler.afterConnectionEstablished(session);

        assertEquals(5, handler.getOnlineCount());
    }
}

六、常见问题解决方案

6.1 连接不稳定问题

  • 症状:频繁断开连接
  • 解决方案:
    1. 调整心跳间隔
    2. 增加网络超时时间
    3. 检查Nginx配置(如果使用反向代理)

6.2 内存泄漏问题

  • 症状:内存持续增长
  • 解决方案:
    1. 确保正确清理断开连接的会话
    2. 使用WeakReference存储会话对象
    3. 定期强制GC并监控

6.3 集群同步延迟

  • 症状:不同节点人数不一致
  • 解决方案:
    1. 使用Redis Pub/Sub实现实时同步
    2. 增加心跳同步机制
    3. 采用最终一致性设计

通过以上完整实现方案,可以构建一个高性能、可靠的实时在线人数统计系统。根据实际业务需求,可以选择内存存储或Redis集群方案,并灵活调整各项配置参数。


网站公告

今日签到

点亮在社区的每一天
去签到