ES Transport Bandwidth Optimization

Published: 2025-02-11

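Background: logs are currently consumed from Kafka and written to Elasticsearch (ES) in bulk batches. When the data volume is large, this uses a lot of bandwidth. Because of the current cost-cutting push, we cannot add bandwidth or service nodes.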

The full source code is at the end of the post.

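  • Rate limiting, at the cost of disk
  1. After consuming messages from Kafka, rate-limit the sends to ES, for example by sending at most 50 MB per bulk batch. The problem is what to do with the rest. If 100 MB is consumed, the remaining 50 MB could be stored in a local MySQL instance or in files on disk and drained later. Message ordering is not a concern, because the time fields in traffic and alert records are set before the records are written to ES. But the disk will eventually fill up. Rejected.
  2. Reduce how much the Kafka producers send. The probes generate logs continuously, so data still piles up, and storing it in MySQL or on disk would again fill up eventually. Rejected.
  3. Add a wait time in the consumer code to slow consumption down and rate-limit that way. However, messages then back up on the broker. Kafka is currently configured to delete old data after 5 minutes, so a consumer backlog can fill the disk. Because of the 5-minute retention, unconsumed data would also be deleted, which means message loss. Rejected.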
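  • Rate limiting, at the cost of memory
  1. Reuse the approaches above, but keep the backlog created by rate limiting in memory or in Redis. Either way this costs memory. Rejected.
  • Adding service nodes
  1. Add servers to hold the backlog left by rate limiting. Whether it is kept on disk or in memory, this does not affect the servers running the business system, but it increases cost. Rejected.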
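  • Delete redundant fields and unnecessary field types (text)
  1. The number of fields also contributes to bandwidth: a document with 30 fields takes more bytes than one with 10. Removing fields the index does not need is worth testing to see how much it helps.
  2. Currently every field in every index mapping is mapped both as text and as keyword. Fields that need full-text (analyzed) search can keep both. Fields used mainly for exact matching, sorting, or aggregation can be changed to keyword only. A text field stores the original text plus the terms produced by analysis, so it usually takes more storage than keyword. Note that this saves index storage on the ES side; the size of the bulk request itself does not depend on the mapping.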
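To check how much pruning helps before changing the real indices, you can compare the serialized size of a document with and without the fields in question. Below is a minimal, hypothetical sketch. `toJson` is a toy serializer for flat maps (no escaping, no nesting). The dropped field names come from this post's sample data, but choosing them here is only an example and not a recommendation for this schema.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

public class FieldPruningSketch {

    // Toy serializer for a flat map of numbers/strings (hypothetical helper, no escaping).
    static String toJson(Map<String, Object> doc) {
        StringBuilder sb = new StringBuilder("{");
        boolean first = true;
        for (Map.Entry<String, Object> e : doc.entrySet()) {
            if (!first) sb.append(',');
            first = false;
            sb.append('"').append(e.getKey()).append("\":");
            Object v = e.getValue();
            if (v instanceof Number) sb.append(v);
            else sb.append('"').append(v).append('"');
        }
        return sb.append('}').toString();
    }

    // Returns a copy of the document without the given fields; the input is not modified.
    static Map<String, Object> prune(Map<String, Object> doc, Set<String> dropFields) {
        Map<String, Object> out = new LinkedHashMap<>(doc);
        out.keySet().removeAll(dropFields);
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("clientIp", "192.168.2.201");
        doc.put("serverIp", "192.168.2.203");
        doc.put("serverPort", 26051);
        doc.put("requestByteRange6", 0);
        doc.put("requestByteRange7", 0);
        doc.put("vlanId2", 0);

        // Example choice of "unneeded" fields; decide this per index in practice.
        Set<String> drop = new HashSet<>(Arrays.asList("requestByteRange6", "requestByteRange7", "vlanId2"));

        int before = toJson(doc).getBytes(StandardCharsets.UTF_8).length;
        int after = toJson(prune(doc, drop)).getBytes(StandardCharsets.UTF_8).length;
        System.out.println("bytes per document before=" + before + " after=" + after);
    }
}
```

Multiplying the per-document difference by the daily document count gives a rough idea of the uncompressed savings. With gzip enabled, the real saving will be smaller.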

  • Send bulk requests with HttpClient and compress them with gzip

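How it works: the bulk request body (50,000 documents in the test) is gzip-compressed through a byte stream into a byte array, and the array is placed in the request entity. The request declares that the body is gzip-encoded (Content-Encoding: gzip), so ES decompresses it on receipt and then indexes the documents.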
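The compress/decompress round trip described above can be reproduced locally without an ES cluster. This is a sketch with hypothetical names (`GzipBulkBodySketch`, `buildBulkBody`) and simplified two-field documents. It only shows what happens to the bytes: the client gzips the NDJSON body, and the receiver gunzips it back to the identical text.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipBulkBodySketch {

    // Client side: gzip the UTF-8 bytes of an NDJSON bulk body.
    static byte[] gzip(String ndjson) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(ndjson.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Closing the GZIPOutputStream writes the gzip trailer, so the bytes are read only after that.
        return bos.toByteArray();
    }

    // Receiver side: what a server conceptually does with a "Content-Encoding: gzip" body.
    static String gunzip(byte[] compressed) {
        try (GZIPInputStream gz = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            byte[] buf = new byte[8192];
            int n;
            while ((n = gz.read(buf)) != -1) {
                bos.write(buf, 0, n);
            }
            return new String(bos.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // One action line plus one source line per document; a bulk body must end with "\n".
    static String buildBulkBody(String index, int docs) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < docs; i++) {
            sb.append("{\"index\":{\"_index\":\"").append(index).append("\"}}\n");
            sb.append("{\"agentId\":\"test-").append(i).append("\",\"clientIp\":\"192.168.2.201\"}\n");
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        String body = buildBulkBody("http20250208", 1000);
        byte[] compressed = gzip(body);
        System.out.println("raw bytes:  " + body.getBytes(StandardCharsets.UTF_8).length);
        System.out.println("gzip bytes: " + compressed.length);
        System.out.println("round trip: " + body.equals(gunzip(compressed)));
    }
}
```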

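Code for the compressed variant: the gzip branch of the full source at the end of the post.

Test result with compression: 50,000 documents, peak bandwidth usage 667 KB.

Code for the uncompressed variant: the commented-out branch of the full source.

Test result without compression: 50,000 documents, bandwidth usage reached 16 MB and peaked at 19 MB.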

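Conclusion

For 50,000 documents, the compressed and uncompressed runs differ by roughly 30x (a 19 MB peak versus a 667 KB peak). However, most fields in these 50,000 test documents are identical, so they compress unusually well. With real data the ratio may be closer to 10x, but compression still clearly reduces bandwidth usage.

The source code is below. Feel free to use it.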
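The caveat that repetitive test data compresses unusually well can be checked offline. This sketch (all names hypothetical) gzips two synthetic NDJSON bodies with the same document count. In the first, documents differ only in a counter, like this post's test data. The second uses random IPs, ports, and numbers as a stand-in for real traffic. That second body is an assumption about what real data looks like, not a measurement of it.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Random;
import java.util.zip.GZIPOutputStream;

public class CompressionRatioSketch {

    // Size in bytes of the gzip-compressed UTF-8 encoding of s.
    static int gzipSize(String s) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(s.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.size();
    }

    // Compression ratio = raw bytes / gzip bytes; higher means better compression.
    static double ratio(String s) {
        return (double) s.getBytes(StandardCharsets.UTF_8).length / gzipSize(s);
    }

    // Documents that differ only in a counter, similar to the test data in this post.
    static String repetitiveDocs(int n) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < n; i++) {
            sb.append("{\"agentId\":\"test-").append(i)
              .append("\",\"clientIp\":\"192.168.2.201\",\"serverIp\":\"192.168.2.203\"}\n");
        }
        return sb.toString();
    }

    // Documents with random IPs, ports, and counters (assumed stand-in for real traffic).
    static String variedDocs(int n, long seed) {
        Random rnd = new Random(seed);
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < n; i++) {
            sb.append("{\"agentId\":\"test-").append(i)
              .append("\",\"clientIp\":\"10.").append(rnd.nextInt(256)).append('.')
              .append(rnd.nextInt(256)).append('.').append(rnd.nextInt(256))
              .append("\",\"serverPort\":").append(rnd.nextInt(65536))
              .append(",\"bytes\":").append(rnd.nextLong()).append("}\n");
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.printf("repetitive ratio: %.1f%n", ratio(repetitiveDocs(50000)));
        System.out.printf("varied ratio:     %.1f%n", ratio(variedDocs(50000, 42L)));
    }
}
```

Running the same measurement on a sample of real messages taken from the Kafka topic gives a more trustworthy estimate than either synthetic body.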

package com.event.util;

import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequestInterceptor;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.config.Registry;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.conn.socket.ConnectionSocketFactory;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.ssl.SSLContexts;
import org.apache.http.util.EntityUtils;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;

import javax.net.ssl.SSLContext;
import java.io.ByteArrayOutputStream;
import java.security.cert.X509Certificate;
import java.util.zip.GZIPOutputStream;

public class ElasticsearchBulkInsert {

    public static void main(String[] args) throws Exception {
        // Basic-auth credentials
        BasicCredentialsProvider credsProvider = new BasicCredentialsProvider();
        credsProvider.setCredentials(AuthScope.ANY,
                new UsernamePasswordCredentials("root", "root"));
        // SSL context that trusts every certificate (test setup only; do not use in production)
        SSLContext sslContext = SSLContexts.custom()
                .loadTrustMaterial(null, (X509Certificate[] chain, String authType) -> true)
                .build();
        Registry<ConnectionSocketFactory> registry = RegistryBuilder.<ConnectionSocketFactory>create()
                .register("https", new SSLConnectionSocketFactory(sslContext, NoopHostnameVerifier.INSTANCE))
                .build();
        // Pooled connection manager
        PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager(registry);
        // HttpClient that sends "Accept-Encoding: gzip" on every request.
        // This has to be a request interceptor; a response interceptor only sees the response.
        CloseableHttpClient httpClient = HttpClients.custom()
                .setConnectionManager(connectionManager)
                .setDefaultCredentialsProvider(credsProvider)
                .addInterceptorFirst((HttpRequestInterceptor) (request, context) -> request.setHeader("Accept-Encoding", "gzip"))
                .build();
        try {
            /**
             * gzip-compressed variant: build a bulk body (NDJSON) from many documents
             */
            StringBuilder bulkRequestBody = new StringBuilder();
            for (int i=0;i<50000;i++){
                // Action/metadata line
                bulkRequestBody.append("{\"index\":{\"_index\":\"http20250208\",\"op_type\":\"create\"}}\n");
                // Document source line
                bulkRequestBody.append("{\"agentId\":\"szhangsan-test-"+i+"\",\"requestByteRange6\":0,\"requestByteRange7\":0,\"client_city\":\"192.168.0.0/16\",\"db_position\":\"内对内\",\"vlanType\":0,\"totalPktps\":2,\"serverPayload\":84,\"responseByteRange3\":0,\"totalPayload\":84,\"server_country_code\":\"CN\",\"clientIp\":\"192.168.2.201\",\"logOffsets\":[],\"clientBitps\":0,\"dataOffsets\":[288236297029019500],\"serverBitps\":1344,\"totalPkts\":2,\"clientPkts\":0,\"deviceId\":\"037ef078-4ccc-480c-97c5-eb9c70dfdd79\",\"serverIpType\":0,\"id\":\"4615197014644469761kQ\",\"client_country_code\":\"CN\",\"client_province\":\"沈庄数据中心\",\"responseByteRange4\":0,\"responseByteRange5\":0,\"clientNetSegmentIds\":[],\"dataLen\":0,\"vlanId2\":0,\"eventType\":2,\"appProtocol\":0,\"requestByteRange3\":0,\"client_longitude\":\"116.400000\",\"dataLens\":[248],\"serverIp\":\"192.168.2.203\",\"alarmCount\":0,\"server_longitude\":\"116.400000\",\"totalBytes\":168,\"clientIpType\":0,\"serverTransRate\":50,\"requestByteRange4\":0,\"serverPktps\":2,\"transProtocol\":2,\"durationTimeNs\":1000000000,\"clientTransRate\":0,\"requestByteRange1\":1,\"appId\":249,\"kafkakey\":\"event\",\"probeIds\":[11],\"serverPkts\":2,\"serverPort\":26051,\"clientPayload\":0,\"clientPortList\":[],\"totalTransRate\":50,\"responseByteRange6\":0,\"responseByteRange2\":1,\"server_country\":\"中国\",\"dataOffset\":0,\"serverBytes\":168,\"flowBeginTimeNs\":1738914456332849000,\"statBeginTimeSec\":1738944831,\"requestByteRange5\":0,\"eventStatId\":461519701464447000,\"statEndTimeSec\":1738944831,\"requestByteRange2\":0,\"vai\":141,\"date\":\"2025-02-08\",\"logLens\":[],\"eventCount\":1,\"clientPktps\":0,\"responseByteRange7\":0,\"client_latitude\":\"39.900000\",\"server_city\":\"192.168.111.111\",\"totalBitps\":1344,\"server_province\":\"北京数据中心\",\"client_country\":\"中国\",\"server_latitude\":\"39.900000\",\"vlanId\":0,\"clientBytes\":0,\"flowEndTimeNs\":1738944831883650000,\"responseByteRange1\":0,\"serverNetSegmentIds\":[]}\n");
            }
            // Gzip the UTF-8 bytes of the bulk body; the stream must be closed before reading the bytes
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            try (GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream)) {
                gzipOutputStream.write(bulkRequestBody.toString().getBytes("UTF-8"));
            }
            HttpPost httpPost = new HttpPost("/_bulk");
            ByteArrayEntity entity = new ByteArrayEntity(byteArrayOutputStream.toByteArray());
            entity.setContentType("application/json");
            // Content-Encoding: gzip tells ES to decompress the body before indexing
            entity.setContentEncoding("gzip");
            httpPost.setEntity(entity);
            /**
             * Uncompressed variant, kept for comparison
             */
           /* StringBuilder bulkRequestBody = new StringBuilder();
            for (int i=0;i<50000;i++){
                bulkRequestBody.append("{\"index\":{\"_index\":\"ecs_http20250208\",\"op_type\":\"create\"}}\n");
                bulkRequestBody.append("{\"agentId\":\"shichuanzong-test-"+i+"\",\"requestByteRange6\":0,\"requestByteRange7\":0,\"client_city\":\"192.168.0.0/16\",\"db_position\":\"内对内\",\"vlanType\":0,\"totalPktps\":2,\"serverPayload\":84,\"responseByteRange3\":0,\"totalPayload\":84,\"server_country_code\":\"CN\",\"clientIp\":\"192.168.2.201\",\"logOffsets\":[],\"clientBitps\":0,\"dataOffsets\":[288236297029019500],\"serverBitps\":1344,\"totalPkts\":2,\"clientPkts\":0,\"deviceId\":\"037ef078-4ccc-480c-97c5-eb9c70dfdd79\",\"serverIpType\":0,\"id\":\"4615197014644469761kQ\",\"client_country_code\":\"CN\",\"client_province\":\"沈庄数据中心\",\"responseByteRange4\":0,\"responseByteRange5\":0,\"clientNetSegmentIds\":[],\"dataLen\":0,\"vlanId2\":0,\"eventType\":2,\"appProtocol\":0,\"requestByteRange3\":0,\"client_longitude\":\"116.400000\",\"dataLens\":[248],\"serverIp\":\"192.168.2.203\",\"alarmCount\":0,\"server_longitude\":\"116.400000\",\"totalBytes\":168,\"clientIpType\":0,\"serverTransRate\":50,\"requestByteRange4\":0,\"serverPktps\":2,\"transProtocol\":2,\"durationTimeNs\":1000000000,\"clientTransRate\":0,\"requestByteRange1\":1,\"appId\":249,\"kafkakey\":\"event\",\"probeIds\":[11],\"serverPkts\":2,\"serverPort\":26051,\"clientPayload\":0,\"clientPortList\":[],\"totalTransRate\":50,\"responseByteRange6\":0,\"responseByteRange2\":1,\"server_country\":\"中国\",\"dataOffset\":0,\"serverBytes\":168,\"flowBeginTimeNs\":1738914456332849000,\"statBeginTimeSec\":1738944831,\"requestByteRange5\":0,\"eventStatId\":461519701464447000,\"statEndTimeSec\":1738944831,\"requestByteRange2\":0,\"vai\":141,\"date\":\"2025-02-08\",\"logLens\":[],\"eventCount\":1,\"clientPktps\":0,\"responseByteRange7\":0,\"client_latitude\":\"39.900000\",\"server_city\":\"192.168.111.111\",\"totalBitps\":1344,\"server_province\":\"北京数据中心\",\"client_country\":\"中国\",\"server_latitude\":\"39.900000\",\"vlanId\":0,\"clientBytes\":0,\"flowEndTimeNs\":1738944831883650000,\"responseByteRange1\":0,\"serverNetSegmentIds\":[]}\n"); // document source line
            }
            HttpPost httpPost = new HttpPost("/_bulk");
            // Plain UTF-8 string entity, no Content-Encoding header
            StringEntity entity = new StringEntity(bulkRequestBody.toString(),"UTF-8");
            entity.setContentType("application/json");
            httpPost.setEntity(entity);*/

            // Target ES host
            HttpHost targetHost = new HttpHost("192.168.2.1", 19200, "https");
            // Execute the request
            CloseableHttpResponse response = httpClient.execute(targetHost, httpPost);
            try {
                HttpEntity entity1 = response.getEntity();
                if (entity1 != null) {
                    // Print the response body; the _bulk response has an "errors" flag and per-item results
                    String responseString = EntityUtils.toString(entity1, "UTF-8");
                    System.out.println(responseString);
                }
            } finally {
                response.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            httpClient.close(); // Close the client to release pooled connections
        }
    }
}
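The source above sends all 50,000 documents in a single request. To cap each request at a byte budget, as in the 50 MB per batch idea from the first rejected option, the documents can be grouped first and each group then gzipped as in the source. This is a sketch under stated assumptions. `BulkBatcherSketch.batch` is a hypothetical helper, the limit is measured on the uncompressed body, and each action/source pair stays together because the bulk API reads the two lines as a unit. Elasticsearch also rejects HTTP request bodies larger than `http.max_content_length` (100mb by default).

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class BulkBatcherSketch {

    // Splits (action line, source line) pairs into NDJSON bulk bodies whose
    // uncompressed UTF-8 size stays at or under maxBytes. A single pair larger
    // than maxBytes still becomes its own batch rather than being dropped.
    static List<String> batch(List<String[]> pairs, int maxBytes) {
        List<String> batches = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        int currentBytes = 0;
        for (String[] pair : pairs) {
            String entry = pair[0] + "\n" + pair[1] + "\n";
            int entryBytes = entry.getBytes(StandardCharsets.UTF_8).length;
            // Flush the current batch if adding this pair would exceed the budget.
            if (currentBytes > 0 && currentBytes + entryBytes > maxBytes) {
                batches.add(current.toString());
                current.setLength(0);
                currentBytes = 0;
            }
            current.append(entry);
            currentBytes += entryBytes;
        }
        if (currentBytes > 0) {
            batches.add(current.toString());
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String[]> pairs = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            pairs.add(new String[] {
                "{\"index\":{\"_index\":\"http20250208\"}}",
                "{\"agentId\":\"test-" + i + "\"}"
            });
        }
        // A tiny budget so the split is visible; a real budget would be tens of MB.
        for (String body : batch(pairs, 120)) {
            System.out.println("batch of " + body.split("\n").length / 2 + " docs");
        }
    }
}
```

Each returned body would then go through the same gzip + `ByteArrayEntity` path as the compressed variant above, one `_bulk` request per body.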

