elasticsearch之记录es7.17升级8.17 springboot2.7.0 程序改造坑

发布于:2025-05-20 ⋅ 阅读:(14) ⋅ 点赞:(0)

一、硬件安装

1-1. centos7 服务器上,删除elasticsearch7.17,安装es8.17

todo
在这里插入图片描述
21.18服务器
在这里插入图片描述

二、 程序改造

2-1. Java API Client 8.17.5

  • 是 Elasticsearch 官方新一代客户端(专为 Elasticsearch 8.x 设计,支持所有新特性(如角色权限管理、安全增强等)。

  • 长期维护:官方明确表示未来所有功能迭代和 bug 修复将集中在 Java API Client,而 RestHighLevelClient 已逐步被弃用

2-2. 依赖引入


        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
            <version>2.7.0</version>
            <exclusions>
                <!-- 排除旧版本ES客户端 -->
                <exclusion>
                    <groupId>org.elasticsearch.client</groupId>
                    <artifactId>elasticsearch-rest-high-level-client</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.elasticsearch</groupId>
                    <artifactId>elasticsearch</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.elasticsearch.client</groupId>
                    <artifactId>elasticsearch-rest-client</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- 显式添加 spring-data-elasticsearch 并排除其内部的 rest-client -->
        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-elasticsearch</artifactId>
            <version>4.4.6</version>
            <exclusions>
                <exclusion>
                    <groupId>org.elasticsearch.client</groupId>
                    <artifactId>elasticsearch-rest-client</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- 改用新版API Elasticsearch 8.xx Java API Client -->
        <dependency>
            <groupId>co.elastic.clients</groupId>
            <artifactId>elasticsearch-java</artifactId>
            <version>8.17.5</version>  <!-- 兼容 Spring Boot 2.7.x 的最新稳定版 -->
            <exclusions>
                <exclusion>
                    <groupId>org.elasticsearch.client</groupId>
                    <artifactId>elasticsearch-rest-client</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-client</artifactId>
            <version>8.15.3</version>  <!-- 配套的 REST 客户端 -->
        </dependency>
        <!--  作用:实现与 Elasticsearch 集群的 HTTP 通信(基于 Java 11+ 的 HttpClient)。
        必要性:必须与 elasticsearch-java 搭配使用,否则无法初始化 ElasticsearchClient 实例。
        -->
        <!-- 必须的 JSON 处理库 -->
        <dependency>
            <groupId>jakarta.json</groupId>
            <artifactId>jakarta.json-api</artifactId>
            <version>2.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.glassfish</groupId>
            <artifactId>jakarta.json</artifactId>
            <version>2.0.1</version>
        </dependency>

2-3. 配置文件

2-3-1. 主配置 application.yml

# 日志配置
log_path:
  /data/logs/personnel-manager/backend

logging:
  level:
    com.ltkj: debug
    org.springframework: warn
    org.elasticsearch: info
    org.apache.http: INFO
  config: classpath:logback-spring.xml

2-3-2. 子配置 application-dev.yml

minio:
  endpoint: http//:192.168.21.3:33306
  access-key: ltkj.personnel
  secret-key: ltkj.personnel.com
  bucketName: wangl
  url: ${minio.endpoint}/${minio.bucket}/

#elasticsearch 配置:
elasticsearch:
  address: 192.168.21.3:19200,192.168.21.18:19200  #如果是集群,用逗号隔开
  connect-timeout: 3000   #连接超时时间
  socket-timeout: 30000   #连接超时时间
  connection-request-timeout: 1000
  max-connect-num: 200
  max-connect-per-route: 200
  shards: 2
  replicas: 1
  username: elastic
  password: elastic.com

management:
  health:
    elasticsearch:
      enabled: true  # 可选:禁用健康检查

2-4. Java 配置类

/*
 * Copyright (c) 2025. ltkj.com 石家庄文旅投数智科技有限公司  All rights reserved.
 */
package com.ltkj.configuration;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import com.ltkj.configuration.properties.ElasticSearchProperties;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.ssl.SSLContexts;
import org.elasticsearch.client.Node;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.net.ssl.SSLContext;
import java.net.URI;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

@Configuration
public class ElasticSearchConfig {

    /**
     * Java API Client 8.17.5
     * 是 Elasticsearch 官方新一代客户端(专为 Elasticsearch 8.x 设计,支持所有新特性(如角色权限管理、安全增强等)。
     * 长期维护:官方明确表示未来所有功能迭代和 bug 修复将集中在 Java API Client,而 RestHighLevelClient 已逐步被弃用。
     */
    private static final Logger logger = LoggerFactory.getLogger(ElasticSearchConfig.class);


    @Bean
    public RestClient restClient(ElasticSearchProperties elasticProperties) {

        // 1. 解析地址(支持完整URI格式)
        List<HttpHost> hostList = parseHosts(elasticProperties.getAddress());
        logger.info("初始化Elasticsearch 开始>>>>>> 节点列表: {}", hostList); // 关键日志
        // 2. 构建 RestClientBuilder
        RestClientBuilder builder = RestClient.builder(hostList.toArray(new HttpHost[0]))
                .setFailureListener(new LoggingFailureListener());

        // 3. 配置超时参数(新版API推荐方式)
        configureTimeouts(builder, elasticProperties);

        // 4. 配置连接池和认证
        configureConnections(builder, elasticProperties);
        return builder.build();
    }

    @Bean(name = "elasticsearchClient")
    @ConditionalOnMissingBean
    public ElasticsearchClient client(RestClient restClient) {
        // 依赖于前面的 1、2、3、4  步骤
        // 5. 创建 Transport 和 Client
        ElasticsearchTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());

        logger.info("ElasticsearchClient initializing ......");
        ElasticsearchClient elasticsearchClient = new ElasticsearchClient(transport);

        logger.info("ElasticsearchClient initialized with nodes: {}", restClient.getNodes());
        return elasticsearchClient;
    }

    /**
     * 解析地址(支持带协议的完整格式)
     * 输入示例:http://192.168.21.3:19200,https://192.168.21.18:19200
     */
    private List<HttpHost> parseHosts(String addressStr) {
        return Arrays.stream(addressStr.split(","))
                .map(addr -> {
                    try {
                        // 自动补全协议头
                        if (!addr.startsWith("http://") && !addr.startsWith("https://")) {
                            addr = "http://" + addr; // 默认HTTP
                        }
                        URI uri = URI.create(addr);
                        return new HttpHost(uri.getHost(), uri.getPort(), uri.getScheme());
                    } catch (IllegalArgumentException e) {
                        throw new IllegalArgumentException("Invalid Elasticsearch address format: " + addr);
                    }
                })
                .collect(Collectors.toList());
    }

    /**
     * 配置超时参数(适配新API)
     */
    private void configureTimeouts(RestClientBuilder builder, ElasticSearchProperties props) {
        builder.setRequestConfigCallback(requestConfigBuilder ->
                requestConfigBuilder
                        .setConnectTimeout(props.getConnectTimeOut())
                        .setSocketTimeout(props.getSocketTimeOut())
                        .setConnectionRequestTimeout(props.getConnectionRequestTimeOut())
        );
    }

    /**
     * 配置连接池和认证(同步客户端)
     */
    private void configureConnections(RestClientBuilder builder, ElasticSearchProperties props) {
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(
                AuthScope.ANY,
                new UsernamePasswordCredentials(props.getUsername(), props.getPassword())
        );

        // 使用异步客户端配置
        builder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
            @Override
            public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                // 连接池配置
                httpClientBuilder
                        .setMaxConnTotal(props.getMaxConnectNum())
                        .setMaxConnPerRoute(props.getMaxConnectPerRoute())
                        .setDefaultCredentialsProvider(credentialsProvider);

                // 启用HTTPS时配置SSL
                if ("https".equalsIgnoreCase(props.getScheme())) {
                    configureSSL(httpClientBuilder);
                }

                return httpClientBuilder;
            }
        });
    }

    /**
     * 配置SSL(异步客户端)
     */
    private void configureSSL(HttpAsyncClientBuilder httpClientBuilder) {
        try {
            SSLContext sslContext = SSLContexts.custom()
                    .loadTrustMaterial((chain, authType) -> true) // 信任所有证书(生产环境需替换)
                    .build();

            httpClientBuilder
                    .setSSLContext(sslContext)
                    .setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE);
        } catch (Exception e) {
            throw new RuntimeException("SSL配置失败", e);
        }
    }

    /**
     * 节点故障监听器(记录黑名单事件)
     */
    private static class LoggingFailureListener extends RestClient.FailureListener {
        @Override
        public void onFailure(Node node) {
            logger.warn("Node [{}] added to blacklist", node.getHost());
        }
    }

}

