Vert.x学习(二)—— TCP服务端、客户端和HTTP服务端、客户端

发布于:2025-04-19 ⋅ 阅读:(15) ⋅ 点赞:(0)

TCP 服务端和客户端

  • 与Event Bus的区别?

    • 通信对象:客户端-服务器通信面向外部客户端,事件总线主要用于内部 Verticle 间的通信,但可通过桥接(SockJS)扩展到外部。
    • 通信模式:客户端-服务器通信通常是请求-响应,事件总线支持更灵活的模式,如发布-订阅、请求-响应和点对点。
    • 集成性:事件总线提供统一的内部消息系统,桥接后可与外部客户端无缝集成,而客户端-服务器通信需要单独处理协议。
  • TCP服务端

    • 若您在Verticle内创建了 TCP 服务端和客户端, 它们将会在Verticle撤销时自动关闭

    • 任意一个TCP服务端中的处理器总是在相同的Event-Loop线程上执行。

      这意味着如果您在多核的服务器上运行,并且只部署了一个实例, 那么您的服务器上最多只能使用一个核,此时需要部署更多的服务器实例,比如通过部署多个Verticle的方式,在对应的start方法中开启TCP服务器并进行监听(但实际上它内部仅维护一个服务器实例。当传入新的连接时, 它以轮询的方式将其分发给任意一个连接处理器处理)。

    • TCP服务端创建:

      //服务端配置,setRegisterWriteHandler用于将每个socket注册到event bus中,意味着Verticle可以向socket的地址(通过socket.writeHandlerId()方法获取)发送Buffer数据,但这个过程只能在本地服务器进行
      NetServerOptions options = new NetServerOptions().setRegisterWriteHandler(true);
      //创建TCP服务端
      NetServer server = vertx.createNetServer(options);
      //让服务端监听localhost:1234
      server.listen(1234, "localhost", res -> {
        if (res.succeeded()) {} 
        else {}
      });
      //连接建立完时收到通知,配置connectHandler
      server.connectHandler(socket -> {
        	//从socket(buffer)中读取数据
          socket.handler(buffer -> {
              //异步写入数据到socket中
              socket.write("hello")
          });
          //在socket关闭时收到通知,配置closeHandler
          socket.closeHandler(v -> {...});
          //在socket异常时收到通知,配置exceptionHandler
          socket.exceptionHandler(e -> {...});
          //获取socket地址
          socket.localAddress();
          //获取连接的另一方地址
          socket.remoteAddress();
          //发送文件和classpath中的资源
          socket.sendFile("myfile.dat");
          //升级到SSL/TLS连接,前提必须为服务器或客户端配置SSL/TLS才能正常工作
          socket.upgradeToSsl();
      });
      //关闭TCP服务端
      server.close(res -> {
        if (res.succeeded()) {} 
        else {}
      });
      
  • TCP客户端

    • TCP客户端创建:

      //配置超时时间、初始连接的重连次数和重连间隔
      NetClientOptions options = new NetClientOptions().setConnectTimeout(10000).setReconnectAttempts(10).setReconnectInterval(500);
      //创建TCP客户端
      NetClient client = vertx.createNetClient(options);
      
    • 发起请求:

      client.connect(4321, "localhost", res -> {
        if (res.succeeded()) {
          NetSocket socket = res.result();
        }
        else {}
      });
      

