物联网之使用Vertx实现HTTP/WebSocket最佳实践

发布于:2025-05-24 ⋅ 阅读:(18) ⋅ 点赞:(0)

小伙伴们,你们好呀,我是老寇,跟我一起学习使用Vertx实现HTTP-Server和WebSocket-Server

实现Http/WebSocket【响应式】

Vertx-Web地址

实现过程

查看源码

代码比较简单,懒得讲解啦
代码比较简单,懒得讲解啦
代码比较简单,懒得讲解啦
http/websocket【响应式】
<dependency>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-web</artifactId>
  <version>5.0.0</version>
</dependency>

HttpServerProperties

/**
 * @author laokou
 */
@Data
@Component
@ConfigurationProperties(prefix = "spring.http-server")
public class HttpServerProperties {

    private boolean auth = true;

    private String host = "0.0.0.0";

    private Set<Integer> ports = new HashSet<>(0);

    private boolean compressionSupported = false;

    private int compressionLevel = 6;

    private int maxWebSocketFrameSize = 65536;

    private int maxWebSocketMessageSize = 65536 * 4;

    private boolean handle100ContinueAutomatically = false;

    private int maxChunkSize = 8192;

    private int maxInitialLineLength = 4096;

    private int maxHeaderSize = 8192;

    private int maxFormAttributeSize = 8192;

    private int maxFormFields = 512;

    private int maxFormBufferedBytes = 2048;

    private Http2Settings initialSettings = new Http2Settings()
       .setMaxConcurrentStreams(DEFAULT_INITIAL_SETTINGS_MAX_CONCURRENT_STREAMS);

    private List<HttpVersion> alpnVersions = new ArrayList<>(DEFAULT_ALPN_VERSIONS);

    private boolean http2ClearTextEnabled = true;

    private int http2ConnectionWindowSize = -1;

    private boolean decompressionSupported = false;

    private boolean acceptUnmaskedFrames = false;

    private int decoderInitialBufferSize = 256;

    private boolean perFrameWebSocketCompressionSupported = true;

    private boolean perMessageWebSocketCompressionSupported = true;

    private int webSocketCompressionLevel = 6;

    private boolean webSocketAllowServerNoContext = false;

    private boolean webSocketPreferredClientNoContext = false;

    private int webSocketClosingTimeout = 30;

    private TracingPolicy tracingPolicy = TracingPolicy.ALWAYS;

    private boolean registerWebSocketWriteHandlers = false;

    private int http2RstFloodMaxRstFramePerWindow = 400;

    private int http2RstFloodWindowDuration = 60;

    private TimeUnit http2RstFloodWindowDurationTimeUnit = TimeUnit.SECONDS;

}

VertxHttpServer

/**
 * @author laokou
 */
@Slf4j
final class VertxHttpServer extends AbstractVerticle {

    private final HttpServerProperties properties;

    private final Vertx vertx;

    private final Router router;

    private volatile Flux<HttpServer> httpServer;

    private boolean isClosed = false;

    VertxHttpServer(Vertx vertx, HttpServerProperties properties) {
       this.vertx = vertx;
       this.properties = properties;
       this.router = getRouter();
    }

    @Override
    public synchronized void start() {
       httpServer = getHttpServerOptions().map(vertx::createHttpServer)
          .doOnNext(server -> server.webSocketHandler(serverWebSocket -> {
             if (!RegexUtils.matches(WebsocketMessageEnum.UP_PROPERTY_REPORT.getPath(), serverWebSocket.path())) {
                serverWebSocket.close();
                return;
             }
             serverWebSocket.textMessageHandler(message -> log.info("【Vertx-Websocket-Server】 => 收到消息:{}", message))
                .closeHandler(v -> log.error("【Vertx-Websocket-Server】 => 断开连接"))
                .exceptionHandler(err -> log.error("【Vertx-Websocket-Server】 => 错误信息:{}", err.getMessage(), err))
                .endHandler(v -> log.error("【Vertx-Websocket-Server】 => 结束"));
          }).requestHandler(router).listen().onComplete(completionHandler -> {
             if (isClosed) {
                return;
             }
             if (completionHandler.succeeded()) {
                log.info("【Vertx-Http-Server】 => HTTP服务启动成功,端口:{}", server.actualPort());
             }
             else {
                Throwable ex = completionHandler.cause();
                log.error("【Vertx-Http-Server】 => HTTP服务启动失败,错误信息:{}", ex.getMessage(), ex);
             }
          }));
       httpServer.subscribeOn(Schedulers.boundedElastic()).subscribe();
    }