三、根据 Elasticsearch 集群信息(版本 8.17.2), Java 客户端开发的注意事项和完整集成建议:

3-1. 环境兼容性验证

1. 客户端版本匹配

确保使用的 Java API Client 版本与 Elasticsearch 服务端版本一致:

<!-- pom.xml -->
<dependency>
    <groupId>co.elastic.clients</groupId>
    <artifactId>elasticsearch-java</artifactId>
    <version>8.17.2</version> <!-- 必须与ES服务端版本一致 -->
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.17.1</version> <!-- 推荐使用官方认证版本 -->
</dependency>
2. 安全认证配置

由于你的集群启用了基础认证(-u elastic:elastic.com),客户端需配置安全凭证:

RestClient restClient = RestClient.builder(new HttpHost("192.168.21.18", 19200))
    .setDefaultHeaders(new Header[]{
        new BasicHeader("Authorization", "Basic " + Base64.getEncoder().encodeToString("elastic:elastic.com".getBytes()))
    })
    .build();

ElasticsearchTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
ElasticsearchClient client = new ElasticsearchClient(transport);

3-2. 核心 API 升级指南

java
// Index 文档(自动序列化POJO)
IndexResponse response = client.index(i -> i
    .index("products")
    .id(product.getId())
    .document(product)
);