Http服务端和客户端

  • Http服务端

    • Http服务端创建:

      //记录网络活动,用于调试
      HttpServerOptions options = new HttpServerOptions().setLogActivity(true);
      //创建Http服务端
      HttpServer server = vertx.createHttpServer(options);
      //处理不合法的请求,比如请求头过大
      server.invalidRequestHandler(HttpServerRequest.DEFAULT_INVALID_REQUEST_HANDLER);
      
    • 监听地址:

      server.listen(8080, "localhoSt", res -> {
        if (res.succeeded()) {} 
        else {}
      });
      
    • 配置请求处理器:

      //记录网络活动,用于调试
      HttpServerOptions options = new HttpServerOptions().setLogActivity(true);
      //创建Http服务端
      HttpServer server = vertx.createHttpServer(options);
      //处理不合法的请求,比如请求头过大
      server.invalidRequestHandler(HttpServerRequest.DEFAULT_INVALID_REQUEST_HANDLER);
      //配置请求处理器,在读取完请求头后调用(因此没有请求体)
      server.requestHandler(request -> {
          //可以从request中读取uri,path,params和headers等其他信息。。。
      
          //接收请求体:设置处理器,每次请求体的一小块数据收到时,该处理器都会被调用,避免请求体数据过大耗尽服务器可用内存
          Buffer totalBuffer = Buffer.buffer();
          request.handler(buffer -> {
      		totalBuffer.appendBuffer(buffer);
          });
          //在读取完请求体后被调用
          request.endHandler(v -> {
      		System.out.println("请求体长度:" + totalBuffer.length());
              //建立response响应
              HttpServerResponse response = request.response();
              //响应数据
              response.setStatusCode(200);
              //设置响应头
      		response.putHeader("content-type", "text/html");
              //异步写入响应体,原理是将写操作进入队列
              response.write("Hello world");
              //在队列为空时结束响应并写入响应体
              response.end("响应结束");
              //设置响应异常处理器
              response.exceptionHandler(e -> {});
          });
          //设置请求异常处理器
          request.exceptionHandler(e -> {});
      });
      
    • 如果你确认不会传输很大的数据,则可以通过bodyHandler直接获取整个请求体: request.bodyHandler(totalBuffer -> {});

    • 配置HTTP/2 服务端:

      HttpServerOptions options = new HttpServerOptions()
          //ALPN是一个TLS的扩展,它在客户端和服务器开始交换数据之前协商协议
          .setUseAlpn(true)
          .setSsl(true)
          .setKeyStoreOptions(new JksOptions().setPath("/path/to/my/keystore"));
      

      当服务器接受 HTTP/2 连接时,它会向客户端发送其 初始设置 。 定义客户端如何使用连接,服务器的默认初始设置为:

      • getMaxConcurrentStreams :限制单个连接上可以同时打开的流(stream)数量
      • 其他默认的 HTTP/2 的设置
    • 处理HTML表单:

      • 使用 application/x-www-form-urlencodedmultipart/form-data 这两种 content-type 来提交 HTML 表单。

      • 对于使用 URL 编码过的表单,表单属性会被编码在URL中,如同普通查询参数一样。

      • 对于 multipart 类型的表单,它会被编码在请求体中,在整个请求体被 完全读取之前不可用

      • 若您想要读取 multipart 表单的属性,需要在读取请求体 之前 调用 setExpectMultipart 方法, 然后在整个请求体都被读取后,您可以使用 formAttributes 方法来读取表单属性。

        server.requestHandler(request -> {
          request.setExpectMultipart(true);
          request.endHandler(v -> {
            MultiMap formAttributes = request.formAttributes();
          });
        });
        
    • 处理文件上传:

      • Vert.x 可以处理以 multipart 编码形式上传的的文件

      • 在读取请求体 之前 调用 setExpectMultipart 方法,并对请求设置 uploadHandler

      • 当服务器每次接收到上传请求时, uploadHandler将被调用一次

        server.requestHandler(request -> {
          request.setExpectMultipart(true);
          request.uploadHandler(upload -> {
              //将文件上传到服务器磁盘的某个地方
              upload.streamToFileSystem("directory/" + upload.filename());
              //上传的文件可能很大,不会在单个缓冲区中包含整个数据,所以设置处理器分批接收
              upload.handler(chunk -> {
                System.out.println("length:" + chunk.length());
              });
          });
        });
        
    • 处理cookies:

      • 管理cookie

        server.requestHandler(request -> {
            //获取cookie
            Cookie key = request.getCookie("key");
            //移除cookie
            request.response().removeCookie("key");
            //添加cookie
            request.response().addCookie(Cookie.cookie("name", "value"));
        });
        
      • 给Cookie设置SameSite(限制站点发送):cookie.setSameSite(CookieSameSite.LAX);

        • None - 允许在跨域请求和非跨域请求中发送
        • Strict - 只能在同站点的请求中发送
        • Lax - 在跨域的子请求(例如调用加载图像或iframe)不发送该Cookie, 但当用户从外部站点导航到URL时将发送该Cookie, 例如通过链接打开。
    • 处理压缩体:Vert.x 可以处理在客户端通过 deflategzipbrotli 算法压缩过的请求体信息。在创建服务器时调用HttpServerOptions对象的 setDecompressionSupported(true) ,并确保在类路径classpath上存在有 Brotli4j类库。

    • 响应结束(调用end方法)后,vertx不会自动关闭keep-alive的连接,可以手动调用close方法关闭底层的TCP连接,或调用HttpServerOptions对象的setIdleTimeout方法配置空闲多少时间后自动关闭

    • 分块响应数据(HTTP/2流无效):response.setChunked(true);当处于分块模式时,每次调用任意一个 write 方法将导致新的 HTTP 块被写出(如果不分块则会在响应结束前一直缓存在内存中)

    • 响应文件:reponse.sendFile(Stirng name,long offset,long length)

    • 管道式(流式)响应:将输出流写入到响应中,这样可以直接往输出流写入数据达到流式传输的效果,不需要手动调用end方法结束响应

      response.setChunked(true);
      request.pipeTo(response);
      
    • 接收/写入自定义 HTTP/2 帧:HTTP/2 帧以二进制压缩格式存放内容。可以交错发送,然后根据每个帧头的数据流标识符重新组装。因此不受流量控制限制,会被立即发送或接收。

      //接收
      request.customFrameHandler(frame -> {
        System.out.println("Received a frame type=" + frame.type() +
            " payload" + frame.payload().toString());
      });
      //写入
      int frameType = 40;
      int frameStatus = 10;
      Buffer payload = Buffer.buffer("some data");
      
      // 向客户端发送一帧
      response.writeCustomFrame(frameType, frameStatus, payload);
      
    • 流重置

      HTTP/1.x 不允许请求或响应流执行清除重置,服务器需要接受整个响应。

      HTTP/2 在请求/响应期间随时支持流重置,默认会发送 NO_ERROR (0)错误代码

      request.response().reset();
      

      在流重置完成时收到通知:

      request.response().exceptionHandler(err -> {
        if (err instanceof StreamResetException) {
          StreamResetException reset = (StreamResetException) err;
          System.out.println("Stream reset " + reset.getCode());
        }
      });
      
    • 服务器推送(Server Push):HTTP/2 可以为单个客户端请求并行发送多个响应

      eg:当服务器准备推送响应时,推送响应处理器会被调用,并会发送响应(因此需要在响应结束之前调用 push 方法)。推送响应处理器可能会接收到失败,如:客户端可能取消推送,因为在缓存中已经包含了 main.js, 不再需要它

      // 准备响应时会推送main.js到客户端
      response.push(HttpMethod.GET, "/main.js", ar -> {
        if (ar.succeeded()) {
          // 服务器准备推送响应
          HttpServerResponse pushedResponse = ar.result();
          // 发送main.js响应
          pushedResponse.
              putHeader("content-type", "application/json").
              end("alert(\"Push response hello\")");
        } else {
          System.out.println("Could not push client resource " + ar.cause());
        }
      });
      
      // 响应请求的资源内容
      response.sendFile("<html><head><script src=\"/main.js\"></script></head><body></body></html>");
      
    • HTTP压缩:

      启动HTTP压缩后。服务器将自动检查客户端请求头中是否包含了 Accept-Encoding ,若找到,将使用所支持的压缩算法之一(gzip/deflate )自动压缩响应正文。

      设置压缩级别:压缩可以减少网络流量,但是CPU密集度会更高,所以通过调整压缩密度解决

      //支持压缩
      options.setCompressionSupported(true);
      //设置压缩界别
      options.setCompressionLevel(6);
      //设置压缩算法
      GzipOptions gzip = StandardCompressionOptions.gzip(6, 15, 8);
      options.addCompressor(gzip);
      

      关闭压缩:response.putHeader(HttpHeaders.CONTENT_ENCODING, HttpHeaders.IDENTITY)

  • HTTP客户端

    • HTTP客户端创建:

      HttpClient client = vertx.createHttpClient();
      
    • 配置连接池

      //启动长连接keep-alive
      options.setKeepAlive(false);
      //连接池的最大连接数
      options.setMaxPoolSize(10);
      //设置空闲连接超时时间
      options.setKeepAliveTimeout(100)
      
    • 配置管道:管道意味着在在不等待响应的情况下,在同一个连接发送另一个请求

      //启用管道
      options.setPipelining(true);
      //单个连接的管道的请求数限制
      options.setPipeliningLimit(2);
      
    • 配置HTTP/2客户端

      HttpClientOptions options = new HttpClientOptions()
          //启动HTTP/2协议
          .setProtocolVersion(HttpVersion.HTTP_2)
          //
          .setSsl(true)
          //使用ALPN启动TLS
          .setUseAlpn(true)
          .setTrustAll(true)
          //记录网络活动日志
          .setLogActivity(true);
      
    • HTTP/2多路复用配置

      HTTP/1 的长连接和管道,实现了在一个TCP连接上不等待响应,而连续发送多次请求,但由于处理响应的顺序性,会出现队头阻塞的问题(如果一个请求处理耗时过长,会阻塞管道中的后续请求)。

      HTTP/2 基于流和帧的特性,让每个HTTP请求和响应分解为多个可以乱序发送的帧,这些帧通过流ID重新组装,使得能在一个TCP连接上同时发送多个请求和响应,即多路复用。

      options
          //限制每个连接的多路复用流数量
          .setHttp2MultiplexingLimit(10)
          //设置HTTP/2连接池
          .setHttp2MaxPoolSize(3);
      
    • 发起请求:

      client.request(HttpMethod.GET,8080, "localhost", "/some-uri", ar -> {
        //配置请求处理器
        if (ar.succeeded()) {
          //获取请求
        	HttpClientRequest request = ar.result();
          //设置超时时间
          request.setTimeout(5000);
          //设置请求头
          MultiMap headers = HttpHeaders.set("content-type", "application/json");
          request.headers().adfdAll(headers);
          request.putHeader("other-header", "ailu");
          //send:将整个请求的数据发送到目标服务器,并代表请求已经完成,不能再写入额外数据。
          
          //发送请求1,携带String类型的请求体
          request.send("requestBody",ar1 -> {
            if (ar1.succeeded()) {
              HttpClientResponse response = ar1.result();
            }
          });
          //发送请求2,携带Buffer类型的请求体
          request.send(Buffer.buffer("requestBody"),ar1 -> {});
          //发送请求3,携带实现ReadStream类型的请求体
          request.send(stream,ar1 -> {});
        }
      });
      
    • 可提前配置要请求的主机/端口

      HttpClientOptions options = new HttpClientOptions().setDefaultHost("localhost").setDefaultPort(8080);
      
    • 使用write写入请求体,通过end结束请求,若不使用HTTP分块,则必须在写入请求前设置Content-Length头

      //设置分块模式
      request.setChunked(true);
      request.write("data1","UTF-8")
      request.write(Buffer.buffer("data2"))
      //结束请求
      request.end(body);
      
    • 使用流式请求将磁盘上的文件管送到HTTP 请求体中

      request.setChunked(true);
      asyncFile.pipeTo(request);
      
    • 写HTTP/2帧

      int frameType = 40;
      int frameStatus = 10;
      Buffer payload = Buffer.buffer("some data");
      
      // 发送一帧到服务器
      request.writeCustomFrame(frameType, frameStatus, payload);
      
    • 流重置:

      request.reset();
      //在流重置完成时您将会收到通知
      request.exceptionHandler(err -> {
        if (err instanceof StreamResetException) {
          StreamResetException reset = (StreamResetException) err;
          System.out.println("Stream reset " + reset.getCode());
        }
      });
      
    • 流式响应到WriteSteam中:response.pipeTo(WriteStream ws)

    • 读取响应体:如果响应中包含响应体,那么响应体可能会在读取完header后,以多个分片的形式到达,即当响应体的某部分(数据)到达时,handler 方法绑定的回调函数将会被调用

      response.handler(buffer -> {});
      //整个响应体被完全读取时,endHandler就会被调用。
      response.endHandler(v -> {});
      

      或者直接处理所有响应体数据

      response.body(ar -> {
        if (ar.succeeded()) {
          Buffer body = ar.result();
        }
      });
      
    • 请求和响应组合使用:

      HttpClient 客户端有意地避免返回 Future<HttpClientResponse> , 因为如果在 event-loop 之外设置 Future 的完成处理器可能会导致线程竞争。

      //会交给event loop线程执行
      Future<HttpClientResponse> get = client.get("some-uri");
      
      //假设此事件不在event-loop中,则阻塞主线程往下执行
      Thread.sleep(100);
      
      get.onSuccess(response -> {
        // 响应事件在event loop线程可能已经发生
        response.body(ar -> {
      
        });
      });
      

      HttpClientRequest 的使用限制在一个verticle的范围内, 因为Verticle会确保按顺序处理事件。

      vertx.deployVerticle(() -> new AbstractVerticle() {
       @Override
       public void start() {
      
          HttpClient client = vertx.createHttpClient();
          Future<HttpClientResponse> get = client.get("some-uri");
      
          //此事件在event-loop中,阻塞event loop线程(不建议这么做)
          Thread.sleep(100);
      
          get.onSuccess(response -> {
            // 响应事件不会提前发生,因为Verticle会保证按顺序处理事件
            response.body(ar -> {
      
            });
          });
       }
      }, new DeploymentOptions());
      

      verticle外使用HttpClient进行交互时,可以安全地使用“组合”(compose)。

      Future<JsonObject> future = client
        .request(HttpMethod.GET, "some-uri")
        .compose(request -> request
          .send()
          .compose(response -> {
              return response
                .body()
                .map(buffer -> buffer.toJsonObject());
            }
          }));
      future.onSuccess(json -> {})
      
    • 从响应中读取cookie:response.cookies()

    • 创建HTTP隧道

      client.request(HttpMethod.CONNECT, "some-uri")
        .onSuccess(request -> {
      
          // 连接到服务器
          request.connect(ar -> {
            if (ar.succeeded()) {
              HttpClientResponse response = ar.result();
      
              if (response.statusCode() != 200) {
                // 某些原因连接失败
              } else {
                // HTTP隧道创建成功,原始数据将传输到缓冲区
                NetSocket socket = response.netSocket();
              }
            }
          });
      });
      
    • 客户端(HTTP/2)接收推送资源

      // 设置一个推送处理器来感知服务器推送的任何资源
      request.pushHandler(pushedRequest -> {
        //如果不想收到推送,可重置流
        if (pushedRequest.path().equals("/main.js")) {
          pushedRequest.reset();
        } 
          
        System.out.println("Server pushed " + pushedRequest.path());
      
        // 为响应设置处理器
        pushedRequest.response().onComplete(pushedResponse -> {
          System.out.println("The response for the pushed request");
        });
      });
      
    • 接收自定义HTTP/2帧

      response.customFrameHandler(frame -> {
      
        System.out.println("Received a frame type=" + frame.type() +
            " payload" + frame.payload().toString());
      });
      
    • 启用压缩

      创建客户端时使用 setTryUseCompression 设置配置项启用压缩,请求头包含Accept-Encoding 头,其值为可支持的压缩算法,如Accept-Encoding: gzip, deflate

  • HTTP连接

    • 服务端连接

      //获取服务端上的请求连接
      HttpConnection connection = request.connection();
      //设置连接处理器,在任意连接建立时得到通知
      HttpServer server = vertx.createHttpServer(http2Options);
      server.connectionHandler(connection -> {});
      
    • 客户端连接

      //获取客户端上的请求连接
      HttpConnection connection = request.connection();
      //设置连接处理器,在连接建立时通知
      client.connectionHandler(connection -> {});
      
    • 连接配置(HTTP/2):每个Endpoint(端点)必须遵守连接另一端的发送设置,即建立连接时,客户端和服务端会交换初始配置

      //初始配置
      options.setInitialSettings(new Http2Settings().setMaxConcurrentStreams(100));
      //连接建立后,可随时更改设置
      connection.updateSettings();
      //在另一端更新设置后,这端会收到对应的远程设置
      connection.remoteSettingsHandler(settings -> {});
      
    • Ping(HTTP/2):确定连接往返时间或检查连接有效性

      //ping端
      connection.ping(data, pong -> {});
      
      //被ping端
      connection.pingHandler(ping -> {});
      
    • 连接关闭(HTTP/2)

      shutdown:客户端停止发送新请求,并发送 {@literal GOAWAY} 帧到服务端,要求其停止创建流并停止推送响应,直到当前所有流关闭,然后关闭连接。connection.shutdown()

      goAway:只发送 {@literal GOAWAY} 帧告诉远程连接停止创建流,但没有计划关闭连接。connection.shutdown()

      shutdownHandler:当所有流已经关闭 或 接收到 {@literal GOAWAY} 帧时被调用

      connection.shutdownHandler(v -> {
        // 所有流被关闭时,关闭连接
        connection.close();
      });
      

      close:关闭连接,对于HTTP/1,会关闭底层Socket,对于HTTP/2,在连接关闭前发送 {@literal GOAWAY} 帧

  • 扩展

    • 水平扩展-客户端共享:多个verticle实例共享一个HTTP客户端

      HttpClient client = vertx.createHttpClient(new HttpClientOptions().setShared(true));
      vertx.deployVerticle(() -> new AbstractVerticle() {
        @Override
        public void start() throws Exception {
          // 使用client
        }
      }, new DeploymentOptions().setInstances(4));
      
    • 水平扩展-服务端共享:当多个 HTTP 服务端在同一个端口上监听时,Vert.x 会使用轮询策略顺序委托给其中一个服务端

  • 启用HTTPS

    client.request(new RequestOptions()
        .setHost("localhost")
        .setPort(8080)
        .setURI("/")
        .setSsl(true), ar1 -> {});
    


网站公告

今日签到

点亮在社区的每一天
去签到