简介
websocket的概念我们就不说了,网上已经说烂了,具体更加权威的内容可以去看RFC6455。
我们这里只来操作一下如何在实际开发中构建websocket服务。如果是springboot技术栈的话,其实很简单,就是几个注解的问题。不得不说springboot真的很方便。可以参考springboot的官方文档springboot websocket。
我们这里不用springboot,我们来使用vertx来构建。至于什么是vertx可以参考vertx的官网vertx。
简单来说就是他是基于netty构建的一个响应式的包,并且在这个基础上开发出了一套相对完整的生态。今天我们单说他的websocket实现。
一、环境依赖
jdk11 实际jdk8也行
maven:3.6
idea
具体的包依赖如下:
<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven-compiler-plugin.version>3.8.1</maven-compiler-plugin.version>
<maven-shade-plugin.version>3.2.4</maven-shade-plugin.version>
<maven-surefire-plugin.version>2.22.2</maven-surefire-plugin.version>
<exec-maven-plugin.version>3.0.0</exec-maven-plugin.version>
<logback.version>1.5.12</logback.version>
<vertx.version>4.0.3</vertx.version>
<junit-jupiter.version>5.6.0</junit-jupiter.version>
<javafaker.version>1.0.2</javafaker.version>
<lombok.version>1.18.36</lombok.version>
<fastjson2.version>2.0.52</fastjson2.version>
<jackson-databind.version>2.17.3</jackson-databind.version>
<junit-jupiter.version>5.6.2</junit-jupiter.version>
<junit-platform-surefire-provider.version>1.0.1</junit-platform-surefire-provider.version>
<assertj-core.version>3.8.0</assertj-core.version>
<logback-classic.version>1.5.12</logback-classic.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-stack-depchain</artifactId>
<version>${vertx.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-bom</artifactId>
<version>2.3.2</version>
<scope>import</scope>
<type>pom</type>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<version>${vertx.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson-databind.version}</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback-classic.version}</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback.version}</version>
</dependency>
<dependency>
<groupId>com.github.javafaker</groupId>
<artifactId>javafaker</artifactId>
<version>${javafaker.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>${fastjson2.version}</version>
</dependency>
<!-- Test dependencies -->
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-junit5</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>${junit-jupiter.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>${assertj-core.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
至此我们完成了依赖的引入。
下面我们来开发代码。
二、服务端开发
vertx的部署启动是有多种方式的,这个我们后面再说。本文我们以main的方式来部署。
我们需要一个主启动类,vertx中每一个实例都是一个Verticle,我们先启动MainVerticle
package com.levi.ws;
import com.levi.ws.handler.WebSocketHandler;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
public class MainVerticle extends AbstractVerticle {
private static final Logger LOG = LoggerFactory.getLogger(MainVerticle.class);
public static void main(String[] args) {
// 启动 Vert.x 应用
Vertx vertx = Vertx.vertx();
vertx.deployVerticle(new MainVerticle());
}
@Override
public void start(Promise<Void> startPromise) throws Exception {
// 创建 HTTP 服务器并监听端口 8080, 并将 WebSocketHandler 作为请求处理程序
// 这里就能看到,vertx 是如何将请求转发给 WebSocketHandler 的
vertx.createHttpServer().webSocketHandler(new WebSocketHandler())
.listen(8080, result -> {
// 检查服务器是否启动成功
if (result.succeeded()) {
LOG.info("HTTP server started on port 8080");
startPromise.complete();
}else {
LOG.error("Failed to start HTTP server", result.cause());
startPromise.fail(result.cause());
}
});
}
}
这里的编码方式和我们在spring中的有所不同,因为vertx中几乎所有的api都是异步的。所以他不会立刻返回,都是在后面挂了一个回调通过回调的状态(succeeded)来执行后面的逻辑。
至此我们就启动了一个端口8080,并且提供了wensocket的处理器。接下来我们要实现这个处理器,完成我们的逻辑(WebSocketHandler是我们自己定义的)。
package com.levi.ws.handler;
import com.levi.ws.PriceBroadcast;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.http.ServerWebSocket;
import io.vertx.core.http.WebSocketFrame;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class WebSocketHandler implements Handler<ServerWebSocket> {
private static final Logger LOG = LoggerFactory.getLogger(WebSocketHandler.class);
public static final String WEBSOCKET_PATH = "/ws/echo";
private static final String CLOSE_MSG = "please close";
@Override
public void handle(ServerWebSocket ws) {
// 获取到客户端的请求地址
String wsPath = ws.path();
// 获取到客户端连接的唯一标记
String textHandlerID = ws.textHandlerID();
// 如果客户端的请求地址不是我们想要的,注意,我们只监听了8080端口,也就是所有的
// 8080请求我们都处理,但是我们一般还是限制一下我们的想要的路径
if (!WEBSOCKET_PATH.equalsIgnoreCase(wsPath)) {
LOG.info("WebSocket path is {},and only accept path is {}", wsPath,WEBSOCKET_PATH);
// 像客户端回写消息,你的地址不对,检查一下
ws.writeFinalTextFrame("wrong path and only accept path is "+WEBSOCKET_PATH+" ,please check your path");
// 并且关闭这条连接
ws.close();
return;
}
LOG.info("openning WebSocket {} is connected,path is {}", textHandlerID, wsPath);
// 来到这里就是我们接受了这条连接
ws.accept();
// 给这个连接挂一个处理逻辑,当客户端发送数据过来的时候,触发这个回调,处理我们的数据
ws.frameHandler(webSocketFrameHandler(ws));
// 给这个连接挂一个连接关闭时候的回调,当连接关闭的时候,我们做一些处理
ws.endHandler( onClose -> {
LOG.info("WebSocket {} is closed", textHandlerID);
});
// 给这个连接挂一个异常的回调
ws.exceptionHandler(err -> LOG.error( "websocket is fail" ,err));
// 连接成功之后,我们给客户端发一条消息
ws.writeTextMessage( "connected");
}
// 这里就是当服务端接收到消息的时候,触发的方法
private static Handler<WebSocketFrame> webSocketFrameHandler(ServerWebSocket ws) {
String textHandlerID = ws.textHandlerID();
return buffer -> {
String msg = buffer.textData();
// 如果发的是关闭的消息,那我们就回写一条消息并且断开连接
if (CLOSE_MSG.equalsIgnoreCase(msg)) {
LOG.info("WebSocket {} closed", textHandlerID);
ws.writeFinalTextFrame("WebSocket " + textHandlerID + " closed");
ws.close();
} else {
// 正常消息,给客户端返回即可
LOG.info("WebSocket {} received message: {}", textHandlerID, msg);
ws.writeTextMessage("WebSocket " + textHandlerID + " received message: " + msg);
}
};
}
}
其实到这里我们已经完成了一个模型了,那我们就来测试一下。
三、初步测试
我们直接运行MainVerticle,成功启动在8080上。
然后我们来测试websocket协议,这里我推荐一个网站,websocket测试网站
我们可以在这里发起连接,然后观察一下情况。
1、测试连接
连接成功会触发ws.writeTextMessage( “connected”); 也就是服务端会给客户端回写connected字符串。
连接被服务端端开了,因为我们的请求地址是,ws://localhost:8080/ws/123 这个地址其实不是我们代码里面要放行的地址,所以按预期会触发
if (!WEBSOCKET_PATH.equalsIgnoreCase(wsPath)) {
LOG.info("WebSocket path is {},and only accept path is {}", wsPath,WEBSOCKET_PATH);
ws.writeFinalTextFrame("wrong path and only accept path is "+WEBSOCKET_PATH+" ,please check your path");
ws.close();
return;
}
客户端会收到wrong path and only accept path is 。。。并且服务端主动断开连接。我们来看正确的地址:
没有问题,正确连接到了,那我们发一个消息给服务端呢。
ok,到此为止,一切都正常,那我们客户端主动断开连接看下服务端会不会触发我们挂的endHandler。
我们发现也正常。
四、进一步优化服务端
我们的连接每次上来之后我们并没有管理起来,这样其实你无法做到哪个连接和哪个连接通信,我们来实现一个管理的机制,并且测试一下多连接上来的时候是如何响应的。
我们先来引入一个做数据的实现类,他可以帮助我们模拟各种数据。
package com.levi.ws.factory;
import com.github.javafaker.Faker;
public class InstanceFactory {
private static final Faker FAKER = Faker.instance();
public static Faker faker() {
return FAKER;
}
}
我们实现一个注册的类功能
package com.levi.ws;
import com.github.javafaker.Faker;
import com.levi.ws.factory.InstanceFactory;
import io.vertx.core.Vertx;
import io.vertx.core.http.ServerWebSocket;
import io.vertx.core.json.JsonObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class PriceBroadcast {
private static final Logger LOG = LoggerFactory.getLogger(PriceBroadcast.class);
// 管理所有的连接
private static final Map<String, ServerWebSocket> connectionClients = new ConcurrentHashMap<>();
// 构造函数
public PriceBroadcast(Vertx vertx) {
periodicUpdate(vertx);
}
// 实现一个定时的机制,没一秒服务端给对应的连接回写造的假数据
private void periodicUpdate(Vertx vertx) {
Faker faker = InstanceFactory.faker();
String updatePrice = new JsonObject()
.put("email", faker.internet().emailAddress())
.put("price", faker.random().nextDouble())
.toString();
vertx.setPeriodic(Duration.ofSeconds(1).toMillis(), timerId -> {
connectionClients.values().forEach(ws -> {
LOG.info("send message to client:{}", ws.textHandlerID());
ws.writeTextMessage(updatePrice);
});
});
}
// 注册连接
public void register(ServerWebSocket webSocket) {
LOG.info("register client:{}", webSocket.textHandlerID());
connectionClients.put(webSocket.textHandlerID(), webSocket);
}
// 移除连接
public void unregister(ServerWebSocket webSocket) {
LOG.info("unRegister client:{}", webSocket.textHandlerID());
connectionClients.remove(webSocket.textHandlerID());
}
}
接下来我们需要修改一下其他的类,让这个工具类注入进来。
package com.levi.ws;
import com.levi.ws.handler.WebSocketHandler;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
public class MainVerticle extends AbstractVerticle {
private static final Logger LOG = LoggerFactory.getLogger(MainVerticle.class);
public static void main(String[] args) {
Vertx vertx = Vertx.vertx();
vertx.deployVerticle(new MainVerticle());
}
@Override
public void start(Promise<Void> startPromise) throws Exception {
vertx.createHttpServer().webSocketHandler(new WebSocketHandler(vertx))
.listen(8080, result -> {
if (result.succeeded()) {
LOG.info("HTTP server started on port 8080");
startPromise.complete();
}else {
LOG.error("Failed to start HTTP server", result.cause());
startPromise.fail(result.cause());
}
});
}
}
package com.levi.ws.handler;
import com.levi.ws.PriceBroadcast;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.http.ServerWebSocket;
import io.vertx.core.http.WebSocketFrame;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class WebSocketHandler implements Handler<ServerWebSocket> {
private static final Logger LOG = LoggerFactory.getLogger(WebSocketHandler.class);
public static final String WEBSOCKET_PATH = "/ws/echo";
private static final String CLOSE_MSG = "please close";
private PriceBroadcast priceBroadcast;
public WebSocketHandler(Vertx vertx) {
this.priceBroadcast = new PriceBroadcast(vertx);
}
@Override
public void handle(ServerWebSocket ws) {
String wsPath = ws.path();
String textHandlerID = ws.textHandlerID();
if (!WEBSOCKET_PATH.equalsIgnoreCase(wsPath)) {
LOG.info("WebSocket path is {},and only accept path is {}", wsPath,WEBSOCKET_PATH);
ws.writeFinalTextFrame("wrong path and only accept path is "+WEBSOCKET_PATH+" ,please check your path");
ws.close();
return;
}
LOG.info("openning WebSocket {} is connected,path is {}", textHandlerID, wsPath);
ws.accept();
ws.frameHandler(webSocketFrameHandler(ws));
ws.endHandler( onClose -> {
LOG.info("WebSocket {} is closed", textHandlerID);
// 连接断开的时候就移除
priceBroadcast.unregister(ws);
});
ws.exceptionHandler(err -> LOG.error( "websocket is fail" ,err));
ws.writeTextMessage( "connected");
// 连接上来的时候就注册进来
priceBroadcast.register(ws);
}
private static Handler<WebSocketFrame> webSocketFrameHandler(ServerWebSocket ws) {
String textHandlerID = ws.textHandlerID();
return buffer -> {
String msg = buffer.textData();
if (CLOSE_MSG.equalsIgnoreCase(msg)) {
LOG.info("WebSocket {} closed", textHandlerID);
ws.writeFinalTextFrame("WebSocket " + textHandlerID + " closed");
ws.close();
} else {
LOG.info("WebSocket {} received message: {}", textHandlerID, msg);
ws.writeTextMessage("WebSocket " + textHandlerID + " received message: " + msg);
}
};
}
}
于是我们此时就完成了代码。我们来测试效果。我们这次开两个客户端。
实际效果如下:
websocket发送数据
一切正常。我们此时就完成了多个客户端的管理,后面我们就可以基于这个代码继续扩展,实现一个聊天的功能。