// 根据ID查询
Product product = client.get(g -> g
    .index("products")
    .id("123"),
    Product.class
).source();

// 删除文档
DeleteResponse response = client.delete(d -> d
    .index("products")
    .id("123")
);
2. 搜索请求构建
SearchResponse<Product> response = client.search(s -> s
    .index("products")
    .query(q -> q
        .bool(b -> b
            .must(m -> m.match(t -> t.field("name").query("手机")))
            .filter(f -> f.range(r -> r.field("price").gte(JsonData.of(1000)))
        )
    )
    .highlight(h -> h
        .fields("name", f -> f.preTags("<em>").postTags("</em>"))
    )
    .from(0)
    .size(10),
    Product.class
);
3. 批量操作优化
BulkRequest.Builder bulkBuilder = new BulkRequest.Builder();
products.forEach(p -> 
    bulkBuilder.operations(op -> op
        .index(idx -> idx
            .index("products")
            .id(p.getId())
            .document(p)
        )
    )
);

BulkResponse response = client.bulk(bulkBuilder.build());
response.items().forEach(item -> {
    if (item.error() != null) {
        log.error("操作失败: {}", item.error().reason());
    }
});

3-3. 关键差异处理(7.x → 8.x)

功能点 7.x 实现方式 8.x 正确实现方式
查询构建 QueryBuilders.matchQuery() Query.of(q -> q.match(m -> m.field(...)))
分页参数 .setFrom() / .setSize() .from() / .size() 链式调用
高亮配置 HighlightBuilder .highlight(h -> h.fields(...))
聚合分析 AggregationBuilders .aggregations("agg_name", a -> a.terms(...)))
异步操作 client.prepareIndex().execute() client.indexAsync(...) + CompletableFuture

3-4. 安全加固建议

1. HTTPS 加密传输
RestClient.builder(new HttpHost("192.168.21.18", 19200, "https"))
    .setHttpClientConfigCallback(hc -> hc
        .setSSLContext(sslContext) // 配置SSL上下文
        .setDefaultCredentialsProvider(credentialsProvider)
    );
2. API 密钥认证
Header apiKeyHeader = new BasicHeader("Authorization", 
    "ApiKey " + Base64.getEncoder().encodeToString("api_key_id:api_key_secret".getBytes()));

RestClient restClient = RestClient.builder(...)
    .setDefaultHeaders(new Header[]{apiKeyHeader})
    .build();

3-5. 性能调优参数

// 查询超时设置
SearchRequest request = SearchRequest.of(s -> s
    .timeout(t -> t.time("30s"))
);

