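1. Technical Architecture Design
1.1 System Components
- Frontend: browser client (with SockJS support)
- WebSocket service: Spring Boot + STOMP protocol
- State storage: in-memory storage or a Redis cluster
- Message broker: SimpleBroker or RabbitMQ/ActiveMQ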
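1.2 Communication Flow
The browser opens a SockJS connection to /ws, subscribes to /topic/onlineUsers, and sends a login message to /app/hello. The server records the user and broadcasts the updated count to every subscriber of /topic/onlineUsers.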
2. Complete Implementation
2.1 Dependency Configuration
Maven dependencies
<dependencies>
    <!-- Web support -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- WebSocket support -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>
    <!-- Redis integration (optional) -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
</dependencies>
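WebSocket configuration class
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        // Prefixes clients subscribe to; /queue carries the per-user error replies from section 4.3
        registry.enableSimpleBroker("/topic", "/queue");
        // Prefix for messages routed to @MessageMapping methods on the server
        registry.setApplicationDestinationPrefixes("/app");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/ws")
                // Spring 5.3+ rejects setAllowedOrigins("*") together with SockJS;
                // list the trusted origins explicitly in production
                .setAllowedOriginPatterns("*")
                .withSockJS();
    }
}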
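2.2 Core Business Logic
Online user service
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Service;

@Service
public class OnlineUserService {
    // Session ID -> username, so a disconnect can be resolved without an authenticated Principal
    private final Map<String, String> sessionUsers = new ConcurrentHashMap<>();
    private final SimpMessagingTemplate messagingTemplate;
    // For distributed deployments, keep the online set in Redis instead (see section 3.1)

    public OnlineUserService(SimpMessagingTemplate messagingTemplate) {
        this.messagingTemplate = messagingTemplate;
    }

    public void userConnected(String sessionId, String username) {
        sessionUsers.put(sessionId, username);
        broadcastCount();
    }

    public void userDisconnected(String sessionId) {
        // Broadcast only if this session had actually registered a user
        if (sessionUsers.remove(sessionId) != null) {
            broadcastCount();
        }
    }

    public int getOnlineCount() {
        // Count distinct usernames so several tabs of the same user count once
        return new HashSet<>(sessionUsers.values()).size();
    }

    private void broadcastCount() {
        messagingTemplate.convertAndSend("/topic/onlineUsers",
                new OnlineUserCount(getOnlineCount()));
    }
}
WebSocket controller
import lombok.RequiredArgsConstructor;
import org.springframework.context.event.EventListener;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.stereotype.Controller;
import org.springframework.web.socket.messaging.SessionDisconnectEvent;

@Controller
@RequiredArgsConstructor // Lombok; add the lombok dependency or write the constructor by hand
public class WebSocketController {
    private final OnlineUserService onlineUserService;

    @MessageMapping("/hello")
    public void handleGreeting(HelloMessage message,
                               SimpMessageHeaderAccessor headerAccessor) {
        String sessionId = headerAccessor.getSessionId();
        onlineUserService.userConnected(sessionId, message.getUsername());
    }

    @EventListener
    public void handleDisconnect(SessionDisconnectEvent event) {
        // event.getUser() is null when no authentication is configured,
        // so the user is looked up by session ID instead
        onlineUserService.userDisconnected(event.getSessionId());
    }
}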
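The service and controller reference HelloMessage and OnlineUserCount, which this article does not define. The following is a minimal sketch of what they might look like; the field names are an assumption inferred from the JSON the frontend sends ({'username': ...}) and reads (data.count). In a real project each class would normally be public and live in its own file.

```java
// Hypothetical payload classes; field names are inferred from the frontend JSON.
class HelloMessage {
    private String username;

    // No-arg constructor so a JSON mapper such as Jackson can instantiate the class
    HelloMessage() {}

    HelloMessage(String username) {
        this.username = username;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }
}

class OnlineUserCount {
    private final int count;

    OnlineUserCount(int count) {
        this.count = count;
    }

    // Serialized as the "count" property that the page displays
    public int getCount() {
        return count;
    }
}
```

With getters named this way, a JSON mapper that follows JavaBean conventions would produce {"count": 5} for new OnlineUserCount(5).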
2.3 Frontend Implementation
HTML page
<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8">
    <title>Online User Count</title>
    <script src="https://cdn.jsdelivr.net/sockjs/1.1.4/sockjs.min.js"></script>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/stomp.js/2.3.3/stomp.min.js"></script>
</head>
<body>
    <div>Users online: <span id="count">0</span></div>
    <script>
        const socket = new SockJS('/ws');
        const stompClient = Stomp.over(socket);
        stompClient.connect({}, function(frame) {
            // Subscribe to count updates
            stompClient.subscribe('/topic/onlineUsers', function(response) {
                const data = JSON.parse(response.body);
                document.getElementById('count').textContent = data.count;
            });
            // Send the login message (replace the hard-coded name with the real user)
            stompClient.send("/app/hello", {},
                JSON.stringify({'username': 'user_123'}));
        });
    </script>
</body>
</html>
3. Advanced Extensions
3.1 Distributed Solution
Redis integration configuration
@Configuration
public class RedisConfig {
    @Bean
    public RedisTemplate<String, Object> redisTemplate(
            RedisConnectionFactory connectionFactory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(connectionFactory);
        // String keys, JSON-serialized values
        template.setKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        return template;
    }
}
Cluster-aware service
@Service
public class ClusterOnlineUserService {
    private static final String ONLINE_USERS_KEY = "online:users";

    // StringRedisTemplate is auto-configured by Spring Boot and stores plain string members
    @Autowired
    private StringRedisTemplate redisTemplate;

    public void userConnected(String userId) {
        redisTemplate.opsForSet().add(ONLINE_USERS_KEY, userId);
        // No TTL on the shared key: expiring the whole set would drop users who are still connected.
        // Entries left behind by a crashed node need separate cleanup, such as heartbeats (section 3.2).
    }

    public void userDisconnected(String userId) {
        redisTemplate.opsForSet().remove(ONLINE_USERS_KEY, userId);
    }

    public long getOnlineCount() {
        Long size = redisTemplate.opsForSet().size(ONLINE_USERS_KEY);
        return size != null ? size : 0;
    }
}
3.2 Heartbeat Detection
Server-side configuration
@Configuration
public class WebSocketHeartbeatConfig {
    @Bean
    public ServletServerContainerFactoryBean createWebSocketContainer() {
        ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
        container.setMaxSessionIdleTimeout(30000L); // close sessions idle for 30 seconds
        return container;
    }
}
Frontend heartbeat
// Send a heartbeat every 20 seconds, inside the 30-second idle timeout
setInterval(() => {
    stompClient.send("/app/heartbeat", {}, "ping");
}, 20000);
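The frontend sends a ping to /app/heartbeat, but the article does not show what the server does with it. One possible approach, sketched below, is to record the time of the last heartbeat per session and periodically evict sessions that have been silent longer than a timeout. HeartbeatTracker and its method names are hypothetical, and timestamps are passed in explicitly so the logic can be exercised without a real clock.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper: tracks the last heartbeat time of each session.
class HeartbeatTracker {
    private final Map<String, Long> lastSeen = new ConcurrentHashMap<>();
    private final long timeoutMillis;

    HeartbeatTracker(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Call when a heartbeat (or any other message) arrives for the session.
    void touch(String sessionId, long nowMillis) {
        lastSeen.put(sessionId, nowMillis);
    }

    // Removes and returns the sessions whose last heartbeat is older than the timeout.
    List<String> evictStale(long nowMillis) {
        List<String> stale = new ArrayList<>();
        for (Map.Entry<String, Long> entry : lastSeen.entrySet()) {
            Long seen = entry.getValue();
            // remove(key, value) only succeeds if no newer heartbeat arrived in the meantime
            if (nowMillis - seen > timeoutMillis && lastSeen.remove(entry.getKey(), seen)) {
                stale.add(entry.getKey());
            }
        }
        return stale;
    }

    int activeCount() {
        return lastSeen.size();
    }
}
```

A handler mapped to /heartbeat could call touch with the session ID, and a scheduled task could pass each evicted session ID to the online user service so the count is corrected even when no disconnect event arrives.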
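4. Production Optimization
4.1 Performance Tuning Parameters
application.yml configuration
# Message buffer sizes (for example 8192 bytes) are not standard Spring Boot properties.
# Set them on ServletServerContainerFactoryBean with setMaxTextMessageBufferSize and
# setMaxBinaryMessageBufferSize, alongside the idle timeout from section 3.2.
server:
  compression:
    enabled: true
    mime-types: text/html,text/css,application/javascript,application/json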
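4.2 Exposing Monitoring Metrics
Actuator integration
// Requires spring-boot-starter-actuator; declare this bean in a @Configuration class
@Bean
public MeterRegistryCustomizer<MeterRegistry> metricsCustomizer(
        OnlineUserService onlineUserService) {
    return registry -> {
        // Gauge that reads the current online count whenever metrics are collected
        registry.gauge("websocket.sessions", onlineUserService,
                OnlineUserService::getOnlineCount);
    };
}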
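4.3 Security Protection
CSRF protection configuration
// Requires spring-boot-starter-security. WebSecurityConfigurerAdapter is deprecated since
// Spring Security 5.7 and removed in 6.0, where a SecurityFilterChain bean is used instead.
@Configuration
public class WebSecurityConfig extends WebSecurityConfigurerAdapter {
    @Override
    protected void configure(HttpSecurity http) throws Exception {
        http
            // Exempt only the WebSocket/SockJS endpoint from CSRF instead of disabling it globally
            .csrf().ignoringAntMatchers("/ws/**")
            .and()
            .authorizeRequests()
                .antMatchers("/ws/**").permitAll()
                .anyRequest().authenticated();
    }
}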
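Rate limiting
// WebSocketRateLimiter is a custom class, not part of Spring:
// here it allows 100 messages per minute
@Bean
public WebSocketRateLimiter webSocketRateLimiter() {
    return new WebSocketRateLimiter(100, 1, TimeUnit.MINUTES);
}

@ControllerAdvice
public class WebSocketExceptionHandler {
    // Sends the error only to the session that caused it; the client must subscribe to
    // /user/queue/errors and the broker must have the /queue prefix enabled
    @MessageExceptionHandler
    @SendToUser("/queue/errors")
    public String handleException(Exception ex) {
        return "Error: " + ex.getMessage();
    }
}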
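The WebSocketRateLimiter bean is used without being defined. The sketch below shows one way such a class could work, using a fixed-window counter per key (for example per session ID). Only the constructor shape (limit, window length, TimeUnit) comes from the bean definition; the tryAcquire methods and the fixed-window strategy are assumptions made for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// Hypothetical fixed-window limiter: at most maxMessages per key in each window.
class WebSocketRateLimiter {
    // Immutable snapshot of one key's current window
    private static final class Window {
        final long start;
        final int count;

        Window(long start, int count) {
            this.start = start;
            this.count = count;
        }
    }

    private final int maxMessages;
    private final long windowMillis;
    private final Map<String, Window> windows = new ConcurrentHashMap<>();

    WebSocketRateLimiter(int maxMessages, long window, TimeUnit unit) {
        this.maxMessages = maxMessages;
        this.windowMillis = unit.toMillis(window);
    }

    // Returns true if the message is allowed, false if the key has used up its quota.
    boolean tryAcquire(String key, long nowMillis) {
        Window w = windows.compute(key, (k, old) -> {
            if (old == null || nowMillis - old.start >= windowMillis) {
                return new Window(nowMillis, 1); // start a new window
            }
            if (old.count > maxMessages) {
                return old; // already over quota; leave the window unchanged
            }
            return new Window(old.start, old.count + 1);
        });
        return w.count <= maxMessages;
    }

    boolean tryAcquire(String key) {
        return tryAcquire(key, System.currentTimeMillis());
    }
}
```

A limiter like this could be consulted from a ChannelInterceptor on the inbound channel, keyed by session ID, rejecting the message when tryAcquire returns false. A fixed window is simple but allows short bursts at window boundaries; a sliding window or token bucket smooths that out at the cost of more state.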
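5. Testing and Verification
5.1 JMeter Load Testing
Test plan configuration
- Create a WebSocket connection sampler (requires a WebSocket sampler plugin)
- Add 100 concurrent users
- Configure the interval between messages
- Add an Aggregate Report listener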
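5.2 Automated Testing
Spring Boot test class
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.verify;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.messaging.simp.SimpMessagingTemplate;

@SpringBootTest
class OnlineUserServiceTests {
    @Autowired
    private OnlineUserService onlineUserService;

    // Replace the real template so the broadcast can be verified
    @MockBean
    private SimpMessagingTemplate messagingTemplate;

    @Test
    void connectingUserUpdatesCountAndBroadcasts() {
        onlineUserService.userConnected("session-1", "alice");

        assertEquals(1, onlineUserService.getOnlineCount());
        verify(messagingTemplate).convertAndSend(eq("/topic/onlineUsers"), any(OnlineUserCount.class));
    }
}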
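6. Troubleshooting Common Problems
6.1 Unstable Connections
- Symptom: connections drop frequently
- Solutions:
- Adjust the heartbeat interval
- Increase network timeouts
- Check the Nginx configuration (if a reverse proxy is used)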
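6.2 Memory Leaks
- Symptom: memory usage keeps growing
- Solutions:
- Make sure sessions are cleaned up when connections close
- Use WeakReference to hold session objects
- Monitor heap usage and GC activity; a forced GC can help confirm a leak but does not fix one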
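6.3 Cluster Synchronization Lag
- Symptom: different nodes report different counts
- Solutions:
- Use Redis Pub/Sub for real-time synchronization
- Add a heartbeat-based synchronization mechanism
- Design for eventual consistency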
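To make the eventual-consistency idea concrete, here is a minimal sketch in which every node periodically publishes its local connection count (for example over Redis Pub/Sub) and each node sums the latest reports it has received, ignoring nodes that have gone quiet. ClusterCountAggregator and its methods are hypothetical, the transport is left out, and timestamps are passed in explicitly for testability.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical aggregator: keeps the latest count reported by each node
// and sums the reports that are still fresh.
class ClusterCountAggregator {
    private static final class Report {
        final long count;
        final long receivedAtMillis;

        Report(long count, long receivedAtMillis) {
            this.count = count;
            this.receivedAtMillis = receivedAtMillis;
        }
    }

    private final Map<String, Report> reports = new ConcurrentHashMap<>();
    private final long staleAfterMillis;

    ClusterCountAggregator(long staleAfterMillis) {
        this.staleAfterMillis = staleAfterMillis;
    }

    // Call when a node's count report arrives, for example from a Pub/Sub listener.
    void onReport(String nodeId, long count, long nowMillis) {
        reports.put(nodeId, new Report(count, nowMillis));
    }

    // Sums the counts of nodes heard from within the staleness window.
    // Silent nodes are dropped so a crashed node stops contributing.
    long totalOnline(long nowMillis) {
        reports.values().removeIf(r -> nowMillis - r.receivedAtMillis > staleAfterMillis);
        long total = 0;
        for (Report r : reports.values()) {
            total += r.count;
        }
        return total;
    }
}
```

Nodes may briefly disagree between reports but converge once each has received the latest values. Note that this sums per-node connections, so a user connected to two nodes is counted twice; the shared Redis set from section 3.1 avoids that at the cost of a round trip per update.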
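With the implementation above, you can build a real-time online user counter that performs well and handles disconnects reliably. Depending on your requirements, choose in-memory storage or the Redis cluster approach, and adjust the configuration parameters as needed.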