TCP 服务端和客户端
与Event Bus的区别?
- 通信对象:客户端-服务器通信面向外部客户端,事件总线主要用于内部 Verticle 间的通信,但可通过桥接(SockJS)扩展到外部。
- 通信模式:客户端-服务器通信通常是请求-响应,事件总线支持更灵活的模式,如发布-订阅、请求-响应和点对点。
- 集成性:事件总线提供统一的内部消息系统,桥接后可与外部客户端无缝集成,而客户端-服务器通信需要单独处理协议。
TCP服务端
若您在Verticle内创建了 TCP 服务端和客户端, 它们将会在Verticle撤销时自动关闭
任意一个TCP服务端中的处理器总是在相同的Event-Loop线程上执行。
这意味着如果您在多核的服务器上运行,并且只部署了一个实例, 那么您的服务器上最多只能使用一个核,此时需要部署更多的服务器实例,比如通过部署多个Verticle的方式,在对应的start方法中开启TCP服务器并进行监听(但实际上它内部仅维护一个服务器实例。当传入新的连接时, 它以轮询的方式将其分发给任意一个连接处理器处理)。
TCP服务端创建:
//服务端配置,setRegisterWriteHandler用于将每个socket注册到event bus中,意味着Verticle可以向socket的地址(通过socket.writeHandlerId()方法获取)发送Buffer数据,但这个过程只能在本地服务器进行 NetServerOptions options = new NetServerOptions().setRegisterWriteHandler(true); //创建TCP服务端 NetServer server = vertx.createNetServer(options); //让服务端监听localhost:1234 server.listen(1234, "localhost", res -> { if (res.succeeded()) {} else {} }); //连接建立完时收到通知,配置connectHandler server.connectHandler(socket -> { //从socket(buffer)中读取数据 socket.handler(buffer -> { //异步写入数据到socket中 socket.write("hello") }); //在socket关闭时收到通知,配置closeHandler socket.closeHandler(v -> {...}); //在socket异常时收到通知,配置exceptionHandler socket.exceptionHandler(e -> {...}); //获取socket地址 socket.localAddress(); //获取连接的另一方地址 socket.remoteAddress(); //发送文件和classpath中的资源 socket.sendFile("myfile.dat"); //升级到SSL/TLS连接,前提必须为服务器或客户端配置SSL/TLS才能正常工作 socket.upgradeToSsl(); }); //关闭TCP服务端 server.close(res -> { if (res.succeeded()) {} else {} });
TCP客户端
TCP客户端创建:
//配置超时时间、初始连接的重连次数和重连间隔 NetClientOptions options = new NetClientOptions().setConnectTimeout(10000).setReconnectAttempts(10).setReconnectInterval(500); //创建TCP客户端 NetClient client = vertx.createNetClient(options);
发起请求:
client.connect(4321, "localhost", res -> { if (res.succeeded()) { NetSocket socket = res.result(); } else {} });
Http服务端和客户端
Http服务端
Http服务端创建:
//记录网络活动,用于调试 HttpServerOptions options = new HttpServerOptions().setLogActivity(true); //创建Http服务端 HttpServer server = vertx.createHttpServer(options); //处理不合法的请求,比如请求头过大 server.invalidRequestHandler(HttpServerRequest.DEFAULT_INVALID_REQUEST_HANDLER);
监听地址:
server.listen(8080, "localhoSt", res -> { if (res.succeeded()) {} else {} });
配置请求处理器:
//记录网络活动,用于调试 HttpServerOptions options = new HttpServerOptions().setLogActivity(true); //创建Http服务端 HttpServer server = vertx.createHttpServer(options); //处理不合法的请求,比如请求头过大 server.invalidRequestHandler(HttpServerRequest.DEFAULT_INVALID_REQUEST_HANDLER); //配置请求处理器,在读取完请求头后调用(因此没有请求体) server.requestHandler(request -> { //可以从request中读取uri,path,params和headers等其他信息。。。 //接收请求体:设置处理器,每次请求体的一小块数据收到时,该处理器都会被调用,避免请求体数据过大耗尽服务器可用内存 Buffer totalBuffer = Buffer.buffer(); request.handler(buffer -> { totalBuffer.appendBuffer(buffer); }); //在读取完请求体后被调用 request.endHandler(v -> { System.out.println("请求体长度:" + totalBuffer.length()); //建立response响应 HttpServerResponse response = request.response(); //响应数据 response.setStatusCode(200); //设置响应头 response.putHeader("content-type", "text/html"); //异步写入响应体,原理是将写操作进入队列 response.write("Hello world"); //在队列为空时结束响应并写入响应体 response.end("响应结束"); //设置响应异常处理器 response.exceptionHandler(e -> {}); }); //设置请求异常处理器 request.exceptionHandler(e -> {}); });
如果你确认不会传输很大的数据,则可以通过bodyHandler直接获取整个请求体:
request.bodyHandler(totalBuffer -> {});
配置HTTP/2 服务端:
HttpServerOptions options = new HttpServerOptions() //ALPN是一个TLS的扩展,它在客户端和服务器开始交换数据之前协商协议 .setUseAlpn(true) .setSsl(true) .setKeyStoreOptions(new JksOptions().setPath("/path/to/my/keystore"));
当服务器接受 HTTP/2 连接时,它会向客户端发送其
初始设置
。 定义客户端如何使用连接,服务器的默认初始设置为:getMaxConcurrentStreams
:限制单个连接上可以同时打开的流(stream)数量- 其他默认的 HTTP/2 的设置
处理HTML表单:
使用
application/x-www-form-urlencoded
或multipart/form-data
这两种 content-type 来提交 HTML 表单。对于使用 URL 编码过的表单,表单属性会被编码在URL中,如同普通查询参数一样。
对于 multipart 类型的表单,它会被编码在请求体中,在整个请求体被 完全读取之前不可用
若您想要读取 multipart 表单的属性,需要在读取请求体 之前 调用
setExpectMultipart
方法, 然后在整个请求体都被读取后,您可以使用formAttributes
方法来读取表单属性。server.requestHandler(request -> { request.setExpectMultipart(true); request.endHandler(v -> { MultiMap formAttributes = request.formAttributes(); }); });
处理文件上传:
Vert.x 可以处理以 multipart 编码形式上传的的文件
在读取请求体 之前 调用
setExpectMultipart
方法,并对请求设置uploadHandler
当服务器每次接收到上传请求时, uploadHandler将被调用一次
server.requestHandler(request -> { request.setExpectMultipart(true); request.uploadHandler(upload -> { //将文件上传到服务器磁盘的某个地方 upload.streamToFileSystem("directory/" + upload.filename()); //上传的文件可能很大,不会在单个缓冲区中包含整个数据,所以设置处理器分批接收 upload.handler(chunk -> { System.out.println("length:" + chunk.length()); }); }); });
处理cookies:
管理cookie
server.requestHandler(request -> { //获取cookie Cookie key = request.getCookie("key"); //移除cookie request.response().removeCookie("key"); //添加cookie request.response().addCookie(Cookie.cookie("name", "value")); });
给Cookie设置SameSite(限制站点发送):
cookie.setSameSite(CookieSameSite.LAX);
- None - 允许在跨域请求和非跨域请求中发送
- Strict - 只能在同站点的请求中发送
- Lax - 在跨域的子请求(例如调用加载图像或iframe)不发送该Cookie, 但当用户从外部站点导航到URL时将发送该Cookie, 例如通过链接打开。
处理压缩体:Vert.x 可以处理在客户端通过 deflate 、 gzip 或 brotli 算法压缩过的请求体信息。在创建服务器时调用HttpServerOptions对象的
setDecompressionSupported(true)
,并确保在类路径classpath上存在有 Brotli4j类库。响应结束(调用end方法)后,vertx不会自动关闭keep-alive的连接,可以手动调用close方法关闭底层的TCP连接,或调用
HttpServerOptions
对象的setIdleTimeout
方法配置空闲多少时间后自动关闭分块响应数据(HTTP/2流无效):
response.setChunked(true);
当处于分块模式时,每次调用任意一个write
方法将导致新的 HTTP 块被写出(如果不分块则会在响应结束前一直缓存在内存中)响应文件:
reponse.sendFile(Stirng name,long offset,long length)
管道式(流式)响应:将输出流写入到响应中,这样可以直接往输出流写入数据达到流式传输的效果,不需要手动调用end方法结束响应
response.setChunked(true); request.pipeTo(response);
接收/写入自定义 HTTP/2 帧:HTTP/2 帧以二进制压缩格式存放内容。可以交错发送,然后根据每个帧头的数据流标识符重新组装。因此不受流量控制限制,会被立即发送或接收。
//接收 request.customFrameHandler(frame -> { System.out.println("Received a frame type=" + frame.type() + " payload" + frame.payload().toString()); }); //写入 int frameType = 40; int frameStatus = 10; Buffer payload = Buffer.buffer("some data"); // 向客户端发送一帧 response.writeCustomFrame(frameType, frameStatus, payload);
流重置
HTTP/1.x 不允许请求或响应流执行清除重置,服务器需要接受整个响应。
HTTP/2 在请求/响应期间随时支持流重置,默认会发送
NO_ERROR
(0)错误代码request.response().reset();
在流重置完成时收到通知:
request.response().exceptionHandler(err -> { if (err instanceof StreamResetException) { StreamResetException reset = (StreamResetException) err; System.out.println("Stream reset " + reset.getCode()); } });
服务器推送(Server Push):HTTP/2 可以为单个客户端请求并行发送多个响应
eg:当服务器准备推送响应时,推送响应处理器会被调用,并会发送响应(因此需要在响应结束之前调用
push
方法)。推送响应处理器可能会接收到失败,如:客户端可能取消推送,因为在缓存中已经包含了main.js
, 不再需要它// 准备响应时会推送main.js到客户端 response.push(HttpMethod.GET, "/main.js", ar -> { if (ar.succeeded()) { // 服务器准备推送响应 HttpServerResponse pushedResponse = ar.result(); // 发送main.js响应 pushedResponse. putHeader("content-type", "application/json"). end("alert(\"Push response hello\")"); } else { System.out.println("Could not push client resource " + ar.cause()); } }); // 响应请求的资源内容 response.sendFile("<html><head><script src=\"/main.js\"></script></head><body></body></html>");
HTTP压缩:
启动HTTP压缩后。服务器将自动检查客户端请求头中是否包含了
Accept-Encoding
,若找到,将使用所支持的压缩算法之一(gzip/deflate )自动压缩响应正文。设置压缩级别:压缩可以减少网络流量,但是CPU密集度会更高,所以通过调整压缩密度解决
//支持压缩 options.setCompressionSupported(true); //设置压缩界别 options.setCompressionLevel(6); //设置压缩算法 GzipOptions gzip = StandardCompressionOptions.gzip(6, 15, 8); options.addCompressor(gzip);
关闭压缩:
response.putHeader(HttpHeaders.CONTENT_ENCODING, HttpHeaders.IDENTITY)
HTTP客户端
HTTP客户端创建:
HttpClient client = vertx.createHttpClient();
配置连接池
//启动长连接keep-alive options.setKeepAlive(false); //连接池的最大连接数 options.setMaxPoolSize(10); //设置空闲连接超时时间 options.setKeepAliveTimeout(100)
配置管道:管道意味着在在不等待响应的情况下,在同一个连接上发送另一个请求。
//启用管道 options.setPipelining(true); //单个连接的管道的请求数限制 options.setPipeliningLimit(2);
配置HTTP/2客户端
HttpClientOptions options = new HttpClientOptions() //启动HTTP/2协议 .setProtocolVersion(HttpVersion.HTTP_2) // .setSsl(true) //使用ALPN启动TLS .setUseAlpn(true) .setTrustAll(true) //记录网络活动日志 .setLogActivity(true);
HTTP/2多路复用配置
HTTP/1 的长连接和管道,实现了在一个TCP连接上不等待响应,而连续发送多次请求,但由于处理响应的顺序性,会出现队头阻塞的问题(如果一个请求处理耗时过长,会阻塞管道中的后续请求)。
HTTP/2 基于流和帧的特性,让每个HTTP请求和响应分解为多个可以乱序发送的帧,这些帧通过流ID重新组装,使得能在一个TCP连接上同时发送多个请求和响应,即多路复用。
options //限制每个连接的多路复用流数量 .setHttp2MultiplexingLimit(10) //设置HTTP/2连接池 .setHttp2MaxPoolSize(3);
发起请求:
client.request(HttpMethod.GET,8080, "localhost", "/some-uri", ar -> { //配置请求处理器 if (ar.succeeded()) { //获取请求 HttpClientRequest request = ar.result(); //设置超时时间 request.setTimeout(5000); //设置请求头 MultiMap headers = HttpHeaders.set("content-type", "application/json"); request.headers().adfdAll(headers); request.putHeader("other-header", "ailu"); //send:将整个请求的数据发送到目标服务器,并代表请求已经完成,不能再写入额外数据。 //发送请求1,携带String类型的请求体 request.send("requestBody",ar1 -> { if (ar1.succeeded()) { HttpClientResponse response = ar1.result(); } }); //发送请求2,携带Buffer类型的请求体 request.send(Buffer.buffer("requestBody"),ar1 -> {}); //发送请求3,携带实现ReadStream类型的请求体 request.send(stream,ar1 -> {}); } });
可提前配置要请求的主机/端口
HttpClientOptions options = new HttpClientOptions().setDefaultHost("localhost").setDefaultPort(8080);
使用write写入请求体,通过end结束请求,若不使用HTTP分块,则必须在写入请求前设置Content-Length头
//设置分块模式 request.setChunked(true); request.write("data1","UTF-8") request.write(Buffer.buffer("data2")) //结束请求 request.end(body);
使用流式请求将磁盘上的文件管送到HTTP 请求体中
request.setChunked(true); asyncFile.pipeTo(request);
写HTTP/2帧
int frameType = 40; int frameStatus = 10; Buffer payload = Buffer.buffer("some data"); // 发送一帧到服务器 request.writeCustomFrame(frameType, frameStatus, payload);
流重置:
request.reset(); //在流重置完成时您将会收到通知 request.exceptionHandler(err -> { if (err instanceof StreamResetException) { StreamResetException reset = (StreamResetException) err; System.out.println("Stream reset " + reset.getCode()); } });
流式响应到WriteSteam中:
response.pipeTo(WriteStream ws)
读取响应体:如果响应中包含响应体,那么响应体可能会在读取完header后,以多个分片的形式到达,即当响应体的某部分(数据)到达时,
handler
方法绑定的回调函数将会被调用response.handler(buffer -> {}); //整个响应体被完全读取时,endHandler就会被调用。 response.endHandler(v -> {});
或者直接处理所有响应体数据
response.body(ar -> { if (ar.succeeded()) { Buffer body = ar.result(); } });
请求和响应组合使用:
HttpClient 客户端有意地避免返回
Future<HttpClientResponse>
, 因为如果在 event-loop 之外设置 Future 的完成处理器可能会导致线程竞争。//会交给event loop线程执行 Future<HttpClientResponse> get = client.get("some-uri"); //假设此事件不在event-loop中,则阻塞主线程往下执行 Thread.sleep(100); get.onSuccess(response -> { // 响应事件在event loop线程可能已经发生 response.body(ar -> { }); });
将
HttpClientRequest
的使用限制在一个verticle的范围内, 因为Verticle会确保按顺序处理事件。vertx.deployVerticle(() -> new AbstractVerticle() { @Override public void start() { HttpClient client = vertx.createHttpClient(); Future<HttpClientResponse> get = client.get("some-uri"); //此事件在event-loop中,阻塞event loop线程(不建议这么做) Thread.sleep(100); get.onSuccess(response -> { // 响应事件不会提前发生,因为Verticle会保证按顺序处理事件 response.body(ar -> { }); }); } }, new DeploymentOptions());
在verticle外使用HttpClient进行交互时,可以安全地使用“组合”(compose)。
Future<JsonObject> future = client .request(HttpMethod.GET, "some-uri") .compose(request -> request .send() .compose(response -> { return response .body() .map(buffer -> buffer.toJsonObject()); } })); future.onSuccess(json -> {})
从响应中读取cookie:
response.cookies()
创建HTTP隧道
client.request(HttpMethod.CONNECT, "some-uri") .onSuccess(request -> { // 连接到服务器 request.connect(ar -> { if (ar.succeeded()) { HttpClientResponse response = ar.result(); if (response.statusCode() != 200) { // 某些原因连接失败 } else { // HTTP隧道创建成功,原始数据将传输到缓冲区 NetSocket socket = response.netSocket(); } } }); });
客户端(HTTP/2)接收推送资源
// 设置一个推送处理器来感知服务器推送的任何资源 request.pushHandler(pushedRequest -> { //如果不想收到推送,可重置流 if (pushedRequest.path().equals("/main.js")) { pushedRequest.reset(); } System.out.println("Server pushed " + pushedRequest.path()); // 为响应设置处理器 pushedRequest.response().onComplete(pushedResponse -> { System.out.println("The response for the pushed request"); }); });
接收自定义HTTP/2帧
response.customFrameHandler(frame -> { System.out.println("Received a frame type=" + frame.type() + " payload" + frame.payload().toString()); });
启用压缩
创建客户端时使用
setTryUseCompression
设置配置项启用压缩,请求头包含Accept-Encoding
头,其值为可支持的压缩算法,如Accept-Encoding: gzip, deflate
HTTP连接
服务端连接
//获取服务端上的请求连接 HttpConnection connection = request.connection(); //设置连接处理器,在任意连接建立时得到通知 HttpServer server = vertx.createHttpServer(http2Options); server.connectionHandler(connection -> {});
客户端连接
//获取客户端上的请求连接 HttpConnection connection = request.connection(); //设置连接处理器,在连接建立时通知 client.connectionHandler(connection -> {});
连接配置(HTTP/2):每个Endpoint(端点)必须遵守连接另一端的发送设置,即建立连接时,客户端和服务端会交换初始配置
//初始配置 options.setInitialSettings(new Http2Settings().setMaxConcurrentStreams(100)); //连接建立后,可随时更改设置 connection.updateSettings(); //在另一端更新设置后,这端会收到对应的远程设置 connection.remoteSettingsHandler(settings -> {});
Ping(HTTP/2):确定连接往返时间或检查连接有效性
//ping端 connection.ping(data, pong -> {}); //被ping端 connection.pingHandler(ping -> {});
连接关闭(HTTP/2)
shutdown:客户端停止发送新请求,并发送
{@literal GOAWAY} 帧
到服务端,要求其停止创建流并停止推送响应,直到当前所有流关闭,然后关闭连接。connection.shutdown()
goAway:只发送
{@literal GOAWAY} 帧
告诉远程连接停止创建流,但没有计划关闭连接。connection.shutdown()
shutdownHandler:当所有流已经关闭 或 接收到
{@literal GOAWAY} 帧
时被调用connection.shutdownHandler(v -> { // 所有流被关闭时,关闭连接 connection.close(); });
close:关闭连接,对于HTTP/1,会关闭底层Socket,对于HTTP/2,在连接关闭前发送
{@literal GOAWAY} 帧
扩展
水平扩展-客户端共享:多个verticle实例共享一个HTTP客户端
HttpClient client = vertx.createHttpClient(new HttpClientOptions().setShared(true)); vertx.deployVerticle(() -> new AbstractVerticle() { @Override public void start() throws Exception { // 使用client } }, new DeploymentOptions().setInstances(4));
水平扩展-服务端共享:当多个 HTTP 服务端在同一个端口上监听时,Vert.x 会使用轮询策略顺序委托给其中一个服务端
启用HTTPS
client.request(new RequestOptions() .setHost("localhost") .setPort(8080) .setURI("/") .setSsl(true), ar1 -> {});