// 批量操作优化
BulkRequest bulkRequest = BulkRequest.of(b -> b
    .refresh(Refresh.WaitFor) // 刷新策略
    .requireAlias(true)       // 强制通过别名写入
    .timeout(t -> t.time("1m")) 
);

3-6. 故障排查命令

  1. 集群健康检查
curl -u elastic:elastic.com http://192.168.21.18:19200/_cluster/health?pretty
  1. 索引状态分析
curl -u elastic:elastic.com http://192.168.21.18:19200/_cat/indices?v
  1. 慢查询日志
# elasticsearch.yml
index.search.slowlog.threshold.query.warn: 10s
index.search.slowlog.threshold.fetch.debug: 500ms

3-7. 迁移验证清单

  1. 客户端版本与ES服务端版本严格一致(8.17.2)
  2. 所有 TransportClient 代码已替换为 Java API Client
  3. 查询DSL已升级到类型安全构建器
  4. 安全认证(Basic Auth/API Key)已正确配置
  5. 日志系统已适配新响应格式(如错误日志记录 item.error().reason())
  6. 性能测试通过(对比7.x版本QPS、Latency等指标)

问题一

问题二 健康检查的ip:port 不对

上面 2-3 中,我的配置文件为:


elasticsearch:
  address: 192.168.21.3:19200,192.168.21.18:19200  #如果是集群,用逗号隔开
  connect-timeout: 3000   #连接超时时间
  socket-timeout: 30000   #连接超时时间
  connection-request-timeout: 1000
  max-connect-num: 200
  max-connect-per-route: 200
  shards: 2
  replicas: 1
  username: elastic
  password: elastic.com