    @Override
    public synchronized void stop() {
       isClosed = true;
       httpServer.doOnNext(server -> server.close().onComplete(result -> {
          if (result.succeeded()) {
             log.info("【Vertx-Http-Server】 => HTTP服务停止成功,端口:{}", server.actualPort());
          }
          else {
             Throwable ex = result.cause();
             log.error("【Vertx-Http-Server】 => HTTP服务停止失败,错误信息:{}", ex.getMessage(), ex);
          }
       })).subscribeOn(Schedulers.boundedElastic()).subscribe();
    }

    public void deploy() {
       // 部署服务
       vertx.deployVerticle(this);
       // 停止服务
       Runtime.getRuntime().addShutdownHook(new Thread(this::stop));
    }

    private Router getRouter() {
       Router router = Router.router(vertx);
       router.route().handler(BodyHandler.create());
       router.post(HttpMessageEnum.UP_PROPERTY_REPORT.getRouter()).handler(ctx -> {
          String body = ctx.body().asString();
          Long deviceId = Long.valueOf(ctx.pathParam("deviceId"));
          Long productId = Long.valueOf(ctx.pathParam("productId"));
          log.info("productId:{},deviceId:{},body:{}", productId, deviceId, body);
          ctx.response().end();
       });
       return router;
    }

    private Flux<HttpServerOptions> getHttpServerOptions() {
       return Flux.fromIterable(properties.getPorts()).map(this::getHttpServerOption);
    }

    private HttpServerOptions getHttpServerOption(int port) {
       HttpServerOptions options = new HttpServerOptions();
       options.setHost(properties.getHost());
       options.setPort(port);
       options.setCompressionSupported(properties.isCompressionSupported());
       options.setDecompressionSupported(properties.isDecompressionSupported());
       options.setCompressionLevel(properties.getCompressionLevel());
       options.setMaxWebSocketFrameSize(properties.getMaxWebSocketFrameSize());
       options.setMaxWebSocketMessageSize(properties.getMaxWebSocketMessageSize());
       options.setHandle100ContinueAutomatically(properties.isHandle100ContinueAutomatically());
       options.setMaxChunkSize(properties.getMaxChunkSize());
       options.setMaxInitialLineLength(properties.getMaxInitialLineLength());
       options.setMaxHeaderSize(properties.getMaxHeaderSize());
       options.setMaxFormAttributeSize(properties.getMaxFormAttributeSize());
       options.setMaxFormFields(properties.getMaxFormFields());
       options.setMaxFormBufferedBytes(properties.getMaxFormBufferedBytes());
       options.setInitialSettings(properties.getInitialSettings());
       options.setAlpnVersions(properties.getAlpnVersions());
       options.setHttp2ClearTextEnabled(properties.isHttp2ClearTextEnabled());
       options.setHttp2ConnectionWindowSize(properties.getHttp2ConnectionWindowSize());
       options.setDecoderInitialBufferSize(properties.getDecoderInitialBufferSize());
       options.setPerFrameWebSocketCompressionSupported(properties.isPerFrameWebSocketCompressionSupported());
       options.setPerMessageWebSocketCompressionSupported(properties.isPerMessageWebSocketCompressionSupported());
       options.setWebSocketCompressionLevel(properties.getWebSocketCompressionLevel());
       options.setWebSocketAllowServerNoContext(properties.isWebSocketAllowServerNoContext());
       options.setWebSocketPreferredClientNoContext(properties.isWebSocketPreferredClientNoContext());
       options.setWebSocketClosingTimeout(properties.getWebSocketClosingTimeout());
       options.setTracingPolicy(properties.getTracingPolicy());
       options.setRegisterWebSocketWriteHandlers(properties.isRegisterWebSocketWriteHandlers());
       options.setHttp2RstFloodMaxRstFramePerWindow(properties.getHttp2RstFloodMaxRstFramePerWindow());
       options.setHttp2RstFloodWindowDuration(properties.getHttp2RstFloodWindowDuration());
       options.setHttp2RstFloodWindowDurationTimeUnit(properties.getHttp2RstFloodWindowDurationTimeUnit());
       return options;
    }

}

这只是一个demo,实际情况,需要对http请求进行鉴权,推荐使用OAuth2

我是老寇,我们下次再见啦!


网站公告

今日签到

点亮在社区的每一天
去签到