WebSocket分布式实现方案

发布于:2025-05-17 ⋅ 阅读:(15) ⋅ 点赞:(0)
版本使用

Spring Boot 使用 3.3.1 版本

JDK 21

实现逻辑

使用Redis记录用户连接地址,RabbitMQ为每个微服务所在的服务器创建对应的交换机或特定的路由规则,每个微服务监听自己的交换机。当连接不同服务器的两个用户进行通信时,通过MQ将消息进行转发到对应的队列中,由监听该队列的微服务获取消息进行转发到连接该服务器的用户。例如:用户A连接到120.78.2.56服务器,用户B连接到120.78.2.57服务器,当用户A要给用户B发送消息时,用户A请求56服务器的接口,将消息发送到56服务器,56服务器上的应用判断用户B的连接是否在自己这里,如果在自己这里则直接进行消息转发即可,如果不在则查询Redis中用户B连接的服务器地址,然后将消息发送到MQ对应的队列或交换机上,由57服务器监听消息,然后发送给连接在57服务器上的用户B。

代码实现
pom依赖
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.creatar</groupId>
  <artifactId>creatar</artifactId>
  <version>1.0</version>

  <packaging>jar</packaging>

  <properties>
    <maven.compiler.source>21</maven.compiler.source>
    <maven.compiler.target>21</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <!-- Spring Boot parent: supplies dependency management for every dependency declared without a version. -->
  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.3.1</version>
  </parent>

  <dependencies>
    <!-- Tomcat is excluded so that the Undertow starter below actually becomes the embedded web server;
         with both on the classpath Spring Boot would pick Tomcat. -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
      <exclusions>
        <exclusion>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-tomcat</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-undertow</artifactId>
    </dependency>

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    <!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
    <dependency>
      <groupId>io.netty</groupId>
      <artifactId>netty-all</artifactId>
      <version>4.1.112.Final</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.postgresql/postgresql -->
    <dependency>
      <groupId>org.postgresql</groupId>
      <artifactId>postgresql</artifactId>
      <version>42.7.4</version>
    </dependency>


    <!-- https://mvnrepository.com/artifact/com.alibaba/druid-spring-boot-3-starter -->
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>druid-spring-boot-3-starter</artifactId>
      <version>1.2.23</version>
    </dependency>


    <dependency>
      <groupId>com.baomidou</groupId>
      <artifactId>mybatis-plus-spring-boot3-starter</artifactId>
      <version>3.5.7</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/com.github.yulichang/mybatis-plus-join-boot-starter -->
    <dependency>
      <groupId>com.github.yulichang</groupId>
      <artifactId>mybatis-plus-join-boot-starter</artifactId>
      <version>1.4.13</version>
    </dependency>



    <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-data-redis -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>

    <dependency>
      <groupId>org.redisson</groupId>
      <artifactId>redisson-spring-boot-starter</artifactId>
      <version>3.26.0</version>
    </dependency>

    <!-- Required by the sources in this project (lombok.RequiredArgsConstructor);
         version is managed by the Spring Boot parent. -->
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <optional>true</optional>
    </dependency>

    <!-- Required by the sources in this project (org.apache.commons.lang3.StringUtils);
         version is managed by the Spring Boot parent. -->
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-lang3</artifactId>
    </dependency>
  </dependencies>

</project>

创建Netty服务器
package com.creatar.service.common.im.core;

import com.creatar.properties.SocketProperties;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import jakarta.annotation.PreDestroy;
import lombok.RequiredArgsConstructor;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
/**
 * Netty-based WebSocket server that is started after the Spring application has booted.
 *
 * <p>The listening socket is bound in {@link #run(ApplicationArguments)} and released,
 * together with the event loop groups, in {@link #stop()}.
 *
 * @author: 张定辉
 * @date: 2025-05-03
 */
@Component
@RequiredArgsConstructor
public class NettyServer implements ApplicationRunner {

    /** Event loop group passed to the bootstrap as the parent (acceptor) group. */
    private static final NioEventLoopGroup bossGroup = new NioEventLoopGroup();
    /** Event loop group passed to the bootstrap as the child (per-connection) group. */
    private static final NioEventLoopGroup workerGroup = new NioEventLoopGroup();
    private final SocketProperties socketProperties;
    private final SocketMessageHandler socketMessageHandler;
    /** The bound server channel; written by the startup thread and read by the shutdown thread. */
    private volatile Channel serverChannel;

