【橘子websocket】如何基于vertx来构建websocket聊天室(上)

发布于:2025-03-19 ⋅ 阅读:(15) ⋅ 点赞:(0)

简介

websocket的概念我们就不说了,网上已经说烂了,具体更加权威的内容可以去看RFC6455
我们这里只来操作一下如何在实际开发中构建websocket服务。如果是springboot技术栈的话,其实很简单,就是几个注解的问题。不得不说springboot真的很方便。可以参考springboot的官方文档springboot websocket
我们这里不用springboot,我们来使用vertx来构建。至于什么是vertx可以参考vertx的官网vertx
简单来说就是他是基于netty构建的一个响应式的包,并且在这个基础上开发出了一套相对完整的生态。今天我们单说他的websocket实现。

一、环境依赖

jdk11 实际jdk8也行
maven:3.6
idea

具体的包依赖如下:

<properties>
    <maven.compiler.source>11</maven.compiler.source>
    <maven.compiler.target>11</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

    <maven-compiler-plugin.version>3.8.1</maven-compiler-plugin.version>
    <maven-shade-plugin.version>3.2.4</maven-shade-plugin.version>
    <maven-surefire-plugin.version>2.22.2</maven-surefire-plugin.version>
    <exec-maven-plugin.version>3.0.0</exec-maven-plugin.version>
    <logback.version>1.5.12</logback.version>


    <vertx.version>4.0.3</vertx.version>
    <junit-jupiter.version>5.6.0</junit-jupiter.version>
    <javafaker.version>1.0.2</javafaker.version>
    <lombok.version>1.18.36</lombok.version>
    <fastjson2.version>2.0.52</fastjson2.version>
    <jackson-databind.version>2.17.3</jackson-databind.version>
    <junit-jupiter.version>5.6.2</junit-jupiter.version>
    <junit-platform-surefire-provider.version>1.0.1</junit-platform-surefire-provider.version>
    <assertj-core.version>3.8.0</assertj-core.version>
    <logback-classic.version>1.5.12</logback-classic.version>

