【中间件】使用ElasticSearch提供的RestClientAPI操作ES

发布于:2025-04-07 ⋅ 阅读:(28) ⋅ 点赞:(0)

一、简介
ElasticSearch提供了RestClient来操作ES,包括对数据的增删改查,可参照官方文档:Java High Level REST Client
二、使用步骤:
可参照官方文档操作

  1. 导包
<dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>7.16.2</version>
        </dependency>
    </dependencies>
  1. 写配置
    官方文档Initialization中配置如下,指明了要操作的ES所在的服务器地址和端口号
RestHighLevelClient client = new RestHighLevelClient(
        RestClient.builder(
                new HttpHost("localhost", 9200, "http"),
                new HttpHost("localhost", 9201, "http")));

由于我们经常用到该对象,如果每次都写一遍的话会很麻烦,代码冗余,因此我们可一在配置类中创建好该bean,并将其放到容器中,要使用时,用@Autowried自动注入就行,具体配置如下:

@Configuration
public class EsConfig {

    public static final RequestOptions COMMON_OPTIONS;
    static {
        RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
//        builder.addHeader("Authorization", "Bearer " + TOKEN);
//        builder.setHttpAsyncResponseConsumerFactory(
//                new HttpAsyncResponseConsumerFactory
//                        .HeapBufferedResponseConsumerFactory(30 * 1024 * 1024 * 1024));
        COMMON_OPTIONS = builder.build();
    }

    @Bean
    public RestHighLevelClient restHighLevelClient(){
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost("192.168.195.131", 9200, "http")));
        return client;
    }
}
  1. 业务使用
    例如,将数据批量保存至ES中,RestHighLevelClient中提供了所有对ES进行操作的方法:
@Slf4j
@Service
public class ProductSaveServiceImpl implements ProductSaveService {

    @Autowired
    RestHighLevelClient restHighLevelClient;
    @Override
    public boolean productStatusUp(List<SkuEsModel> skuEsModels) throws IOException {
        // 1、在es中建立一个索引,建立好映射关系--在es中执行put product即可
        // 2、保存数据到es   BulkRequest bulkRequest, RequestOptions options
        BulkRequest bulkRequest = new BulkRequest();
        skuEsModels.forEach(skuEsModel -> {
            IndexRequest indexRequest = new IndexRequest(EsConstant.PRODUCT_INDEX);
            indexRequest.id(skuEsModel.getSkuId().toString());
            indexRequest.source(JSON.toJSONString(skuEsModel), XContentType.JSON);
            bulkRequest.add(indexRequest);
        });
        BulkResponse bulk = restHighLevelClient.bulk(bulkRequest, EsConfig.COMMON_OPTIONS);
        // 处理上架错误,记录到日志
        List<String> errors = Arrays.stream(bulk.getItems()).map(BulkItemResponse::getId).collect(Collectors.toList());
        log.info("商品上架完成:{}", errors);
        return bulk.hasFailures();
    }
}