为什么会报错如下:
2025-05-15 17:40:45.968 [RMI TCP Connection(2)-192.168.68.19] DEBUG org.elasticsearch.client.RestClient - added [[host=http://localhost:9200]] to blacklist

java.net.ConnectException: Connection refused: no further information
	at java.base/sun.nio.ch.Net.pollConnect(Native Method)
	at java.base/sun.nio.ch.Net.pollConnectNow(Net.java:672)
	at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:946)
	at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processEvent(DefaultConnectingIOReactor.java:174)
	at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processEvents(DefaultConnectingIOReactor.java:148)
	at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor.execute(AbstractMultiworkerIOReactor.java:351)
	at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.execute(PoolingNHttpClientConnectionManager.java:221)
	at org.apache.http.impl.nio.client.CloseableHttpAsyncClientBase$1.run(CloseableHttpAsyncClientBase.java:64)
	at java.base/java.lang.Thread.run(Thread.java:833)
2025-05-19 09:40:56.212 [elasticsearch-rest-client-1-thread-1] DEBUG o.a.h.i.n.c.InternalHttpAsyncClient - [exchange: 1] connection request failed
2025-05-19 09:40:56.214 [RMI TCP Connection(2)-192.168.68.19] DEBUG org.elasticsearch.client.RestClient - request [GET http://localhost:9200/_cluster/health/] failed
java.util.concurrent.ExecutionException: java.net.ConnectException: Connection refused: no further information
	at org.apache.http.concurrent.BasicFuture.getResult(BasicFuture.java:71)
	at org.apache.http.concurrent.BasicFuture.get(BasicFuture.java:84)
	at org.apache.http.impl.nio.client.FutureWrapper.get(FutureWrapper.java:70)
	at org.elasticsearch.client.RestClient.performRequest(RestClient.java:300)
	at org.elasticsearch.client.RestClient.performRequest(RestClient.java:292)
	at org.springframework.boot.actuate.elasticsearch.ElasticsearchRestClientHealthIndicator.doHealthCheck(ElasticsearchRestClientHealthIndicator.java:60)
	at org.springframework.boot.actuate.health.AbstractHealthIndicator.health(AbstractHealthIndicator.java:82)
	at org.springframework.boot.actuate.health.HealthIndicator.getHealth(HealthIndicator.java:37)
	at org.springframework.boot.actuate.health.HealthEndpoint.getHealth(HealthEndpoint.java:77)
	at org.springframework.boot.actuate.health.HealthEndpoint.getHealth(HealthEndpoint.java:40)
	at org.springframework.boot.actuate.health.HealthEndpointSupport.getContribution(HealthEndpointSupport.java:130)
	at org.springframework.boot.actuate.health.HealthEndpointSupport.getAggregateContribution(HealthEndpointSupport.java:141)
	at org.springframework.boot.actuate.health.HealthEndpointSupport.getContribution(HealthEndpointSupport.java:126)
	at org.springframework.boot.actuate.health.HealthEndpointSupport.getHealth(HealthEndpointSupport.java:95)
	at org.springframework.boot.actuate.health.HealthEndpointSupport.getHealth(HealthEndpointSupport.java:66)
	at org.springframework.boot.actuate.health.HealthEndpoint.health(HealthEndpoint.java:71)
	at org.springframework.boot.actuate.health.HealthEndpoint.health(HealthEndpoint.java:61)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:282)
	at org.springframework.boot.actuate.endpoint.invoke.reflect.ReflectiveOperationInvoker.invoke(ReflectiveOperationInvoker.java:74)
	at org.springframework.boot.actuate.endpoint.annotation.AbstractDiscoveredOperation.invoke(AbstractDiscoveredOperation.java:60)
	at org.springframework.boot.actuate.endpoint.jmx.EndpointMBean.invoke(EndpointMBean.java:122)
	at org.springframework.boot.actuate.endpoint.jmx.EndpointMBean.invoke(EndpointMBean.java:97)
	at java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.invoke(DefaultMBeanServerInterceptor.java:814)
	at java.management/com.sun.jmx.mbeanserver.JmxMBeanServer.invoke(JmxMBeanServer.java:802)
	at java.management.rmi/javax.management.remote.rmi.RMIConnectionImpl.doOperation(RMIConnectionImpl.java:1472)
	at java.management.rmi/javax.management.remote.rmi.RMIConnectionImpl$PrivilegedOperation.run(RMIConnectionImpl.java:1310)
	at java.management.rmi/javax.management.remote.rmi.RMIConnectionImpl.doPrivilegedOperation(RMIConnectionImpl.java:1405)
	at java.management.rmi/javax.management.remote.rmi.RMIConnectionImpl.invoke(RMIConnectionImpl.java:829)
	at java.base/jdk.internal.reflect.GeneratedMethodAccessor56.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at java.rmi/sun.rmi.server.UnicastServerRef.dispatch(UnicastServerRef.java:360)
	at java.rmi/sun.rmi.transport.Transport$1.run(Transport.java:200)
	at java.rmi/sun.rmi.transport.Transport$1.run(Transport.java:197)
	at java.base/java.security.AccessController.doPrivileged(AccessController.java:712)
	at java.rmi/sun.rmi.transport.Transport.serviceCall(Transport.java:196)
	at java.rmi/sun.rmi.transport.tcp.TCPTransport.handleMessages(TCPTransport.java:587)
	at java.rmi/sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run0(TCPTransport.java:828)
	at java.rmi/sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.lambda$run$0(TCPTransport.java:705)
	at java.base/java.security.AccessController.doPrivileged(AccessController.java:399)
	at java.rmi/sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run(TCPTransport.java:704)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.net.ConnectException: Connection refused: no further information


问题原因分析

从错误日志可以看出,健康检查仍然尝试连接 localhost:9200,而非您配置的 192.168.21.3:19200。这是因为 Spring Boot Actuator 的 Elasticsearch 健康检查默认使用独立的客户端配置,而非您自定义的 ElasticsearchClient。以下是完整解决方案:

解决方案
  1. 禁用默认健康检查(可选)
    在 application.yml 中关闭 Elasticsearch 默认健康检查:
management:
  health:
    elasticsearch:
      enabled: false
  1. 自定义健康检查(推荐)
    创建自定义健康检查配置类,强制使用您的客户端配置:
    CustomElasticsearchHealthIndicator.java
@Component
public class CustomElasticsearchHealthIndicator extends AbstractHealthIndicator {

    private final ElasticsearchClient client;

    public CustomElasticsearchHealthIndicator(ElasticsearchClient client) {
        this.client = client;
    }

    @Override
    protected void doHealthCheck(Health.Builder builder) throws Exception {
        try {
            InfoResponse info = client.info();
            builder.up()
                .withDetail("cluster_name", info.clusterName())
                .withDetail("version", info.version().number());
        } catch (IOException e) {
            builder.down(e);
        }
    }
}

网站公告

今日签到

点亮在社区的每一天
去签到