    /**
     * Builds the server bootstrap and the per-connection channel pipeline.
     *
     * @return a configured, not yet bound {@link ServerBootstrap}
     */
    private ServerBootstrap bootstrap() {
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        // HTTP codec + aggregation are needed for the WebSocket upgrade handshake.
                        socketChannel.pipeline().addLast(new HttpServerCodec());
                        socketChannel.pipeline().addLast(new ChunkedWriteHandler());
                        socketChannel.pipeline().addLast(new HttpObjectAggregator(65536));
                        // NOTE(review): this constructor argument is the WebSocket path, yet the value
                        // comes from getHost() - confirm the property really holds a path such as "/ws".
                        socketChannel.pipeline().addLast(new WebSocketServerProtocolHandler(socketProperties.getHost()));
                        socketChannel.pipeline().addLast(new LoggingHandler());
                        // NOTE(review): the same handler instance is added to every channel; Netty only
                        // allows that for handlers annotated @Sharable - confirm on SocketMessageHandler.
                        socketChannel.pipeline().addLast(socketMessageHandler);
                    }
                })
                .option(ChannelOption.SO_BACKLOG, 128)
                .childOption(ChannelOption.SO_KEEPALIVE, true);
        return serverBootstrap;
    }

    /**
     * Binds the server to the configured port and returns as soon as the bind has completed.
     *
     * <p>This method deliberately does not wait on the channel's close future: doing so would
     * block the Spring startup thread for the lifetime of the server and leave
     * {@code serverChannel} unassigned until after the server had already closed.
     *
     * @param args application arguments (unused)
     * @throws Exception if binding fails or the thread is interrupted while waiting for the bind
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        serverChannel = bootstrap().bind(socketProperties.getPort()).sync().channel();
    }

    /**
     * Closes the server channel (if it was ever bound) and shuts down both event loop groups.
     */
    @PreDestroy
    public void stop() {
        Channel channel = serverChannel;
        try {
            // The channel is null when startup failed before or during bind.
            if (channel != null) {
                channel.close().syncUninterruptibly();
            }
        } finally {
            // Release the event loop threads even if closing the channel failed.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
用户连接/在线情况业务类
package com.creatar.service.common.im.core;

import com.creatar.properties.SocketProperties;
import com.creatar.util.HttpUtil;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;

/**
 * 用户在线情况业务类
 *
 * @author: 张定辉
 * @date: 2025-05-03
 * @description: 用户在线情况业务类
 */
@Service
@RequiredArgsConstructor
public class OnlineService {
   
    private final SocketProperties socketProperties;
    private final RedisTemplate<String, String> redisTemplate;

    /**
     * Marks a user as online by setting that user's bit in the online bitmap.
     *
     * <p>A blank user ID is ignored. The user ID is parsed as a {@code long} and used as the
     * bit offset under the key returned by {@code socketProperties.getOnlineKey()}.
     *
     * @param userId the user ID; must be numeric when not blank
     */
    public void setUserOnline(String userId) {
        if (StringUtils.isNotBlank(userId)) {
            var valueOps = redisTemplate.opsForValue();
            String onlineKey = socketProperties.getOnlineKey();
            long bitOffset = Long.parseLong(userId);
            valueOps.setBit(onlineKey, bitOffset, true);
        }
    }

    /**
     * Marks a user as offline by clearing that user's bit in the online bitmap.
     *
     * <p>A blank user ID is ignored. The user ID is parsed as a {@code long} and used as the
     * bit offset under the key returned by {@code socketProperties.getOnlineKey()}.
     *
     * @param userId the user ID; must be numeric when not blank
     */
    public void setUserOffline(String userId) {
        if (StringUtils.isNotBlank(userId)) {
            var valueOps = redisTemplate.opsForValue();
            String onlineKey = socketProperties.getOnlineKey();
            long bitOffset = Long.parseLong(userId);
            valueOps.setBit(onlineKey, bitOffset, false);
        }
    }

    /**
     * Counts the users currently online by running BITCOUNT on the online bitmap.
     *
     * <p>The raw key is produced with the template's own key serializer so that it is the
     * same key that {@code opsForValue().setBit(...)} writes to; building the bytes with
     * {@code String.getBytes()} would bypass that serializer.
     *
     * @return the number of set bits under the online key, as returned by the Redis callback
     */
    public Long statOnline() {
        // Erasure-safe cast: the template is declared with String keys.
        @SuppressWarnings("unchecked")
        org.springframework.data.redis.serializer.RedisSerializer<String> keySerializer =
                (org.springframework.data.redis.serializer.RedisSerializer<String>) redisTemplate.getKeySerializer();
        byte[] rawKey = keySerializer.serialize(socketProperties.getOnlineKey());
        return redisTemplate.execute((RedisCallback<Long>) con -> con.stringCommands().bitCount(rawKey));
    }

    /**
     * 设置用户连接节点
     */
    public void setUserConnectionNode(String userId) {
   
        redisTemplate.opsForValue().set(socketProperties.getUserConnectionNode() + userId, HttpUtil.getHostname());
        redisTemplate.expire(socketProperties.getUserConnectionNode() + userId, 1, TimeUnit

网站公告

今日签到

点亮在社区的每一天
去签到