</properties>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-stack-depchain</artifactId>
            <version>${vertx.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-bom</artifactId>
            <version>2.3.2</version>
            <scope>import</scope>
            <type>pom</type>
        </dependency>
    </dependencies>
</dependencyManagement>

<dependencies>
    <dependency>
        <groupId>io.vertx</groupId>
        <artifactId>vertx-core</artifactId>
        <version>${vertx.version}</version>
    </dependency>

    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>${jackson-databind.version}</version>
    </dependency>

    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>${logback-classic.version}</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>${logback.version}</version>
    </dependency>
    <dependency>
        <groupId>com.github.javafaker</groupId>
        <artifactId>javafaker</artifactId>
        <version>${javafaker.version}</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>${lombok.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.fastjson2</groupId>
        <artifactId>fastjson2</artifactId>
        <version>${fastjson2.version}</version>
    </dependency>

    <!-- Test dependencies -->
    <dependency>
        <groupId>io.vertx</groupId>
        <artifactId>vertx-junit5</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.junit.jupiter</groupId>
        <artifactId>junit-jupiter</artifactId>
        <version>${junit-jupiter.version}</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.assertj</groupId>
        <artifactId>assertj-core</artifactId>
        <version>${assertj-core.version}</version>
        <scope>test</scope>
    </dependency>
</dependencies>

至此我们完成了依赖的引入。
下面我们来开发代码。

二、服务端开发

vertx的部署启动是有多种方式的,这个我们后面再说。本文我们以main的方式来部署。
我们需要一个主启动类,vertx中每一个实例都是一个Verticle,我们先启动MainVerticle

package com.levi.ws;

import com.levi.ws.handler.WebSocketHandler;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;

public class MainVerticle extends AbstractVerticle {

    private static final Logger LOG = LoggerFactory.getLogger(MainVerticle.class);

    public static void main(String[] args) {
        // 启动 Vert.x 应用
        Vertx vertx = Vertx.vertx();
        vertx.deployVerticle(new MainVerticle());
    }

    @Override
    public void start(Promise<Void> startPromise) throws Exception {
        // 创建 HTTP 服务器并监听端口 8080, 并将 WebSocketHandler 作为请求处理程序
        // 这里就能看到,vertx 是如何将请求转发给 WebSocketHandler 的
        vertx.createHttpServer().webSocketHandler(new WebSocketHandler())
                .listen(8080, result -> {
                    // 检查服务器是否启动成功
                    if (result.succeeded()) {
                        LOG.info("HTTP server started on port 8080");
                        startPromise.complete();
                    }else {
                        LOG.error("Failed to start HTTP server", result.cause());
                        startPromise.fail(result.cause());
                    }
                });
    }
}

这里的编码方式和我们在spring中的有所不同,因为vertx中几乎所有的api都是异步的。所以他不会立刻返回,都是在后面挂了一个回调通过回调的状态(succeeded)来执行后面的逻辑。
至此我们就启动了一个端口8080,并且提供了wensocket的处理器。接下来我们要实现这个处理器,完成我们的逻辑(WebSocketHandler是我们自己定义的)。

package com.levi.ws.handler;

import com.levi.ws.PriceBroadcast;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.http.ServerWebSocket;
import io.vertx.core.http.WebSocketFrame;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class WebSocketHandler implements Handler<ServerWebSocket> {

    private static final Logger LOG = LoggerFactory.getLogger(WebSocketHandler.class);

    public static final String WEBSOCKET_PATH = "/ws/echo";

    private static final String CLOSE_MSG = "please close";
    
    @Override
    public void handle(ServerWebSocket ws) {
    	// 获取到客户端的请求地址
        String wsPath = ws.path();
        // 获取到客户端连接的唯一标记
        String textHandlerID = ws.textHandlerID();
        // 如果客户端的请求地址不是我们想要的,注意,我们只监听了8080端口,也就是所有的
        // 8080请求我们都处理,但是我们一般还是限制一下我们的想要的路径
        if (!WEBSOCKET_PATH.equalsIgnoreCase(wsPath)) {
            LOG.info("WebSocket path is {},and only accept path is {}", wsPath,WEBSOCKET_PATH);
            // 像客户端回写消息,你的地址不对,检查一下
            ws.writeFinalTextFrame("wrong path and only accept path is "+WEBSOCKET_PATH+" ,please check your path");
            // 并且关闭这条连接
            ws.close();
            return;
        }
        LOG.info("openning WebSocket {} is connected,path is {}", textHandlerID, wsPath);
        // 来到这里就是我们接受了这条连接
        ws.accept();
        // 给这个连接挂一个处理逻辑,当客户端发送数据过来的时候,触发这个回调,处理我们的数据
        ws.frameHandler(webSocketFrameHandler(ws));
        // 给这个连接挂一个连接关闭时候的回调,当连接关闭的时候,我们做一些处理
        ws.endHandler( onClose -> {
            LOG.info("WebSocket {} is  closed", textHandlerID);
        });
        // 给这个连接挂一个异常的回调
        ws.exceptionHandler(err -> LOG.error( "websocket is fail" ,err));
        // 连接成功之后,我们给客户端发一条消息
        ws.writeTextMessage( "connected");
    }

	// 这里就是当服务端接收到消息的时候,触发的方法 
    private static Handler<WebSocketFrame> webSocketFrameHandler(ServerWebSocket ws) {
        String textHandlerID = ws.textHandlerID();
        return buffer -> {
            String msg = buffer.textData();
            // 如果发的是关闭的消息,那我们就回写一条消息并且断开连接
            if (CLOSE_MSG.equalsIgnoreCase(msg)) {
                LOG.info("WebSocket  {} closed", textHandlerID);
                ws.writeFinalTextFrame("WebSocket  " + textHandlerID + " closed");
                ws.close();
            } else {
            	// 正常消息,给客户端返回即可
                LOG.info("WebSocket {} received message: {}", textHandlerID, msg);
                ws.writeTextMessage("WebSocket " + textHandlerID + " received message: " + msg);
            }
        };
    }
}

其实到这里我们已经完成了一个模型了,那我们就来测试一下。

三、初步测试

我们直接运行MainVerticle,成功启动在8080上。
在这里插入图片描述
然后我们来测试websocket协议,这里我推荐一个网站,websocket测试网站
我们可以在这里发起连接,然后观察一下情况。
1、测试连接
连接成功会触发ws.writeTextMessage( “connected”); 也就是服务端会给客户端回写connected字符串。
在这里插入图片描述
连接被服务端端开了,因为我们的请求地址是,ws://localhost:8080/ws/123 这个地址其实不是我们代码里面要放行的地址,所以按预期会触发

if (!WEBSOCKET_PATH.equalsIgnoreCase(wsPath)) {
    LOG.info("WebSocket path is {},and only accept path is {}", wsPath,WEBSOCKET_PATH);
    ws.writeFinalTextFrame("wrong path and only accept path is "+WEBSOCKET_PATH+" ,please check your path");
    ws.close();
    return;
}

客户端会收到wrong path and only accept path is 。。。并且服务端主动断开连接。我们来看正确的地址:
在这里插入图片描述
没有问题,正确连接到了,那我们发一个消息给服务端呢。
在这里插入图片描述
ok,到此为止,一切都正常,那我们客户端主动断开连接看下服务端会不会触发我们挂的endHandler。
在这里插入图片描述
我们发现也正常。

四、进一步优化服务端

我们的连接每次上来之后我们并没有管理起来,这样其实你无法做到哪个连接和哪个连接通信,我们来实现一个管理的机制,并且测试一下多连接上来的时候是如何响应的。
我们先来引入一个做数据的实现类,他可以帮助我们模拟各种数据。

package com.levi.ws.factory;

import com.github.javafaker.Faker;

public class InstanceFactory {

  private static final Faker FAKER = Faker.instance();

  public static Faker faker() {
    return FAKER;
  }
}

我们实现一个注册的类功能

package com.levi.ws;

import com.github.javafaker.Faker;
import com.levi.ws.factory.InstanceFactory;
import io.vertx.core.Vertx;
import io.vertx.core.http.ServerWebSocket;
import io.vertx.core.json.JsonObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class PriceBroadcast {

    private static final Logger LOG = LoggerFactory.getLogger(PriceBroadcast.class);

	// 管理所有的连接
    private static final Map<String, ServerWebSocket> connectionClients = new ConcurrentHashMap<>();

	// 构造函数
    public PriceBroadcast(Vertx vertx) {
        periodicUpdate(vertx);
    }
	
	// 实现一个定时的机制,没一秒服务端给对应的连接回写造的假数据
    private void periodicUpdate(Vertx vertx) {
        Faker faker = InstanceFactory.faker();
        String updatePrice = new JsonObject()
                .put("email", faker.internet().emailAddress())
                .put("price", faker.random().nextDouble())
                .toString();
        vertx.setPeriodic(Duration.ofSeconds(1).toMillis(), timerId -> {
            connectionClients.values().forEach(ws -> {
                LOG.info("send message to client:{}", ws.textHandlerID());
                ws.writeTextMessage(updatePrice);
            });
        });
    }
	
	// 注册连接
    public void register(ServerWebSocket webSocket) {
        LOG.info("register client:{}", webSocket.textHandlerID());
        connectionClients.put(webSocket.textHandlerID(), webSocket);
    }
	
	// 移除连接
    public void unregister(ServerWebSocket webSocket) {
        LOG.info("unRegister client:{}", webSocket.textHandlerID());
        connectionClients.remove(webSocket.textHandlerID());
    }

}

接下来我们需要修改一下其他的类,让这个工具类注入进来。

package com.levi.ws;

import com.levi.ws.handler.WebSocketHandler;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;

public class MainVerticle extends AbstractVerticle {

    private static final Logger LOG = LoggerFactory.getLogger(MainVerticle.class);

    public static void main(String[] args) {
        Vertx vertx = Vertx.vertx();
        vertx.deployVerticle(new MainVerticle());
    }

    @Override
    public void start(Promise<Void> startPromise) throws Exception {
        vertx.createHttpServer().webSocketHandler(new WebSocketHandler(vertx))
                .listen(8080, result -> {
                    if (result.succeeded()) {
                        LOG.info("HTTP server started on port 8080");
                        startPromise.complete();
                    }else {
                        LOG.error("Failed to start HTTP server", result.cause());
                        startPromise.fail(result.cause());
                    }
                });
    }
}

package com.levi.ws.handler;

import com.levi.ws.PriceBroadcast;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.http.ServerWebSocket;
import io.vertx.core.http.WebSocketFrame;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class WebSocketHandler implements Handler<ServerWebSocket> {

    private static final Logger LOG = LoggerFactory.getLogger(WebSocketHandler.class);

    public static final String WEBSOCKET_PATH = "/ws/echo";

    private static final String CLOSE_MSG = "please close";

    private PriceBroadcast priceBroadcast;

    public WebSocketHandler(Vertx vertx) {
        this.priceBroadcast = new PriceBroadcast(vertx);
    }

    @Override
    public void handle(ServerWebSocket ws) {
        String wsPath = ws.path();
        String textHandlerID = ws.textHandlerID();
        if (!WEBSOCKET_PATH.equalsIgnoreCase(wsPath)) {
            LOG.info("WebSocket path is {},and only accept path is {}", wsPath,WEBSOCKET_PATH);
            ws.writeFinalTextFrame("wrong path and only accept path is "+WEBSOCKET_PATH+" ,please check your path");
            ws.close();
            return;
        }
        LOG.info("openning WebSocket {} is connected,path is {}", textHandlerID, wsPath);
        ws.accept();
        ws.frameHandler(webSocketFrameHandler(ws));
        ws.endHandler( onClose -> {
            LOG.info("WebSocket {} is  closed", textHandlerID);
            // 连接断开的时候就移除
            priceBroadcast.unregister(ws);
        });
        ws.exceptionHandler(err -> LOG.error( "websocket is fail" ,err));
        ws.writeTextMessage( "connected");
        // 连接上来的时候就注册进来
        priceBroadcast.register(ws);
    }

    private static Handler<WebSocketFrame> webSocketFrameHandler(ServerWebSocket ws) {
        String textHandlerID = ws.textHandlerID();
        return buffer -> {
            String msg = buffer.textData();
            if (CLOSE_MSG.equalsIgnoreCase(msg)) {
                LOG.info("WebSocket  {} closed", textHandlerID);
                ws.writeFinalTextFrame("WebSocket  " + textHandlerID + " closed");
                ws.close();
            } else {
                LOG.info("WebSocket {} received message: {}", textHandlerID, msg);
                ws.writeTextMessage("WebSocket " + textHandlerID + " received message: " + msg);
            }
        };
    }
}

于是我们此时就完成了代码。我们来测试效果。我们这次开两个客户端。
在这里插入图片描述
实际效果如下:

websocket发送数据

一切正常。我们此时就完成了多个客户端的管理,后面我们就可以基于这个代码继续扩展,实现一个聊天的功能。


网站公告

今日签到

点亮在社区的每一天
去签到